Repository: drill
Updated Branches:
  refs/heads/master 583ca4a95 -> f7f6efc52


DRILL-3071: fix memory leak in RecordBatchLoader#load


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f7f6efc5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f7f6efc5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f7f6efc5

Branch: refs/heads/master
Commit: f7f6efc525cd833ce1530deae32eb9ccb20b664a
Parents: 583ca4a
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Wed May 13 16:36:41 2015 -0700
Committer: Hanifi Gunes <hgu...@maprtech.com>
Committed: Thu May 14 14:22:50 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/record/RecordBatchLoader.java    | 114 ++++++++++---------
 1 file changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
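
The essence of the leak: if anything threw part-way through load(), the
ValueVectors already created into newVectors were never released. The patch
below therefore wraps the per-field load loop in try/catch/finally, clearing
the new vectors and rethrowing the actual cause on failure, and clearing any
replaced old vectors either way. A minimal, self-contained sketch of that
pattern, using a hypothetical Buffer type in place of Drill's ValueVector:

import java.util.ArrayList;
import java.util.List;

public class LoadSketch {

  /** Hypothetical stand-in for a reference-counted buffer such as a ValueVector. */
  static final class Buffer {
    private boolean released;
    void release() { released = true; }
  }

  /** Simulates per-field allocation; fails on the third field to mimic a mid-load error. */
  static Buffer allocate(int fieldIndex) {
    if (fieldIndex == 2) {
      throw new IllegalStateException("allocation failed on field " + fieldIndex);
    }
    return new Buffer();
  }

  /**
   * Builds a batch of buffers. If anything fails part-way, every buffer created
   * so far is released before the cause is rethrown; without this, the partially
   * built batch leaks, which is the bug DRILL-3071 fixes.
   */
  static List<Buffer> loadBatch(int fieldCount) {
    final List<Buffer> fresh = new ArrayList<>();
    try {
      for (int i = 0; i < fieldCount; i++) {
        fresh.add(allocate(i));
      }
      return fresh;                      // success: ownership passes to the caller
    } catch (final RuntimeException cause) {
      for (final Buffer b : fresh) {     // failure: release everything built so far
        b.release();
      }
      throw cause;                       // upper layers still see the original failure
    }
  }

  public static void main(String[] args) {
    try {
      loadBatch(5);
    } catch (RuntimeException e) {
      System.out.println("load failed but nothing leaked: " + e.getMessage());
    }
  }
}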


http://git-wip-us.apache.org/repos/asf/drill/blob/f7f6efc5/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 1b8b7ce..de6f665 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -34,18 +34,19 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
+  private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
 
-  private VectorContainer container = new VectorContainer();
   private final BufferAllocator allocator;
+  private VectorContainer container = new VectorContainer();
   private int valueCount;
   private BatchSchema schema;
 
   public RecordBatchLoader(BufferAllocator allocator) {
-    super();
-    this.allocator = allocator;
+    this.allocator = Preconditions.checkNotNull(allocator);
   }
 
   /**
@@ -60,66 +61,75 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
    *   TODO:  Clean:  DRILL-2933  load(...) never actually throws SchemaChangeException.
    */
   public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
-//    logger.debug("Loading record batch with def {} and data {}", def, buf);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Loading record batch with def {} and data {}", def, buf);
+      logger.trace("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
+    }
     container.zeroVectors();
-    this.valueCount = def.getRecordCount();
+    valueCount = def.getRecordCount();
     boolean schemaChanged = schema == null;
-//    logger.info("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
-//    System.out.println("Load, ThreadId: " + Thread.currentThread().getId());
-    Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
-    for(VectorWrapper<?> w : container){
-      ValueVector v = w.getValueVector();
-      oldFields.put(v.getField(), v);
-    }
 
-    VectorContainer newVectors = new VectorContainer();
+    final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
+    for (final VectorWrapper wrapper : container) {
+      final ValueVector vector = wrapper.getValueVector();
+      oldFields.put(vector.getField(), vector);
+    }
 
-    List<SerializedField> fields = def.getFieldList();
+    final VectorContainer newVectors = new VectorContainer();
+    try {
+      final List<SerializedField> fields = def.getFieldList();
+      int bufOffset = 0;
+      for (final SerializedField field : fields) {
+        final MaterializedField fieldDef = MaterializedField.create(field);
+        ValueVector vector = oldFields.remove(fieldDef);
+
+        if (vector == null) {
+          schemaChanged = true;
+          vector = TypeHelper.getNewVector(fieldDef, allocator);
+        } else if (!vector.getField().getType().equals(fieldDef.getType())) {
+          // clear previous vector
+          vector.clear();
+          schemaChanged = true;
+          vector = TypeHelper.getNewVector(fieldDef, allocator);
+        }
+
+        if (field.getValueCount() == 0 && (!field.hasGroupCount() || field.getGroupCount() == 0)) {
+          AllocationHelper.allocate(vector, 0, 0, 0);
+        } else {
+          vector.load(field, buf.slice(bufOffset, field.getBufferLength()));
+        }
+        bufOffset += field.getBufferLength();
+        newVectors.add(vector);
+      }
 
-    int bufOffset = 0;
-    for (SerializedField fmd : fields) {
-      MaterializedField fieldDef = MaterializedField.create(fmd);
-      ValueVector vector = oldFields.remove(fieldDef);
+      Preconditions.checkArgument(buf == null || bufOffset == buf.capacity());
 
-      if (vector == null) {
-        schemaChanged = true;
-        vector = TypeHelper.getNewVector(fieldDef, allocator);
-      } else if (!vector.getField().getType().equals(fieldDef.getType())) {
-        // clear previous vector
-        vector.clear();
-        schemaChanged = true;
-        vector = TypeHelper.getNewVector(fieldDef, allocator);
+      // rebuild the schema.
+      final SchemaBuilder builder = BatchSchema.newBuilder();
+      for (VectorWrapper<?> v : newVectors) {
+        builder.addField(v.getField());
       }
-
-      if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) {
-        AllocationHelper.allocate(vector, 0, 0, 0);
-      } else {
-        vector.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+      schema = builder.build();
+      newVectors.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      container = newVectors;
+    } catch (final Throwable cause) {
+      // We have to clean up the new vectors created here and rethrow the actual cause;
+      // it is up to the upper layers to invoke their own specific clean-up logic.
+      for (final VectorWrapper wrapper : newVectors) {
+        wrapper.getValueVector().clear();
       }
-      bufOffset += fmd.getBufferLength();
-      newVectors.add(vector);
-    }
-
-    Preconditions.checkArgument(buf == null || bufOffset == buf.capacity());
-
-    if(!oldFields.isEmpty()){
-      schemaChanged = true;
-      for(ValueVector v : oldFields.values()){
-        v.close();
+      throw cause;
+    } finally {
+      if (!oldFields.isEmpty()) {
+        schemaChanged = true;
+        for (final ValueVector vector : oldFields.values()) {
+          vector.clear();
+        }
       }
     }
 
-    // rebuild the schema.
-    SchemaBuilder b = BatchSchema.newBuilder();
-    for(VectorWrapper<?> v : newVectors){
-      b.addField(v.getField());
-    }
-    b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
-    this.schema = b.build();
-    newVectors.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    container = newVectors;
     return schemaChanged;
-
   }
 
   public TypedFieldId getValueVectorId(SchemaPath path) {

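For callers, the practical effect is that a failed load() no longer strands the
vectors it allocated. A hedged caller-side sketch follows; only the
RecordBatchLoader constructor and load(RecordBatchDef, DrillBuf) are confirmed
by the diff above, while the imports, the origin of def and buf, and the
convention that the caller releases the incoming DrillBuf are assumptions about
the surrounding client code:

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.RecordBatchLoader;

class LoaderUsage {
  boolean consume(BufferAllocator allocator, RecordBatchDef def, DrillBuf buf)
      throws SchemaChangeException {
    final RecordBatchLoader loader = new RecordBatchLoader(allocator);
    try {
      // With this fix, a failed load() releases the vectors it allocated,
      // so the caller remains responsible only for its own incoming buffer.
      return loader.load(def, buf);
    } finally {
      buf.release();  // assumption: the caller owns the incoming DrillBuf
    }
  }
}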