Repository: drill
Updated Branches:
  refs/heads/master 583ca4a95 -> f7f6efc52
DRILL-3071: fix memory leak in RecordBatchLoader#load Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f7f6efc5 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f7f6efc5 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f7f6efc5 Branch: refs/heads/master Commit: f7f6efc525cd833ce1530deae32eb9ccb20b664a Parents: 583ca4a Author: Hanifi Gunes <hgu...@maprtech.com> Authored: Wed May 13 16:36:41 2015 -0700 Committer: Hanifi Gunes <hgu...@maprtech.com> Committed: Thu May 14 14:22:50 2015 -0700 ---------------------------------------------------------------------- .../drill/exec/record/RecordBatchLoader.java | 114 ++++++++++--------- 1 file changed, 62 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/f7f6efc5/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 1b8b7ce..de6f665 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -34,18 +34,19 @@ import org.apache.drill.exec.vector.ValueVector; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class); + private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class); - private VectorContainer container = new VectorContainer(); private final 
BufferAllocator allocator; + private VectorContainer container = new VectorContainer(); private int valueCount; private BatchSchema schema; public RecordBatchLoader(BufferAllocator allocator) { - super(); - this.allocator = allocator; + this.allocator = Preconditions.checkNotNull(allocator); } /** @@ -60,66 +61,75 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp * TODO: Clean: DRILL-2933 load(...) never actually throws SchemaChangeException. */ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException { -// logger.debug("Loading record batch with def {} and data {}", def, buf); + if (logger.isTraceEnabled()) { + logger.trace("Loading record batch with def {} and data {}", def, buf); + logger.trace("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only")); + } container.zeroVectors(); - this.valueCount = def.getRecordCount(); + valueCount = def.getRecordCount(); boolean schemaChanged = schema == null; -// logger.info("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only")); -// System.out.println("Load, ThreadId: " + Thread.currentThread().getId()); - Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap(); - for(VectorWrapper<?> w : container){ - ValueVector v = w.getValueVector(); - oldFields.put(v.getField(), v); - } - VectorContainer newVectors = new VectorContainer(); + final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap(); + for (final VectorWrapper wrapper : container) { + final ValueVector vector = wrapper.getValueVector(); + oldFields.put(vector.getField(), vector); + } - List<SerializedField> fields = def.getFieldList(); + final VectorContainer newVectors = new VectorContainer(); + try { + final List<SerializedField> fields = def.getFieldList(); + int bufOffset = 0; + for (final SerializedField field : fields) { + final MaterializedField fieldDef = MaterializedField.create(field); + 
ValueVector vector = oldFields.remove(fieldDef); + + if (vector == null) { + schemaChanged = true; + vector = TypeHelper.getNewVector(fieldDef, allocator); + } else if (!vector.getField().getType().equals(fieldDef.getType())) { + // clear previous vector + vector.clear(); + schemaChanged = true; + vector = TypeHelper.getNewVector(fieldDef, allocator); + } + + if (field.getValueCount() == 0 && (!field.hasGroupCount() || field.getGroupCount() == 0)) { + AllocationHelper.allocate(vector, 0, 0, 0); + } else { + vector.load(field, buf.slice(bufOffset, field.getBufferLength())); + } + bufOffset += field.getBufferLength(); + newVectors.add(vector); + } - int bufOffset = 0; - for (SerializedField fmd : fields) { - MaterializedField fieldDef = MaterializedField.create(fmd); - ValueVector vector = oldFields.remove(fieldDef); + Preconditions.checkArgument(buf == null || bufOffset == buf.capacity()); - if (vector == null) { - schemaChanged = true; - vector = TypeHelper.getNewVector(fieldDef, allocator); - } else if (!vector.getField().getType().equals(fieldDef.getType())) { - // clear previous vector - vector.clear(); - schemaChanged = true; - vector = TypeHelper.getNewVector(fieldDef, allocator); + // rebuild the schema. + final SchemaBuilder builder = BatchSchema.newBuilder(); + for (VectorWrapper<?> v : newVectors) { + builder.addField(v.getField()); } - - if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) { - AllocationHelper.allocate(vector, 0, 0, 0); - } else { - vector.load(fmd, buf.slice(bufOffset, fmd.getBufferLength())); + builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); + schema = builder.build(); + newVectors.buildSchema(BatchSchema.SelectionVectorMode.NONE); + container = newVectors; + } catch (final Throwable cause) { + // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should + // adjudicate to call upper layer specific clean up logic. 
+ for (final VectorWrapper wrapper:newVectors) { + wrapper.getValueVector().clear(); } - bufOffset += fmd.getBufferLength(); - newVectors.add(vector); - } - - Preconditions.checkArgument(buf == null || bufOffset == buf.capacity()); - - if(!oldFields.isEmpty()){ - schemaChanged = true; - for(ValueVector v : oldFields.values()){ - v.close(); + throw cause; + } finally { + if (!oldFields.isEmpty()) { + schemaChanged = true; + for (final ValueVector vector:oldFields.values()) { + vector.clear(); + } } } - // rebuild the schema. - SchemaBuilder b = BatchSchema.newBuilder(); - for(VectorWrapper<?> v : newVectors){ - b.addField(v.getField()); - } - b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); - this.schema = b.build(); - newVectors.buildSchema(BatchSchema.SelectionVectorMode.NONE); - container = newVectors; return schemaChanged; - } public TypedFieldId getValueVectorId(SchemaPath path) {