Repository: incubator-drill Updated Branches: refs/heads/master 0ed4fadfe -> c96773474
DRILL-1601: Have a minimum allocation for variable length value vectors. Improve error messages in PartitionerTemplate and UnlimitedRawBatchBuffer. Minor change in cleanup for HashJoin. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c9677347 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c9677347 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c9677347 Branch: refs/heads/master Commit: c967734746dbf34535f9e360d82b66f60db1f4ae Parents: 0ed4fad Author: Aman Sinha <asi...@maprtech.com> Authored: Tue Oct 28 10:37:43 2014 -0700 Committer: Aman Sinha <asi...@maprtech.com> Committed: Thu Oct 30 17:02:20 2014 -0700 ---------------------------------------------------------------------- .../templates/VariableLengthVectors.java | 8 ++++-- .../org/apache/drill/exec/memory/Accountor.java | 11 +++----- .../exec/physical/impl/join/HashJoinBatch.java | 2 +- .../partitionsender/PartitionerTemplate.java | 28 +++++++++++++++----- .../work/batch/UnlimitedRawBatchBuffer.java | 6 ++++- 5 files changed, 37 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index fe255a8..e1a754e 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -50,6 +50,9 @@ package org.apache.drill.exec.vector; public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector{ static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class); + private static final int INITIAL_BYTE_COUNT = 32768; + private static final int MIN_BYTE_COUNT = 4096; + private final UInt${type.width}Vector offsetVector; private final Accessor accessor; private final Mutator mutator; @@ -57,7 +60,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V private final UInt${type.width}Vector.Accessor oAccessor; - private int allocationTotalByteCount = 32768; + private int allocationTotalByteCount = INITIAL_BYTE_COUNT; private int allocationMonitor = 0; public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) { @@ -252,12 +255,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public boolean allocateNewSafe() { clear(); if (allocationMonitor > 10) { - allocationTotalByteCount = Math.max(8, (int) (allocationTotalByteCount / 2)); + allocationTotalByteCount = Math.max(MIN_BYTE_COUNT, (int) (allocationTotalByteCount / 2)); allocationMonitor = 0; } else if (allocationMonitor < -2) { allocationTotalByteCount = (int) (allocationTotalByteCount * 2); allocationMonitor = 0; } + data = allocator.buffer(allocationTotalByteCount); if(data == null){ return false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index a86367f..0874585 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -17,10 +17,6 @@ */ package org.apache.drill.exec.memory; -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Maps; -import 
com.google.common.collect.Multimap; -import com.typesafe.config.ConfigException; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; @@ -35,12 +31,11 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.util.AssertionUtil; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ConcurrentMap; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; public class Accountor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountor.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 238c992..7f4d03c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -514,8 +514,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { hashTable.clear(); } super.cleanup(); - left.cleanup(); right.cleanup(); + left.cleanup(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 5224f75..a16e29f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SchemaBuilder; import org.apache.drill.exec.record.TypedFieldId; @@ -73,6 +74,7 @@ public abstract class PartitionerTemplate implements Partitioner { return outgoingBatches; } + @Override public final void setup(FragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, @@ -122,6 +124,7 @@ public abstract class PartitionerTemplate implements Partitioner { * @param isLastBatch true if this is the last incoming batch * @param schemaChanged true if the schema has changed */ + @Override public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException { for (OutgoingRecordBatch batch : outgoingBatches) { logger.debug("Attempting to flush all outgoing batches"); @@ -149,8 +152,9 @@ public abstract class PartitionerTemplate implements Partitioner { logger.trace(REWRITE_MSG, recordId); outgoingBatch.flush(); if (!outgoingBatch.copy(recordId)) { - logger.debug(RECORD_TOO_BIG_MSG, recordId); - throw new IOException(RECORD_TOO_BIG_MSG); + String msg = composeTooBigMsg(recordId, incoming); + logger.debug(msg); + throw new IOException(msg); } } } @@ -164,8 +168,9 @@ public 
abstract class PartitionerTemplate implements Partitioner { logger.trace(REWRITE_MSG, svIndex); outgoingBatch.flush(); if (!outgoingBatch.copy(svIndex)) { - logger.debug(RECORD_TOO_BIG_MSG, recordId); - throw new IOException(RECORD_TOO_BIG_MSG); + String msg = composeTooBigMsg(recordId, incoming); + logger.debug(msg); + throw new IOException(msg); } } } @@ -179,8 +184,9 @@ public abstract class PartitionerTemplate implements Partitioner { logger.trace(REWRITE_MSG, svIndex); outgoingBatch.flush(); if (!outgoingBatch.copy(svIndex)) { - logger.debug(RECORD_TOO_BIG_MSG, recordId); - throw new IOException(RECORD_TOO_BIG_MSG); + String msg = composeTooBigMsg(recordId, incoming); + logger.debug(msg); + throw new IOException(msg); } } } @@ -198,6 +204,16 @@ public abstract class PartitionerTemplate implements Partitioner { } } + private String composeTooBigMsg(int recordId, RecordBatch incoming) { + String msg = String.format("Record " + recordId + " is too big to fit into the allocated memory of ValueVector."); + msg += " Schema: "; + for (int i = 0; i < incoming.getSchema().getFieldCount(); i++) { + MaterializedField f = incoming.getSchema().getColumn(i); + msg += f.getPath().getRootSegment().getPath() + " "; + } + return msg; + } + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; public abstract int doEval(@Named("inIndex") int inIndex); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index bb43b1e..ffa4e2c 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -38,6 +38,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ private final AtomicBoolean outOfMemory = new AtomicBoolean(false); private final ResponseSenderQueue readController = new ResponseSenderQueue(); private int streamCounter; + private int fragmentCount; private FragmentContext context; public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) { @@ -46,6 +47,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ this.softlimit = bufferSizePerSocket * fragmentCount; this.startlimit = Math.max(softlimit/2, 1); this.buffer = Queues.newLinkedBlockingDeque(); + this.fragmentCount = fragmentCount; this.streamCounter = fragmentCount; this.context = context; } @@ -77,7 +79,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void cleanup() { if (!finished && !context.isCancelled()) { - IllegalStateException e = new IllegalStateException("Cleanup before finished"); + String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished."); + logger.error(msg); + IllegalStateException e = new IllegalStateException(msg); context.fail(e); throw e; }