DRILL-326: Fixes for merge join allocations

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d3c01968
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d3c01968
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d3c01968

Branch: refs/heads/master
Commit: d3c01968ec0afd5d188f23becefaec456d59b168
Parents: e64a682
Author: Jinfeng Ni <[email protected]>
Authored: Mon Mar 17 09:01:21 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Mar 17 09:05:28 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/JoinStatus.java     |  4 +
 .../exec/physical/impl/join/JoinTemplate.java   | 15 ++--
 .../exec/physical/impl/join/MergeJoinBatch.java | 10 ++-
 .../IteratorValidatorBatchIterator.java         |  4 +
 .../apache/drill/exec/record/RecordBatch.java   |  3 +
 .../exec/physical/impl/join/TestMergeJoin.java  | 26 ++++++
 .../src/test/resources/join/join_batchsize.json | 88 ++++++++++++++++++++
 7 files changed, 142 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 762cce7..bf87c0a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -101,6 +101,10 @@ public final class JoinStatus {
     outputPosition = 0;
   }
 
+  public final void incOutputPos() {
+    outputPosition++;
+  }
+
   public final void notifyLeftRepeating() {
     leftRepeating = true;
     outputBatch.resetBatchBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 7c8a51c..f43934e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -95,8 +95,10 @@ public abstract class JoinTemplate implements JoinWorker {
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == 
JoinRelType.LEFT) {
           // we've hit the end of the right record batch; copy any remaining 
values from the left batch
           while (status.isLeftPositionAllowed()) {
-            if (!doCopyLeft(status.getLeftPosition(), 
status.fetchAndIncOutputPos()))
+            if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
               return false;
+            
+            status.incOutputPos();  
             status.advanceLeft();
           }
         }
@@ -110,10 +112,11 @@ public abstract class JoinTemplate implements JoinWorker {
 
       case -1:
         // left key < right key
-        if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == 
JoinRelType.LEFT)
-          if (!doCopyLeft(status.getLeftPosition(), 
status.fetchAndIncOutputPos())) {
+        if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == 
JoinRelType.LEFT) {
+          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) 
             return false;
-          }
+          status.incOutputPos();
+        }
         status.advanceLeft();
         continue;
 
@@ -140,9 +143,11 @@ public abstract class JoinTemplate implements JoinWorker {
           if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return false;
 
-          if (!doCopyRight(status.getRightPosition(), 
status.fetchAndIncOutputPos()))
+          if (!doCopyRight(status.getRightPosition(), 
status.getOutPosition())) 
             return false;
           
+          status.incOutputPos();
+          
           // If the left key has duplicates and we're about to cross a 
boundary in the right batch, add the
           // right table's record batch to the sv4 builder before calling 
next.  These records will need to be
           // copied again for each duplicate left key.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index bd668e7..7680ff9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -395,16 +395,20 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   private void allocateBatch() {
     // allocate new batch space.
     container.clear();
-    // add fields from both batches
+    
+    //estimation of joinBatchSize : max of left/right size, expanded by a 
factor of 16, which is then bounded by MAX_BATCH_SIZE.
+    int joinBatchSize = Math.min(Math.max(left.getRecordCount() , 
right.getRecordCount() ) * 16, MAX_BATCH_SIZE);
+    
+    // add fields from both batches    
     for (VectorWrapper<?> w : left) {
       ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), 
context.getAllocator());
-      VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
left.getRecordCount())).alloc(left.getRecordCount() * 16);
+      VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
left.getRecordCount())).alloc(joinBatchSize);
       container.add(outgoingVector);
     }
 
     for (VectorWrapper<?> w : right) {
       ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), 
context.getAllocator());
-      VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
right.getRecordCount())).alloc(right.getRecordCount() * 16);
+      VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
right.getRecordCount())).alloc(joinBatchSize);
       container.add(outgoingVector);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 8552465..379fad2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -105,6 +105,10 @@ public class IteratorValidatorBatchIterator implements 
RecordBatch{
   public IterOutcome next() {
     if(state == IterOutcome.NONE ) throw new IllegalStateException("The 
incoming iterator has previously moved to a state of NONE. You should not be 
attempting to call next() again.");
     state = incoming.next();
+    
+    if ((state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) && 
incoming.getRecordCount() > MAX_BATCH_SIZE)
+      throw new IllegalStateException (String.format("Incoming batch of %s has 
size %d, which is beyond the limit of %d",  incoming.getClass().getName(), 
incoming.getRecordCount(), MAX_BATCH_SIZE)); 
+    
     return state;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index b41b733..b77a6a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -33,6 +33,9 @@ import org.apache.drill.exec.vector.ValueVector;
  */
 public interface RecordBatch extends VectorAccessible {
 
+  /* max batch size, limited by the 2-byte length in SV2: 65535 = 2^16 - 1 */
+  public static final int MAX_BATCH_SIZE = 65535;
+
   /**
    * Describes the outcome of a RecordBatch being incremented forward.
    */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 27fae08..6e681e1 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.vector.ValueVector;
@@ -266,6 +267,31 @@ public class TestMergeJoin {
 
   }
 
+  @Test
+  public void testJoinBatchSize(@Injectable final DrillbitContext bitContext, 
@Injectable UserClientConnection connection) throws Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry();
+      bitContext.getAllocator(); result = new TopLevelAllocator();;
+      bitContext.getConfig(); result = c;
+      bitContext.getOperatorCreatorRegistry(); result = new 
OperatorCreatorRegistry(c);
+    }};
+    
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = 
reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/join_batchsize.json"),
 Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new 
FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, 
PlanFragment.getDefaultInstance(), connection, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, 
(FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    while(exec.next()){
+      assertEquals(100, exec.getRecordCount());
+    }
+    
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+
+  }
+
   @AfterClass
   public static void tearDown() throws Exception{
     // pause to get logger to catch up.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/test/resources/join/join_batchsize.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/join_batchsize.json 
b/exec/java-exec/src/test/resources/join/join_batchsize.json
new file mode 100644
index 0000000..a3be355
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/join_batchsize.json
@@ -0,0 +1,88 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 100, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    }, 
+    {
+    pop : "sort",
+    @id : 2,
+    child : 1,
+    orderings : [ {
+      order : "ASC",
+      expr : "blue"
+    } ],
+    reverse : false
+    }, 
+    {
+    pop : "selection-vector-remover",
+    @id : 3,
+    child : 2
+    },    
+    {
+      @id:4,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 2, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}      
+        ]
+    },
+    {
+    pop : "sort",
+    @id : 5,
+    child : 4,
+    orderings : [ {
+      order : "ASC",
+      expr : "blue1"
+    } ],
+    reverse : false
+    }, 
+    {
+    pop : "selection-vector-remover",
+    @id : 6,
+    child : 5
+    },       
+    {
+      @id: 7,
+      right: 6,
+      left: 3,
+      pop: "merge-join",
+      join-conditions: [ {relationship: "==", left: "blue", right: "blue1"} ]
+    },
+    {
+    pop : "limit",
+    @id : 8,
+    child : 7,
+    first : 0,
+    last : 100
+    }, {
+    pop : "selection-vector-remover",
+    @id : 9,
+    child : 8
+    }, 
+    {
+      @id: 10,
+      child: 9,
+      pop: "screen"
+    }
+  ]
+}

Reply via email to