DRILL-6323: Lateral Join - Refactor BatchMemorySize to put outputBatchSize in 
abstract class. Created a new JoinBatchMemoryManager to be shared across join 
record batches. Changed merge join to use AbstractBinaryRecordBatch instead of 
AbstractRecordBatch, and use JoinBatchMemoryManager


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7f19b500
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7f19b500
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7f19b500

Branch: refs/heads/master
Commit: 7f19b500facffe52bf5611d0ae58bfd4e4d0336f
Parents: 4b6f10e
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Wed Mar 14 16:59:29 2018 -0700
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:16:09 2018 -0700

----------------------------------------------------------------------
 .../drill/exec/ops/OperatorMetricRegistry.java  |   6 +-
 .../impl/flatten/FlattenRecordBatch.java        |  14 +-
 .../exec/physical/impl/join/MergeJoinBatch.java | 184 ++++---------------
 .../exec/record/AbstractBinaryRecordBatch.java  |  88 +++++++--
 .../drill/exec/record/AbstractRecordBatch.java  |   1 -
 .../exec/record/JoinBatchMemoryManager.java     | 124 +++++++++++++
 .../exec/record/RecordBatchMemoryManager.java   |  25 ++-
 .../drill/exec/record/RecordIterator.java       |   2 +-
 8 files changed, 260 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index c703071..d9a5fdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -23,13 +23,13 @@ import 
org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import 
org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
 import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
-import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import 
org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
 import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
-import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 
 /**
  * Registry of operator metrics.
@@ -52,7 +52,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.EXTERNAL_SORT_VALUE, 
ExternalSortBatch.Metric.class);
     register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, 
ParquetRecordReader.Metric.class);
     register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
-    register(CoreOperatorType.MERGE_JOIN_VALUE, MergeJoinBatch.Metric.class);
+    register(CoreOperatorType.MERGE_JOIN_VALUE, 
JoinBatchMemoryManager.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends 
MetricDef> metricDef) {

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index aea415b..bbe9f76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -72,8 +72,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
-  private int outputBatchSize;
-  private final FlattenMemoryManager flattenMemoryManager = new 
FlattenMemoryManager();
+  private final FlattenMemoryManager flattenMemoryManager;
 
   private final Flattener.Monitor monitor = new Flattener.Monitor() {
     @Override
@@ -118,6 +117,10 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
 
   private class FlattenMemoryManager extends RecordBatchMemoryManager {
 
+    FlattenMemoryManager(int outputBatchSize) {
+      super(outputBatchSize);
+    }
+
     @Override
     public void update() {
       // Get sizing information for the batch.
@@ -138,11 +141,13 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
       // Average rowWidth of single element in the flatten list.
       // subtract the offset vector size from column data size.
       final int avgRowWidthSingleFlattenEntry =
-        RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - 
(OFFSET_VECTOR_WIDTH * columnSize.getValueCount()), 
columnSize.getElementCount());
+        RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - 
(getOffsetVectorWidth() * columnSize.getValueCount()),
+          columnSize.getElementCount());
 
       // Average rowWidth of outgoing batch.
       final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + 
avgRowWidthSingleFlattenEntry;
 
+      final int outputBatchSize = getOutputBatchSize();
       // Number of rows in outgoing batch
       setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
@@ -165,7 +170,8 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     super(pop, context, incoming);
 
     // get the output batch size from config.
-    outputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index ab50b22..ffcbae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -17,12 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
-
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.sun.codemodel.JClass;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JMod;
+import com.sun.codemodel.JVar;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -44,34 +45,31 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.common.Comparator;
-import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordIterator;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.AbstractRecordBatch;
-import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
-import com.google.common.base.Preconditions;
-import com.sun.codemodel.JClass;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JMod;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
 
 /**
  * A join operator merges two sorted streams using record iterator.
  */
-public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
+public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
 
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
 
@@ -96,8 +94,6 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
       GM("doSetup", "doSetup", null, null),
       GM("doSetup", "doCompare", null, null));
 
-  private final RecordBatch left;
-  private final RecordBatch right;
   private final RecordIterator leftIterator;
   private final RecordIterator rightIterator;
   private final JoinStatus status;
@@ -105,41 +101,14 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   private final List<Comparator> comparators;
   private final JoinRelType joinType;
   private JoinWorker worker;
-  private final int outputBatchSize;
 
   private static final String LEFT_INPUT = "LEFT INPUT";
   private static final String RIGHT_INPUT = "RIGHT INPUT";
 
-  private static final int numInputs = 2;
-  private static final int LEFT_INDEX = 0;
-  private static final int RIGHT_INDEX = 1;
-
-  public enum Metric implements MetricDef {
-    LEFT_INPUT_BATCH_COUNT,
-    LEFT_AVG_INPUT_BATCH_BYTES,
-    LEFT_AVG_INPUT_ROW_BYTES,
-    LEFT_INPUT_RECORD_COUNT,
-    RIGHT_INPUT_BATCH_COUNT,
-    RIGHT_AVG_INPUT_BATCH_BYTES,
-    RIGHT_AVG_INPUT_ROW_BYTES,
-    RIGHT_INPUT_RECORD_COUNT,
-    OUTPUT_BATCH_COUNT,
-    AVG_OUTPUT_BATCH_BYTES,
-    AVG_OUTPUT_ROW_BYTES,
-    OUTPUT_RECORD_COUNT;
+  private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
 
-    @Override
-    public int metricId() {
-      return ordinal();
-    }
-  }
-
-  private class MergeJoinMemoryManager extends RecordBatchMemoryManager {
-    private int leftRowWidth;
-    private int rightRowWidth;
-
-    public MergeJoinMemoryManager() {
-      super(numInputs);
+    MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, 
RecordBatch rightBatch) {
+      super(outputBatchSize, leftBatch, rightBatch);
     }
 
     /**
@@ -152,73 +121,22 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
      */
     @Override
     public void update(int inputIndex) {
-      switch(inputIndex) {
-        case LEFT_INDEX:
-          setRecordBatchSizer(inputIndex, new RecordBatchSizer(left));
-          leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
-          logger.debug("left incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
-          break;
-        case RIGHT_INDEX:
-          setRecordBatchSizer(inputIndex, new RecordBatchSizer(right));
-          rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
-          logger.debug("right incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
-        default:
-          break;
-      }
-
-      updateIncomingStats(inputIndex);
-      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
-
-      // If outgoing row width is 0, just return. This is possible for empty 
batches or
-      // when first set of batches come with OK_NEW_SCHEMA and no data.
-      if (newOutgoingRowWidth == 0) {
-        return;
-      }
-
-      // update the value to be used for next batch(es)
-      setOutputRowCount(outputBatchSize, newOutgoingRowWidth);
-
-      // Adjust for the current batch.
-      // calculate memory used so far based on previous outgoing row width and 
how many rows we already processed.
-      final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
-      // This is the remaining memory.
-      final long remainingMemory = Math.max(outputBatchSize - memoryUsed, 0);
-      // These are number of rows we can fit in remaining memory based on new 
outgoing row width.
-      final int numOutputRowsRemaining = 
RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
-
-      
status.setTargetOutputRowCount(adjustOutputRowCount(status.getOutPosition() + 
numOutputRowsRemaining));
-      setOutgoingRowWidth(newOutgoingRowWidth);
-
-      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
-        outputBatchSize, getOutgoingRowWidth(), getOutputRowCount());
-    }
-
-    @Override
-    public RecordBatchSizer.ColumnSize getColumnSize(String name) {
-      RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX);
-      RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);
-
-      if (leftSizer != null && leftSizer.getColumn(name) != null) {
-        return leftSizer.getColumn(name);
-      }
-      return rightSizer == null ? null : rightSizer.getColumn(name);
+      status.setTargetOutputRowCount(super.update(inputIndex, 
status.getOutPosition()));
     }
   }
 
-  private final MergeJoinMemoryManager mergeJoinMemoryManager = new 
MergeJoinMemoryManager();
-
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, 
RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context, true);
+    super(popConfig, context, true, left, right);
 
-    outputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    // Instantiate the batch memory manager
+    final int outputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, 
right);
 
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not 
support cartesian join.  This join operator was configured with 0 conditions");
     }
-    this.left = left;
-    this.leftIterator = new RecordIterator(left, this, oContext, 0, false, 
mergeJoinMemoryManager);
-    this.right = right;
-    this.rightIterator = new RecordIterator(right, this, oContext, 1, 
mergeJoinMemoryManager);
+    this.leftIterator = new RecordIterator(left, this, oContext, 0, false, 
batchMemoryManager);
+    this.rightIterator = new RecordIterator(right, this, oContext, 1, 
batchMemoryManager);
     this.joinType = popConfig.getJoinType();
     this.status = new JoinStatus(leftIterator, rightIterator, this);
     this.conditions = popConfig.getConditions();
@@ -242,21 +160,10 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   public void buildSchema() {
     // initialize iterators
     status.initialize();
-
     final IterOutcome leftOutcome = status.getLeftStatus();
     final IterOutcome rightOutcome = status.getRightStatus();
-    if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return;
-    }
-
-    if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == 
IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return;
-    }
 
-    if (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE) {
-      state = BatchState.DONE;
+    if (!verifyOutcomeToSetBatchState(leftOutcome, rightOutcome)) {
       return;
     }
 
@@ -274,12 +181,12 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
         case BATCH_RETURNED:
           allocateBatch(false);
           status.resetOutputPos();
-          
status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount());
+          
status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
           break;
         case SCHEMA_CHANGED:
           allocateBatch(true);
           status.resetOutputPos();
-          
status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount());
+          
status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
           break;
         case NO_MORE_DATA:
           status.resetOutputPos();
@@ -359,12 +266,12 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
       Preconditions.checkArgument(!vw.isHyper());
       vw.getValueVector().getMutator().setValueCount(getRecordCount());
     }
-    mergeJoinMemoryManager.updateOutgoingStats(getRecordCount());
+    batchMemoryManager.updateOutgoingStats(getRecordCount());
   }
 
   @Override
   public void close() {
-    updateStats();
+    updateBatchMemoryManagerStats();
     super.close();
     leftIterator.close();
     rightIterator.close();
@@ -542,9 +449,9 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
 
     // Allocate memory for the vectors.
     // This will iteratively allocate memory for all nested columns underneath.
-    int outputRowCount = mergeJoinMemoryManager.getOutputRowCount();
+    int outputRowCount = batchMemoryManager.getOutputRowCount();
     for (VectorWrapper w : container) {
-      RecordBatchSizer.ColumnSize colSize = 
mergeJoinMemoryManager.getColumnSize(w.getField().getName());
+      RecordBatchSizer.ColumnSize colSize = 
batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), outputRowCount);
     }
 
@@ -610,33 +517,4 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     }
     return materializedExpr;
   }
-
-  private void updateStats() {
-    stats.setLongStat(MergeJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, 
mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX));
-    stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
-    stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, 
mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
-    stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, 
mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
-    stats.setLongStat(MergeJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, 
mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
-    stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, 
mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
-    stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, 
mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
-    stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, 
mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
-    stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_BATCH_COUNT, 
mergeJoinMemoryManager.getNumOutgoingBatches());
-    stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, 
mergeJoinMemoryManager.getAvgOutputBatchSize());
-    stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, 
mergeJoinMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_RECORD_COUNT, 
mergeJoinMemoryManager.getTotalOutputRecords());
-
-    logger.debug("left input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
-      mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX), 
mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX),
-      mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX), 
mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
-    logger.debug("right input: batch count : {}, avg batch bytes : {},  avg 
row bytes : {}, record count : {}",
-      mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX), 
mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX),
-      mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX), 
mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
-      mergeJoinMemoryManager.getNumOutgoingBatches(), 
mergeJoinMemoryManager.getAvgOutputBatchSize(),
-      mergeJoinMemoryManager.getAvgOutputRowWidth(), 
mergeJoinMemoryManager.getTotalOutputRecords());
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 02b07bb..70be9b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -22,6 +22,9 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> 
extends  AbstractRecordBatch<T> {
+  private static final org.slf4j.Logger logger =
+    org.slf4j.LoggerFactory.getLogger(new Object() 
{}.getClass().getEnclosingClass());
+
   protected final RecordBatch left;
   protected final RecordBatch right;
 
@@ -31,6 +34,9 @@ public abstract class AbstractBinaryRecordBatch<T extends 
PhysicalOperator> exte
   // state (IterOutcome) of the right input
   protected IterOutcome rightUpstream = IterOutcome.NONE;
 
+  // For now only used by Lateral and Merge Join
+  protected RecordBatchMemoryManager batchMemoryManager;
+
   protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext 
context, RecordBatch left,
       RecordBatch right) throws OutOfMemoryException {
     super(popConfig, context, true, context.newOperatorContext(popConfig));
@@ -45,48 +51,94 @@ public abstract class AbstractBinaryRecordBatch<T extends 
PhysicalOperator> exte
     this.right = right;
   }
 
-  /**
-   * Prefetch first batch from both inputs.
-   * @return true if caller should continue processing
-   *         false if caller should stop and exit from processing.
-   */
-  protected boolean prefetchFirstBatchFromBothSides() {
-    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome 
as first batch
-    leftUpstream = next(0, left);
-
-    rightUpstream = next(1, right);
-
-    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) 
{
+  protected boolean verifyOutcomeToSetBatchState(IterOutcome leftOutcome, 
IterOutcome rightOutcome) {
+    if (leftOutcome == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
       state = BatchState.STOP;
       return false;
     }
 
-    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == 
IterOutcome.OUT_OF_MEMORY) {
+    if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightUpstream == 
IterOutcome.OUT_OF_MEMORY) {
       state = BatchState.OUT_OF_MEMORY;
       return false;
     }
 
-    if (checkForEarlyFinish()) {
+    if (checkForEarlyFinish(leftOutcome, rightOutcome)) {
       state = BatchState.DONE;
       return false;
     }
 
     // EMIT outcome is not expected as part of first batch from either side
-    if (leftUpstream == IterOutcome.EMIT || rightUpstream == IterOutcome.EMIT) 
{
+    if (leftOutcome == IterOutcome.EMIT || rightOutcome == IterOutcome.EMIT) {
       state = BatchState.STOP;
       throw new IllegalStateException("Unexpected IterOutcome.EMIT received 
either from left or right side in " +
         "buildSchema phase");
     }
-
     return true;
   }
 
+  /**
+   * Prefetch first batch from both inputs.
+   * @return true if caller should continue processing
+   *         false if caller should stop and exit from processing.
+   */
+  protected boolean prefetchFirstBatchFromBothSides() {
+    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome 
as first batch
+    leftUpstream = next(0, left);
+    rightUpstream = next(1, right);
+    return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
+  }
+
   /*
    * Checks for the operator specific early terminal condition.
    * @return true if the further processing can stop.
    *         false if the further processing is needed.
    */
-  protected boolean checkForEarlyFinish() {
-    return (leftUpstream == IterOutcome.NONE && rightUpstream == 
IterOutcome.NONE);
+  protected boolean checkForEarlyFinish(IterOutcome leftOutcome, IterOutcome 
rightOutcome) {
+    return (leftOutcome == IterOutcome.NONE && rightOutcome == 
IterOutcome.NONE);
+  }
+
+  protected void updateBatchMemoryManagerStats() {
+    stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_INPUT_BATCH_COUNT,
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX));
+    stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX));
+    stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX));
+    stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_INPUT_RECORD_COUNT,
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_INPUT_BATCH_COUNT,
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX));
+    
stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX));
+    stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX));
+    stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_INPUT_RECORD_COUNT,
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_BATCH_COUNT,
+      batchMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(JoinBatchMemoryManager.Metric.AVG_OUTPUT_BATCH_BYTES,
+      batchMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(JoinBatchMemoryManager.Metric.AVG_OUTPUT_ROW_BYTES,
+      batchMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_RECORD_COUNT,
+      batchMemoryManager.getTotalOutputRecords());
+
+    logger.debug("left input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("right input: batch count : {}, avg batch bytes : {},  avg 
row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 8bf1856..9d383c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -42,7 +42,6 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
   protected final OperatorContext oContext;
   protected final OperatorStats stats;
   protected final boolean unionTypeEnabled;
-
   protected BatchState state;
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext 
context) throws OutOfMemoryException {

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
new file mode 100644
index 0000000..8437000
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.ops.MetricDef;
+
+/** Memory manager for join operators: sizes the left and right incoming batches to budget output rows. */
+public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
+
+  public static final int LEFT_INDEX = 0;
+  public static final int RIGHT_INDEX = 1;
+  private static final int NUM_INPUTS = 2;
+
+  private final RecordBatch leftIncoming;
+  private final RecordBatch rightIncoming;
+  // Net row widths of the most recently sized left/right batches (0 until that side is sized).
+  private int leftRowWidth;
+  private int rightRowWidth;
+
+  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
+    super(NUM_INPUTS, outputBatchSize);
+    this.leftIncoming = leftBatch;
+    this.rightIncoming = rightBatch;
+  }
+
+  /**
+   * Re-sizes the given input and recomputes how many rows the current output batch may hold.
+   * @param inputIndex {@link #LEFT_INDEX} or {@link #RIGHT_INDEX}; other values re-size neither input
+   * @param outputPosition number of rows already processed into the current output batch
+   * @return the unchanged output row count when the combined row width is 0, otherwise the adjusted limit
+   */
+  @Override
+  public int update(int inputIndex, int outputPosition) {
+    switch (inputIndex) {
+      case LEFT_INDEX:
+        setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming));
+        leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
+        logger.debug("left incoming batch size : {}", getRecordBatchSizer(inputIndex));
+        break;
+      case RIGHT_INDEX:
+        setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming));
+        rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
+        logger.debug("right incoming batch size : {}", getRecordBatchSizer(inputIndex));
+        break;
+      default:
+        break;
+    }
+
+    updateIncomingStats(inputIndex);
+    final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+
+    // If outgoing row width is 0, just return. This is possible for empty batches or
+    // when first set of batches come with OK_NEW_SCHEMA and no data.
+    if (newOutgoingRowWidth == 0) {
+      return getOutputRowCount();
+    }
+
+    // Memory used so far by the current batch, based on the previous outgoing row width.
+    // Widen to long before multiplying: an int * int product can overflow for wide rows.
+    final long memoryUsed = (long) outputPosition * getOutgoingRowWidth();
+    final int configOutputBatchSize = getOutputBatchSize();
+    final long remainingMemory = Math.max(configOutputBatchSize - memoryUsed, 0);
+
+    // Rows that still fit in the remaining memory at the new outgoing row width.
+    final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+
+    // Update the row count used for the next batch(es), then record the new row width.
+    setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth);
+    setOutgoingRowWidth(newOutgoingRowWidth);
+
+    logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}",
+      getOutputBatchSize(), getOutgoingRowWidth(), getOutputRowCount());
+
+    return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
+  }
+
+  /** Looks the column up in the left sizer first, then the right; returns null if neither has it. */
+  @Override
+  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+    RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX);
+    RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);
+
+    if (leftSizer != null && leftSizer.getColumn(name) != null) {
+      return leftSizer.getColumn(name);
+    }
+    return rightSizer == null ? null : rightSizer.getColumn(name);
+  }
+
+  public enum Metric implements MetricDef {
+    LEFT_INPUT_BATCH_COUNT,
+    LEFT_AVG_INPUT_BATCH_BYTES,
+    LEFT_AVG_INPUT_ROW_BYTES,
+    LEFT_INPUT_RECORD_COUNT,
+    RIGHT_INPUT_BATCH_COUNT,
+    RIGHT_AVG_INPUT_BATCH_BYTES,
+    RIGHT_AVG_INPUT_ROW_BYTES,
+    RIGHT_INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    OUTPUT_RECORD_COUNT;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index c5f31a9..2100ae1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -18,15 +18,16 @@
 package org.apache.drill.exec.record;
 
 import com.google.common.base.Preconditions;
+import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class RecordBatchMemoryManager {
-  protected static final int OFFSET_VECTOR_WIDTH = 4;
   protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
   protected static final int MIN_NUM_ROWS = 1;
   protected static final int DEFAULT_INPUT_INDEX = 0;
   private int outputRowCount = MAX_NUM_ROWS;
   private int outgoingRowWidth;
+  private int outputBatchSize;
   private RecordBatchSizer[] sizer;
   private BatchStats[] inputBatchStats;
   private BatchStats outputBatchStats;
@@ -126,20 +127,28 @@ public class RecordBatchMemoryManager {
     return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getTotalRecords();
   }
 
-  public RecordBatchMemoryManager(int numInputs) {
+  public RecordBatchMemoryManager(int numInputs, int configuredOutputSize) {
     this.numInputs = numInputs;
+    this.outputBatchSize = configuredOutputSize;
     sizer = new RecordBatchSizer[numInputs];
     inputBatchStats = new BatchStats[numInputs];
     outputBatchStats = new BatchStats();
   }
 
-  public RecordBatchMemoryManager() {
+  public RecordBatchMemoryManager(int configuredOutputSize) {
+    this.outputBatchSize = configuredOutputSize;
     sizer = new RecordBatchSizer[numInputs];
     inputBatchStats = new BatchStats[numInputs];
     outputBatchStats = new BatchStats();
   }
 
-  public void update(int inputIndex) {};
+  public int update(int inputIndex, int outputPosition) {
+    // by default just return the outputRowCount
+    return getOutputRowCount();
+  }
+
+  public void update(int inputIndex) {
+  }
 
   public void update() {};
 
@@ -224,4 +233,12 @@ public class RecordBatchMemoryManager {
     outputBatchStats.incTotalRecords(outputRecords);
     outputBatchStats.incSumBatchSizes(outgoingRowWidth * outputRecords);
   }
+
+  public int getOutputBatchSize() {
+    return outputBatchSize;
+  }
+
+  public int getOffsetVectorWidth() {
+    return UInt4Vector.VALUE_WIDTH;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f19b500/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 072a5cb..c868c7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -367,4 +367,4 @@ public class RecordIterator implements VectorAccessible {
     clear();
     clearInflightBatches();
   }
-}
\ No newline at end of file
+}

Reply via email to