DRILL-6123: Limit batch size for Merge Join based on memory

closes #1107


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/20185c9b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/20185c9b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/20185c9b

Branch: refs/heads/master
Commit: 20185c9bf0f4c94815fd2ab1eae1b98b3d4e4ff7
Parents: 58e4cec
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Fri Feb 9 13:54:38 2018 -0800
Committer: Vitalii Diravka <vitalii.dira...@gmail.com>
Committed: Fri Feb 16 20:31:15 2018 +0000

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   4 +-
 .../impl/flatten/FlattenRecordBatch.java        |  32 +--
 .../exec/physical/impl/join/JoinStatus.java     |  12 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |  65 ++++-
 .../AbstractRecordBatchMemoryManager.java       |  61 +++++
 .../drill/exec/record/RecordBatchSizer.java     |   6 +-
 .../drill/exec/record/RecordIterator.java       |  13 +-
 .../exec/physical/unit/TestOutputBatchSize.java | 249 ++++++++++++++++++-
 .../drill/exec/record/TestRecordIterator.java   |   4 +-
 .../exec/vector/complex/AbstractMapVector.java  |   4 +
 .../vector/complex/BaseRepeatedValueVector.java |   3 +
 .../drill/exec/vector/complex/ListVector.java   |   4 +
 .../exec/vector/complex/RepeatedListVector.java |   3 +
 13 files changed, 425 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f3572d8..a1a94fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -79,7 +79,9 @@ public final class ExecConstants {
   public static final String SPILL_DIRS = "drill.exec.spill.directories";
 
   public static final String OUTPUT_BATCH_SIZE = 
"drill.exec.memory.operator.output_batch_size";
-  public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new 
RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024);
+  // Output Batch Size in Bytes. We have a small lower bound so we can test
+  // with unit tests without the need to produce very large batches that take up a lot of memory.
+  public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new 
RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024);
 
   // External Sort Boot configuration
 

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 5f693cb..4a910ef 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
@@ -70,7 +71,8 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
-  private long outputBatchSize;
+  private int outputBatchSize;
+  private final FlattenMemoryManager flattenMemoryManager = new 
FlattenMemoryManager();
 
   private final Flattener.Monitor monitor = new Flattener.Monitor() {
     @Override
@@ -97,23 +99,19 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     }
   }
 
-  private class FlattenMemoryManager {
-    private final int outputRowCount;
-    private static final int OFFSET_VECTOR_WIDTH = 4;
-    private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
-    private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
-    private static final int MIN_NUM_ROWS = 1;
+  private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager {
 
-    private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, 
SchemaPath flattenColumn) {
+    @Override
+    public void update() {
       // Get sizing information for the batch.
       RecordBatchSizer sizer = new RecordBatchSizer(incoming);
 
-      final TypedFieldId typedFieldId = 
incoming.getValueVectorId(flattenColumn);
+      final TypedFieldId typedFieldId = 
incoming.getValueVectorId(popConfig.getColumn());
       final MaterializedField field = 
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
 
       // Get column size of flatten column.
       RecordBatchSizer.ColumnSize columnSize = 
RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
-          typedFieldId.getFieldIds()).getValueVector(), field.getName());
+        typedFieldId.getFieldIds()).getValueVector(), field.getName());
 
       // Average rowWidth of flatten column
       final int avgRowWidthFlattenColumn = 
RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());
@@ -124,22 +122,18 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
       // Average rowWidth of single element in the flatten list.
       // subtract the offset vector size from column data size.
       final int avgRowWidthSingleFlattenEntry =
-          RecordBatchSizer.safeDivide(columnSize.netSize - 
(OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
+        RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH 
* columnSize.valueCount), columnSize.elementCount);
 
       // Average rowWidth of outgoing batch.
       final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + 
avgRowWidthSingleFlattenEntry;
 
       // Number of rows in outgoing batch
-      outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
-          
RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), 
avgOutgoingRowWidth)));
+      setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
       logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
-              "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, 
outputBatchSize, avgOutgoingRowWidth, outputRowCount);
+        "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, 
outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
     }
 
-    public int getOutputRowCount() {
-      return outputRowCount;
-    }
   }
 
 
@@ -147,7 +141,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     super(pop, context, incoming);
 
     // get the output batch size from config.
-    outputBatchSize = 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    outputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
   }
 
   @Override
@@ -200,7 +194,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
 
   @Override
   protected IterOutcome doWork() {
-    FlattenMemoryManager flattenMemoryManager = new 
FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn());
+    flattenMemoryManager.update();
     flattener.setOutputCount(flattenMemoryManager.getOutputRowCount());
 
     int incomingRecordCount = incoming.getRecordCount();

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 527c984..beae021 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -29,8 +29,6 @@ import org.apache.calcite.rel.core.JoinRelType;
 public final class JoinStatus {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
 
-  private static final int OUTPUT_BATCH_SIZE = 32*1024;
-
   public final RecordIterator left;
   public final RecordIterator right;
   private boolean iteratorInitialized;
@@ -44,6 +42,8 @@ public final class JoinStatus {
   public boolean ok = true;
   public boolean hasMoreData = false;
 
+  private int targetOutputRowCount;
+
   public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch 
output) {
     this.left = left;
     this.right = right;
@@ -101,8 +101,12 @@ public final class JoinStatus {
   }
 
   public final boolean isOutgoingBatchFull() {
-    Preconditions.checkArgument(outputPosition <= OUTPUT_BATCH_SIZE);
-    return outputPosition == OUTPUT_BATCH_SIZE;
+    Preconditions.checkArgument(outputPosition <= targetOutputRowCount);
+    return outputPosition >= targetOutputRowCount;
+  }
+
+  public final void setTargetOutputRowCount(int outputRowCount) {
+    this.targetOutputRowCount = outputRowCount;
   }
 
   public final void incOutputPos() {

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1ed4722..f612ae2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -45,6 +46,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.common.Comparator;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -54,6 +56,7 @@ import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -102,20 +105,75 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   private final List<Comparator> comparators;
   private final JoinRelType joinType;
   private JoinWorker worker;
+  private final int outputBatchSize;
 
   private static final String LEFT_INPUT = "LEFT INPUT";
   private static final String RIGHT_INPUT = "RIGHT INPUT";
 
+  private class MergeJoinMemoryManager extends 
AbstractRecordBatchMemoryManager {
+    private int leftRowWidth;
+    private int rightRowWidth;
+
+    /**
+     * Merge join operates on one record at a time from the left and right batches
+     * using the RecordIterator abstraction. We have a callback mechanism to get notified
+     * when a new batch is loaded in the record iterator.
+     * This can get called in the middle of the current output batch we are building.
+     * When this gets called, adjust the number of output rows for the current batch and
+     * update the value to be used for subsequent batches.
+     */
+    @Override
+    public void update(int inputIndex) {
+      switch(inputIndex) {
+        case 0:
+          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
+          leftRowWidth = leftSizer.netRowWidth();
+          break;
+        case 1:
+          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
+          rightRowWidth = rightSizer.netRowWidth();
+        default:
+          break;
+      }
+
+      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+
+      // If outgoing row width is 0, just return. This is possible for empty 
batches or
+      // when first set of batches come with OK_NEW_SCHEMA and no data.
+      if (newOutgoingRowWidth == 0) {
+        return;
+      }
+
+      // update the value to be used for next batch(es)
+      setOutputRowCount(outputBatchSize, newOutgoingRowWidth);
+
+      // Adjust for the current batch.
+      // calculate memory used so far based on previous outgoing row width and 
how many rows we already processed.
+      final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
+      // This is the remaining memory.
+      final long remainingMemory = 
Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0);
+      // These are number of rows we can fit in remaining memory based on new 
outgoing row width.
+      final int numOutputRowsRemaining = 
RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+
+      status.setTargetOutputRowCount(status.getOutPosition() + 
numOutputRowsRemaining);
+      setOutgoingRowWidth(newOutgoingRowWidth);
+    }
+  }
+
+  private final MergeJoinMemoryManager mergeJoinMemoryManager = new 
MergeJoinMemoryManager();
+
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, 
RecordBatch left, RecordBatch right) throws OutOfMemoryException {
     super(popConfig, context, true);
 
+    outputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not 
support cartesian join.  This join operator was configured with 0 conditions");
     }
     this.left = left;
-    this.leftIterator = new RecordIterator(left, this, oContext, 0, false);
+    this.leftIterator = new RecordIterator(left, this, oContext, 0, false, 
mergeJoinMemoryManager);
     this.right = right;
-    this.rightIterator = new RecordIterator(right, this, oContext, 1);
+    this.rightIterator = new RecordIterator(right, this, oContext, 1, 
mergeJoinMemoryManager);
     this.joinType = popConfig.getJoinType();
     this.status = new JoinStatus(leftIterator, rightIterator, this);
     this.conditions = popConfig.getConditions();
@@ -171,10 +229,12 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
         case BATCH_RETURNED:
           allocateBatch(false);
           status.resetOutputPos();
+          
status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount());
           break;
         case SCHEMA_CHANGED:
           allocateBatch(true);
           status.resetOutputPos();
+          
status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount());
           break;
         case NO_MORE_DATA:
           status.resetOutputPos();
@@ -272,6 +332,7 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   private JoinWorker generateNewWorker() throws ClassTransformationException, 
IOException, SchemaChangeException {
     final ClassGenerator<JoinWorker> cg = 
CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
+    // cg.getCodeGenerator().saveCodeForDebugging(true);
     final ErrorCollector collector = new ErrorCollectorImpl();
 
     // Generate members and initialization code

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
new file mode 100644
index 0000000..b91ede0
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractRecordBatchMemoryManager {
+  protected static final int OFFSET_VECTOR_WIDTH = 4;
+  protected static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
+  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+  protected static final int MIN_NUM_ROWS = 1;
+  private int outputRowCount = MAX_NUM_ROWS;
+  private int outgoingRowWidth;
+
+  public void update(int inputIndex) {};
+
+  public void update() {};
+
+  public int getOutputRowCount() {
+    return outputRowCount;
+  }
+
+  /**
+   * Given the target batch size and row width, this sets the output row count,
+   * taking into account the allowed minimum and maximum.
+   */
+  public void setOutputRowCount(int targetBatchSize, int rowWidth) {
+    this.outputRowCount = 
adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize/WORST_CASE_FRAGMENTATION_FACTOR,
 rowWidth));
+  }
+
+  /**
+   * Adjusts the given row count so that it falls within the allowed minimum and maximum.
+   */
+  public static int adjustOutputRowCount(int rowCount) {
+    return (Math.min(MAX_NUM_ROWS, Math.max(rowCount, MIN_NUM_ROWS)));
+  }
+
+  public void setOutgoingRowWidth(int outgoingRowWidth) {
+    this.outgoingRowWidth = outgoingRowWidth;
+  }
+
+  public int getOutgoingRowWidth() {
+    return outgoingRowWidth;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index f5c77ce..536c8bc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -153,7 +153,7 @@ public class RecordBatchSizer {
       // that changes the value count of the contained vectors.
 
       UInt4Vector offsetVector = ((RepeatedValueVector) v).getOffsetVector();
-      int childCount = offsetVector.getAccessor().get(valueCount);
+      int childCount = valueCount == 0 ? 0 : 
offsetVector.getAccessor().get(valueCount);
       if (metadata.getType().getMinorType() == MinorType.MAP) {
 
         // For map, the only data associated with the map vector
@@ -305,8 +305,8 @@ public class RecordBatchSizer {
 
   public RecordBatchSizer(RecordBatch batch) {
     this(batch,
-         (batch.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) ?
-         batch.getSelectionVector2() : null);
+      (batch.getSchema() == null ? null : 
(batch.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE ?
+        batch.getSelectionVector2() : null)));
   }
   /**
    * Create empirical metadata for a record batch given a vector accessible

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 01acd7f..32c69ce 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -57,18 +57,21 @@ public class RecordIterator implements VectorAccessible {
   private final VectorContainer container; // Holds VectorContainer of current 
record batch
   private final TreeRangeMap<Long, RecordBatchData> batches = 
TreeRangeMap.create();
 
+  private final AbstractRecordBatchMemoryManager newBatchCallBack;
+
   public RecordIterator(RecordBatch incoming,
                         AbstractRecordBatch<?> outgoing,
                         OperatorContext oContext,
-                        int inputIndex) {
-    this(incoming, outgoing, oContext, inputIndex, true);
+                        int inputIndex, AbstractRecordBatchMemoryManager 
callBack) {
+    this(incoming, outgoing, oContext, inputIndex, true, callBack);
   }
 
   public RecordIterator(RecordBatch incoming,
                         AbstractRecordBatch<?> outgoing,
                         OperatorContext oContext,
                         int inputIndex,
-                        boolean enableMarkAndReset) {
+                        boolean enableMarkAndReset,
+                        AbstractRecordBatchMemoryManager callBack) {
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.inputIndex = inputIndex;
@@ -78,6 +81,7 @@ public class RecordIterator implements VectorAccessible {
     resetIndices();
     this.initialized = false;
     this.enableMarkAndReset = enableMarkAndReset;
+    this.newBatchCallBack = callBack;
   }
 
   private void resetIndices() {
@@ -97,6 +101,9 @@ public class RecordIterator implements VectorAccessible {
       return;
     }
     lastOutcome = outgoing != null ? outgoing.next(inputIndex, incoming) : 
incoming.next();
+    if ((lastOutcome == IterOutcome.OK || lastOutcome == 
IterOutcome.OK_NEW_SCHEMA) && newBatchCallBack != null) {
+      newBatchCallBack.update(inputIndex);
+    }
   }
 
   public void mark() {

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 9a4633d..99af4c2 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -19,12 +19,14 @@
 package org.apache.drill.exec.physical.unit;
 
 import com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordBatch;
@@ -879,4 +881,249 @@ public class TestOutputBatchSize extends 
PhysicalOpUnitTestBase {
 
     opTestBuilder.go();
   }
-}
+
+  @Test
+  public void testMergeJoinMultipleOutputBatches() throws Exception {
+    MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(mergeJoin, initReservation, maxAllocation);
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in merge join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(mergeJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testMergeJoinSingleOutputBatch() throws Exception {
+    MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(mergeJoin, initReservation, maxAllocation);
+
+    // create multiple batches from both sides.
+    numRows = 4096 * 2;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to twice of total size expected.
+    // We should get 1 batch.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize*2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(mergeJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(1)  // verify number of batches
+      .expectedBatchSize(totalSize) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testMergeJoinUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT);
+    mockOpContext(mergeJoin, initReservation, maxAllocation);
+
+    numRows = 100000;
+
+    // create left input rows like this.
+    // "a1" : 5,  "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5,  "c1" : 1, "a2":6,  "c2": 1
+    // "a1" : 5,  "c1" : 2, "a2":6,  "c2": 2
+    // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
+
+    // expect two batches, batch limited by 65535 records
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(mergeJoin)
+      .baselineColumns("a1", "c1", "a2", "c2")
+      .expectedNumBatches(2)  // verify number of batches
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, i, 6l, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testMergeJoinLowerLimit() throws Exception {
+    // test the lower limit of at least one batch
+    MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT);
+    mockOpContext(mergeJoin, initReservation, maxAllocation);
+
+    numRows = 10;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+
+    // set very low value of output batch size so we can do only one row per 
batch.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 128);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(mergeJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(10)  // verify number of batches
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
index e88bb41..7e13dad 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -77,7 +77,7 @@ public class TestRecordIterator extends PopUnitTestBase {
     OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), 
UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
       OperatorUtilities.getChildCount(dummyPop));
     OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, 
exec.getContext().getAllocator());
-    RecordIterator iter = new RecordIterator(singleBatch, null, 
exec.getContext().newOperatorContext(dummyPop, stats), 0, false);
+    RecordIterator iter = new RecordIterator(singleBatch, null, 
exec.getContext().newOperatorContext(dummyPop, stats), 0, false, null);
     int totalRecords = 0;
     List<ValueVector> vectors = null;
 
@@ -133,7 +133,7 @@ public class TestRecordIterator extends PopUnitTestBase {
     OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), 
UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
         OperatorUtilities.getChildCount(dummyPop));
     OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, 
exec.getContext().getAllocator());
-    RecordIterator iter = new RecordIterator(singleBatch, null, 
exec.getContext().newOperatorContext(dummyPop, stats), 0);
+    RecordIterator iter = new RecordIterator(singleBatch, null, 
exec.getContext().newOperatorContext(dummyPop, stats), 0, null);
     List<ValueVector> vectors = null;
     // batch sizes
     // 1, 100, 10, 10000, 1, 1000

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 5515b7a..3682397 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -297,6 +297,10 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
 
   @Override
   public int getPayloadByteCount(int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
+
     int count = 0;
 
     for (final ValueVector v : vectors.values()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index 4b0c1b5..02243c8 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -220,6 +220,9 @@ public abstract class BaseRepeatedValueVector extends 
BaseValueVector implements
 
   @Override
   public int getPayloadByteCount(int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
     int entryCount = offsets.getAccessor().get(valueCount);
     return offsets.getPayloadByteCount(valueCount) + 
vector.getPayloadByteCount(entryCount);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
index 7de5ce6..45d9160 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -333,6 +333,10 @@ public class ListVector extends BaseRepeatedValueVector {
 
   @Override
   public int getPayloadByteCount(int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
+
     return offsets.getPayloadByteCount(valueCount) + 
bits.getPayloadByteCount(valueCount) +
            super.getPayloadByteCount(valueCount);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 6442417..4a7eda1 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -442,6 +442,9 @@ public class RepeatedListVector extends 
AbstractContainerVector
 
   @Override
   public int getPayloadByteCount(int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
     return delegate.getPayloadByteCount(valueCount);
   }
 

Reply via email to