Repository: incubator-drill
Updated Branches:
  refs/heads/master 0ed4fadfe -> c96773474


DRILL-1601: Have a minimum allocation for variable length value vectors.
Improve error messages in PartitionerTemplate and UnlimitedRawBatchBuffer.
Minor change in cleanup for HashJoin.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c9677347
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c9677347
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c9677347

Branch: refs/heads/master
Commit: c967734746dbf34535f9e360d82b66f60db1f4ae
Parents: 0ed4fad
Author: Aman Sinha <asi...@maprtech.com>
Authored: Tue Oct 28 10:37:43 2014 -0700
Committer: Aman Sinha <asi...@maprtech.com>
Committed: Thu Oct 30 17:02:20 2014 -0700

----------------------------------------------------------------------
 .../templates/VariableLengthVectors.java        |  8 ++++--
 .../org/apache/drill/exec/memory/Accountor.java | 11 +++-----
 .../exec/physical/impl/join/HashJoinBatch.java  |  2 +-
 .../partitionsender/PartitionerTemplate.java    | 28 +++++++++++++++-----
 .../work/batch/UnlimitedRawBatchBuffer.java     |  6 ++++-
 5 files changed, 37 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java 
b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index fe255a8..e1a754e 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -50,6 +50,9 @@ package org.apache.drill.exec.vector;
 public final class ${minor.class}Vector extends BaseDataValueVector implements 
VariableWidthVector{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
 
+  private static final int INITIAL_BYTE_COUNT = 32768;
+  private static final int MIN_BYTE_COUNT = 4096;
+  
   private final UInt${type.width}Vector offsetVector;
   private final Accessor accessor;
   private final Mutator mutator;
@@ -57,7 +60,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
   private final UInt${type.width}Vector.Accessor oAccessor;
   
 
-  private int allocationTotalByteCount = 32768;
+  private int allocationTotalByteCount = INITIAL_BYTE_COUNT;
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator 
allocator) {
@@ -252,12 +255,13 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
   public boolean allocateNewSafe() {
     clear();
     if (allocationMonitor > 10) {
-      allocationTotalByteCount = Math.max(8, (int) (allocationTotalByteCount / 
2));
+      allocationTotalByteCount = Math.max(MIN_BYTE_COUNT, (int) 
(allocationTotalByteCount / 2));
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
       allocationTotalByteCount = (int) (allocationTotalByteCount * 2);
       allocationMonitor = 0;
     }
+
     data = allocator.buffer(allocationTotalByteCount);
     if(data == null){
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index a86367f..0874585 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -17,10 +17,6 @@
  */
 package org.apache.drill.exec.memory;
 
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.typesafe.config.ConfigException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
@@ -35,12 +31,11 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.util.AssertionUtil;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentMap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 public class Accountor {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Accountor.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 238c992..7f4d03c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -514,8 +514,8 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
             hashTable.clear();
         }
         super.cleanup();
-        left.cleanup();
         right.cleanup();
+        left.cleanup();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 5224f75..a16e29f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -73,6 +74,7 @@ public abstract class PartitionerTemplate implements 
Partitioner {
     return outgoingBatches;
   }
 
+  @Override
   public final void setup(FragmentContext context,
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
@@ -122,6 +124,7 @@ public abstract class PartitionerTemplate implements 
Partitioner {
    * @param isLastBatch    true if this is the last incoming batch
    * @param schemaChanged  true if the schema has changed
    */
+  @Override
   public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) 
throws IOException {
     for (OutgoingRecordBatch batch : outgoingBatches) {
       logger.debug("Attempting to flush all outgoing batches");
@@ -149,8 +152,9 @@ public abstract class PartitionerTemplate implements 
Partitioner {
             logger.trace(REWRITE_MSG, recordId);
             outgoingBatch.flush();
             if (!outgoingBatch.copy(recordId)) {
-              logger.debug(RECORD_TOO_BIG_MSG, recordId);
-              throw new IOException(RECORD_TOO_BIG_MSG);
+              String msg = composeTooBigMsg(recordId, incoming);
+              logger.debug(msg);
+              throw new IOException(msg);
             }
           }
         }
@@ -164,8 +168,9 @@ public abstract class PartitionerTemplate implements 
Partitioner {
             logger.trace(REWRITE_MSG, svIndex);
             outgoingBatch.flush();
             if (!outgoingBatch.copy(svIndex)) {
-              logger.debug(RECORD_TOO_BIG_MSG, recordId);
-              throw new IOException(RECORD_TOO_BIG_MSG);
+              String msg = composeTooBigMsg(recordId, incoming);
+              logger.debug(msg);
+              throw new IOException(msg);
             }
           }
         }
@@ -179,8 +184,9 @@ public abstract class PartitionerTemplate implements 
Partitioner {
             logger.trace(REWRITE_MSG, svIndex);
             outgoingBatch.flush();
             if (!outgoingBatch.copy(svIndex)) {
-              logger.debug(RECORD_TOO_BIG_MSG, recordId);
-              throw new IOException(RECORD_TOO_BIG_MSG);
+              String msg = composeTooBigMsg(recordId, incoming);
+              logger.debug(msg);
+              throw new IOException(msg);
             }
           }
         }
@@ -198,6 +204,16 @@ public abstract class PartitionerTemplate implements 
Partitioner {
     }
   }
 
+  private String composeTooBigMsg(int recordId, RecordBatch incoming) {
+    String msg = String.format("Record " + recordId + " is too big to fit into 
the allocated memory of ValueVector.");
+    msg += " Schema: ";
+    for (int i = 0; i < incoming.getSchema().getFieldCount(); i++) {
+      MaterializedField f = incoming.getSchema().getColumn(i);
+      msg += f.getPath().getRootSegment().getPath() + " ";
+    }
+    return msg;
+  }
+
   public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") 
OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
   public abstract int doEval(@Named("inIndex") int inIndex);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c9677347/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bb43b1e..ffa4e2c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -38,6 +38,7 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
   private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ResponseSenderQueue readController = new ResponseSenderQueue();
   private int streamCounter;
+  private int fragmentCount;
   private FragmentContext context;
 
   public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
@@ -46,6 +47,7 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
     this.softlimit = bufferSizePerSocket * fragmentCount;
     this.startlimit = Math.max(softlimit/2, 1);
     this.buffer = Queues.newLinkedBlockingDeque();
+    this.fragmentCount = fragmentCount;
     this.streamCounter = fragmentCount;
     this.context = context;
   }
@@ -77,7 +79,9 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
   @Override
   public void cleanup() {
     if (!finished && !context.isCancelled()) {
-      IllegalStateException e = new IllegalStateException("Cleanup before 
finished");
+      String msg = String.format("Cleanup before finished. " + (fragmentCount 
- streamCounter) + " out of " + fragmentCount + " streams have finished.");
+      logger.error(msg);
+      IllegalStateException e = new IllegalStateException(msg);
       context.fail(e);
       throw e;
     }

Reply via email to