DRILL-6323: Lateral Join - Lateral Join Batch Memory manager support using the 
record batch sizer


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/769999ef
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/769999ef
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/769999ef

Branch: refs/heads/master
Commit: 769999ef16c9d819d400c1407c5d919245b4a957
Parents: 7f19b50
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Mon Mar 19 12:00:22 2018 -0700
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:16:16 2018 -0700

----------------------------------------------------------------------
 .../drill/exec/ops/OperatorMetricRegistry.java  |   1 +
 .../physical/impl/join/LateralJoinBatch.java    | 470 +++++++++++--------
 .../impl/join/TestLateralJoinCorrectness.java   |  28 +-
 3 files changed, 284 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/769999ef/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index d9a5fdc..c1c0537 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -53,6 +53,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, 
ParquetRecordReader.Metric.class);
     register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
     register(CoreOperatorType.MERGE_JOIN_VALUE, 
JoinBatchMemoryManager.Metric.class);
+    register(CoreOperatorType.LATERAL_JOIN_VALUE, 
JoinBatchMemoryManager.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends 
MetricDef> metricDef) {

http://git-wip-us.apache.org/repos/asf/drill/blob/769999ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 70ac11b..295ee78 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -17,10 +17,12 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -28,11 +30,12 @@ import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
@@ -49,23 +52,22 @@ import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
 
-  // Maximum number records in the outgoing batch
-  // Made public for testing
-  static int MAX_BATCH_SIZE = 4096;
-
   // Input indexes to correctly update the stats
   private static final int LEFT_INPUT = 0;
 
   private static final int RIGHT_INPUT = 1;
 
+  // Maximum number of records in the outgoing batch
+  private int maxOutputRowCount;
+
   // Schema on the left side
-  private BatchSchema leftSchema = null;
+  private BatchSchema leftSchema;
 
   // Schema on the right side
-  private BatchSchema rightSchema = null;
+  private BatchSchema rightSchema;
 
   // Index in output batch to populate next row
-  private int outputIndex = 0;
+  private int outputIndex;
 
   // Current index of record in left incoming which is being processed
   private int leftJoinIndex = -1;
@@ -74,18 +76,214 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   private int rightJoinIndex = -1;
 
   // flag to keep track if current left batch needs to be processed in future 
next call
-  private boolean processLeftBatchInFuture = false;
+  private boolean processLeftBatchInFuture;
 
   // Keep track if any matching right record was found for current left index 
record
-  private boolean matchedRecordFound = false;
+  private boolean matchedRecordFound;
+
+  private boolean useMemoryManager = true;
 
-  protected LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
-                             RecordBatch left, RecordBatch right) throws 
OutOfMemoryException {
+  /* 
****************************************************************************************************************
+   * Public Methods
+   * 
****************************************************************************************************************/
+  public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
+                          RecordBatch left, RecordBatch right) throws 
OutOfMemoryException {
     super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
+    final int configOutputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, 
left, right);
+
+    // Initially it's set to the default value of 64K; later it is updated to the 
row count computed by the memory manager as new
+    // incoming batches arrive
+    maxOutputRowCount = batchMemoryManager.getOutputRowCount();
+  }
+
+  /**
+   * Method that gets the left and right incoming batches and produces the output 
batch. If the left incoming batch is
+   * empty then next is not called on the right branch and an empty batch with the 
correct outcome is returned. If a non-empty
+   * left incoming batch is received then it calls next on the right branch to 
get an incoming batch and finally produces the
+   * output.
+   * @return IterOutcome state of the lateral join batch
+   */
+  @Override
+  public IterOutcome innerNext() {
+
+    // We don't do anything special on FIRST state. Process left batch first 
and then right batch if need be
+    IterOutcome childOutcome = processLeftBatch();
+
+    // reset this state after calling processLeftBatch above.
+    processLeftBatchInFuture = false;
+
+    // If the left batch doesn't have any record in the incoming batch (with 
OK_NEW_SCHEMA/EMIT) or the state returned
+    // from left side is terminal state then just return the IterOutcome and 
don't call next() on right branch
+    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
+      container.setRecordCount(0);
+      return childOutcome;
+    }
+
+    // Left side has some records in the batch so let's process right batch
+    childOutcome = processRightBatch();
+
+    // reset the left & right outcomes to OK here and send the empty batch 
downstream
+    // Assumption being right side will always send OK_NEW_SCHEMA with empty 
batch which is what UNNEST will do
+    if (childOutcome == OK_NEW_SCHEMA) {
+      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
+      rightUpstream = OK;
+      return childOutcome;
+    }
+
+    if (isTerminalOutcome(childOutcome)) {
+      return childOutcome;
+    }
+
+    // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right 
batch, then we should setup schema in
+    // output container based on new left schema and old right schema. If 
schema change failed then return STOP
+    // downstream
+    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
+      return STOP;
+    }
+
+    // Setup the references of left, right and outgoing container in generated 
operator
+    state = BatchState.NOT_FIRST;
+
+    // Update the memory manager
+    updateMemoryManager(LEFT_INPUT);
+    updateMemoryManager(RIGHT_INPUT);
+
+    // allocate space for the outgoing batch
+    allocateVectors();
+
+    return produceOutputBatch();
+  }
+
+  @Override
+  public void close() {
+    updateBatchMemoryManagerStats();
+    super.close();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return container.getRecordCount();
   }
 
+  /**
+   * Returns the left side incoming for the Lateral Join. Used by right branch 
leaf operator of Lateral
+   * to process the records at leftJoinIndex.
+   *
+   * @return - RecordBatch received as left side incoming
+   */
+  @Override
+  public RecordBatch getIncoming() {
+    Preconditions.checkState (left != null, "Retuning null left batch. It's 
unexpected since right side will only be " +
+      "called iff there is any valid left batch");
+    return left;
+  }
+
+  /**
+   * Returns the current row index which the calling operator should process 
in current incoming left record batch.
+   * LATERAL should never return it as -1 since that indicates the current left 
batch is empty and LATERAL will never
+   * call next on the right side with an empty left batch
+   *
+   * @return - int - index of row to process.
+   */
+  @Override
+  public int getRecordIndex() {
+    Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
+      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, 
left.getRecordCount()));
+    return leftJoinIndex;
+  }
+
+  /**
+   * Returns the current {@link 
org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming 
batch
+   */
+  @Override
+  public IterOutcome getLeftOutcome() {
+    return leftUpstream;
+  }
+
+  /* 
****************************************************************************************************************
+   * Protected Methods
+   * 
****************************************************************************************************************/
+
+  /**
+   * Method to get left and right batch during build schema phase for {@link 
LateralJoinBatch}. If left batch sees a
+   * failure outcome then we don't even call next on right branch, since there 
is no left incoming.
+   * @return true if both the left and right batches were received without a failure 
outcome.
+   *         false if either batch is received with a failure outcome.
+   */
+  @Override
+  protected boolean prefetchFirstBatchFromBothSides() {
+    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome 
as first batch
+    leftUpstream = next(0, left);
+
+    boolean validBatch = setBatchState(leftUpstream);
+
+    if (validBatch) {
+      rightUpstream = next(1, right);
+      validBatch = setBatchState(rightUpstream);
+    }
+
+    // EMIT outcome is not expected as part of first batch from either side
+    if (leftUpstream == EMIT || rightUpstream == EMIT) {
+      state = BatchState.STOP;
+      throw new IllegalStateException("Unexpected IterOutcome.EMIT received 
either from left or right side in " +
+        "buildSchema phase");
+    }
+    return validBatch;
+  }
+
+  /**
+   * Prefetches a batch from the left and right branches to learn the schema of 
each side, then adds value vectors in the
+   * output container based on those schemas. For this phase LATERAL always 
expects an empty batch from the right side,
+   * which UNNEST should abide by.
+   *
+   * @throws SchemaChangeException if batch schema was changed during execution
+   */
+  @Override
+  protected void buildSchema() throws SchemaChangeException {
+    // Prefetch a RecordBatch from both left and right branch
+    if (!prefetchFirstBatchFromBothSides()) {
+      return;
+    }
+    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected 
non-empty first right batch received");
+
+    // Update the record memory manager
+    updateMemoryManager(LEFT_INPUT);
+    updateMemoryManager(RIGHT_INPUT);
+
+    // Setup output container schema based on known left and right schema
+    setupNewSchema();
+
+    // Release the vectors received from right side
+    VectorAccessibleUtilities.clear(right);
+
+    // Set join index as invalid (-1) if the left side is empty, else set it 
to 0
+    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
+    rightJoinIndex = -1;
+
+    // Reset the left side of the IterOutcome since for this call, 
OK_NEW_SCHEMA will be returned correctly
+    // by buildSchema caller and we should treat the batch as received with OK 
outcome.
+    leftUpstream = OK;
+    rightUpstream = OK;
+  }
+
+  @Override
+  protected void killIncoming(boolean sendUpstream) {
+    this.left.kill(sendUpstream);
+    // Reset the left side outcome as STOP since as part of right kill when 
UNNEST will ask IterOutcome of left incoming
+    // from LATERAL and based on that it can make decision if the kill is 
coming from downstream to LATERAL or upstream
+    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or 
upstream to LATERAL and downstream to
+    // UNNEST.
+    leftUpstream = STOP;
+    this.right.kill(sendUpstream);
+  }
+
+  /* 
****************************************************************************************************************
+   * Private Methods
+   * 
****************************************************************************************************************/
+
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
@@ -268,60 +466,6 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   }
 
   /**
-   * Method that get's left and right incoming batch and produce the output 
batch. If the left incoming batch is
-   * empty then next on right branch is not called and empty batch with 
correct outcome is returned. If non empty
-   * left incoming batch is received then it call's next on right branch to 
get an incoming and finally produces
-   * output.
-   * @return IterOutcome state of the lateral join batch
-   */
-  @Override
-  public IterOutcome innerNext() {
-
-    // We don't do anything special on FIRST state. Process left batch first 
and then right batch if need be
-    IterOutcome childOutcome = processLeftBatch();
-
-    // reset this state after calling processLeftBatch above.
-    processLeftBatchInFuture = false;
-
-    // If the left batch doesn't have any record in the incoming batch (with 
OK_NEW_SCHEMA/EMIT) or the state returned
-    // from left side is terminal state then just return the IterOutcome and 
don't call next() on right branch
-    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
-      container.setRecordCount(0);
-      return childOutcome;
-    }
-
-    // Left side has some records in the batch so let's process right batch
-    childOutcome = processRightBatch();
-
-    // reset the left & right outcomes to OK here and send the empty batch 
downstream
-    // Assumption being right side will always send OK_NEW_SCHEMA with empty 
batch which is what UNNEST will do
-    if (childOutcome == OK_NEW_SCHEMA) {
-      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
-      rightUpstream = OK;
-      return childOutcome;
-    }
-
-    if (isTerminalOutcome(childOutcome)) {
-      return childOutcome;
-    }
-
-    // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right 
batch, then we should setup schema in
-    // output container based on new left schema and old right schema. If 
schema change failed then return STOP
-    // downstream
-    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
-      return STOP;
-    }
-
-    // Setup the references of left, right and outgoing container in generated 
operator
-    state = BatchState.NOT_FIRST;
-
-    // allocate space for the outgoing batch
-    allocateVectors();
-
-    return produceOutputBatch();
-  }
-
-  /**
    * Get's the current left and right incoming batch and does the cross join 
to fill the output batch. If all the
    * records in the either or both the batches are consumed then it get's next 
batch from that branch depending upon
    * if output batch still has some space left. If output batch is full then 
the output if finalized to be sent
@@ -335,10 +479,10 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     boolean isLeftProcessed = false;
 
     // Try to fully pack the outgoing container
-    while (outputIndex < LateralJoinBatch.MAX_BATCH_SIZE) {
+    while (!isOutgoingBatchFull()) {
       final int previousOutputCount = outputIndex;
       // invoke the runtime generated method to emit records in the output 
batch for each leftJoinIndex
-      outputIndex = crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex, 
outputIndex);
+      crossJoinAndOutputRecords();
 
       // We have produced some records in outgoing container, hence there must 
be a match found for left record
       if (outputIndex > previousOutputCount) {
@@ -385,7 +529,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
       }
 
       // Check if output batch still has some space
-      if (outputIndex < MAX_BATCH_SIZE) {
+      if (!isOutgoingBatchFull()) {
         // Check if left side still has records or not
         if (isLeftProcessed) {
           // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then 
return output to downstream layer before
@@ -417,6 +561,9 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
               isLeftProcessed = true;
               break;
             }
+
+            // Update the batch memory manager to use new left incoming batch
+            updateMemoryManager(LEFT_INPUT);
           }
         }
 
@@ -434,6 +581,9 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
           finalizeOutputContainer();
           return rightUpstream;
         }
+
+        // Update the batch memory manager to use new right incoming batch
+        updateMemoryManager(RIGHT_INPUT);
       }
     } // output batch is full to its max capacity
 
@@ -468,6 +618,8 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     // Set the record count in the container
     container.setRecordCount(outputIndex);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    batchMemoryManager.updateOutgoingStats(outputIndex);
     logger.debug("Number of records emitted: " + outputIndex);
 
     // Update the output index for next output batch to zero
@@ -566,9 +718,10 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   /**
    * Simple method to allocate space for all the vectors in the container.
    */
-  private void allocateVectors() {
-    for (final VectorWrapper<?> vw : container) {
-      AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
+  private void allocateVectors() {;
+    for (VectorWrapper w : container) {
+      RecordBatchSizer.ColumnSize colSize = 
batchMemoryManager.getColumnSize(w.getField().getName());
+      colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
     }
   }
 
@@ -590,154 +743,32 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   }
 
   /**
-   * Method to get left and right batch during build schema phase for {@link 
LateralJoinBatch}. If left batch sees a
-   * failure outcome then we don't even call next on right branch, since there 
is no left incoming.
-   * @return true if both the left/right batch was received without failure 
outcome.
-   *         false if either of batch is received with failure outcome.
-   */
-  @Override
-  protected boolean prefetchFirstBatchFromBothSides() {
-    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome 
as first batch
-    leftUpstream = next(0, left);
-
-    boolean validBatch = setBatchState(leftUpstream);
-
-    if (validBatch) {
-      rightUpstream = next(1, right);
-      validBatch = setBatchState(rightUpstream);
-    }
-
-    // EMIT outcome is not expected as part of first batch from either side
-    if (leftUpstream == EMIT || rightUpstream == EMIT) {
-      state = BatchState.STOP;
-      throw new IllegalStateException("Unexpected IterOutcome.EMIT received 
either from left or right side in " +
-        "buildSchema phase");
-    }
-    return validBatch;
-  }
-
-  /**
-   * Prefetch a batch from left and right branch to know about the schema of 
each side. Then adds value vector in
-   * output container based on those schemas. For this phase LATERAL always 
expect's an empty batch from right side
-   * which UNNEST should abide by.
-   *
-   * @throws SchemaChangeException if batch schema was changed during execution
-   */
-  @Override
-  protected void buildSchema() throws SchemaChangeException {
-    // Prefetch a RecordBatch from both left and right branch
-    if (!prefetchFirstBatchFromBothSides()) {
-      return;
-    }
-    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected 
non-empty first right batch received");
-
-    // Setup output container schema based on known left and right schema
-    setupNewSchema();
-
-    // Release the vectors received from right side
-    VectorAccessibleUtilities.clear(right);
-
-    // Set join index as invalid (-1) if the left side is empty, else set it 
to 0
-    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
-    rightJoinIndex = -1;
-
-    // Reset the left side of the IterOutcome since for this call, 
OK_NEW_SCHEMA will be returned correctly
-    // by buildSchema caller and we should treat the batch as received with OK 
outcome.
-    leftUpstream = OK;
-    rightUpstream = OK;
-  }
-
-  @Override
-  public void close() {
-    super.close();
-  }
-
-  @Override
-  protected void killIncoming(boolean sendUpstream) {
-    this.left.kill(sendUpstream);
-    // Reset the left side outcome as STOP since as part of right kill when 
UNNEST will ask IterOutcome of left incoming
-    // from LATERAL and based on that it can make decision if the kill is 
coming from downstream to LATERAL or upstream
-    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or 
upstream to LATERAL and downstream to
-    // UNNEST.
-    leftUpstream = STOP;
-    this.right.kill(sendUpstream);
-  }
-
-  @Override
-  public int getRecordCount() {
-    return container.getRecordCount();
-  }
-
-  /**
-   * Returns the left side incoming for the Lateral Join. Used by right branch 
leaf operator of Lateral
-   * to process the records at leftJoinIndex.
-   *
-   * @return - RecordBatch received as left side incoming
-   */
-  @Override
-  public RecordBatch getIncoming() {
-    Preconditions.checkState (left != null, "Retuning null left batch. It's 
unexpected since right side will only be " +
-      "called iff there is any valid left batch");
-    return left;
-  }
-
-  /**
-   * Returns the current row index which the calling operator should process 
in current incoming left record batch.
-   * LATERAL should never return it as -1 since that indicated current left 
batch is empty and LATERAL will never
-   * call next on right side with empty left batch
-   *
-   * @return - int - index of row to process.
-   */
-  @Override
-  public int getRecordIndex() {
-    Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
-      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, 
left.getRecordCount()));
-    return leftJoinIndex;
-  }
-
-  /**
-   * Returns the current {@link 
org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming 
batch
-   */
-  @Override
-  public IterOutcome getLeftOutcome() {
-    return leftUpstream;
-  }
-
-  /**
    * Main entry point for producing the output records. This method populates 
the output batch after cross join of
    * the record in a given left batch at left index and all the corresponding 
right batches produced for
    * this left index. The right container is copied starting from rightIndex 
until number of records in the container.
-   *
-   * @param leftIndex - row index in left incoming batch
-   * @param rightIndex - row index in right incoming batch
-   * @param outIndex - row index in output batch
-   *
-   * @return - final row index of output batch
    */
-  private int crossJoinAndOutputRecords(final int leftIndex, final int 
rightIndex, final int outIndex) {
+  private void crossJoinAndOutputRecords() {
     logger.trace("Producing output for leftIndex: {}, rightIndex: {}, 
rightRecordCount: {} and outputIndex: {}",
-      leftIndex, rightIndex, right.getRecordCount(), outIndex);
+      leftJoinIndex, rightJoinIndex, right.getRecordCount(), outputIndex);
     final int rightRecordCount = right.getRecordCount();
-    int outBatchIndex = outIndex;
 
     // If there is no record in right batch just return current index in 
output batch
     if (rightRecordCount <= 0) {
-      return outBatchIndex;
+      return;
     }
 
     // Check if right batch is empty since we have to handle left join case
-    Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 
but index is -1");
+    Preconditions.checkState(rightJoinIndex != -1, "Right batch record count 
is >0 but index is -1");
     // For every record in right side just emit left and right records in 
output container
-    for (int i = rightIndex; i < rightRecordCount; ++i) {
-      emitLeft(leftIndex, outBatchIndex);
-      emitRight(i, outBatchIndex);
-      ++outBatchIndex;
+    for (int i = rightJoinIndex; i < rightRecordCount; ++i) {
+      emitLeft(leftJoinIndex, outputIndex);
+      emitRight(i, outputIndex);
+      ++outputIndex;
 
-      if (outBatchIndex >= LateralJoinBatch.MAX_BATCH_SIZE) {
+      if (isOutgoingBatchFull()) {
         break;
       }
     }
-    return outBatchIndex;
   }
 
   /**
@@ -794,4 +825,37 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   private void emitRight(int rightIndex, int outIndex) {
     copyDataToOutputVectors(rightIndex, outIndex, right, 0, 
rightSchema.getFieldCount(), leftSchema.getFieldCount());
   }
+
+  /**
+   * Used only for testing for cases when multiple output batches are produced 
for same input set
+   * @param outputRowCount - Max rows that output batch can hold
+   */
+  @VisibleForTesting
+  public void setMaxOutputRowCount(int outputRowCount) {
+    maxOutputRowCount = outputRowCount;
+  }
+
+  /**
+   * Used only for testing to disable output batch calculation using memory 
manager and instead use the static max
+   * value set by {@link LateralJoinBatch#setMaxOutputRowCount(int)}
+   * @param useMemoryManager - false - disable memory manager update to take 
effect, true enable memory manager update
+   */
+  @VisibleForTesting
+  public void setUseMemoryManager(boolean useMemoryManager) {
+    this.useMemoryManager = useMemoryManager;
+  }
+
+  private boolean isOutgoingBatchFull() {
+    return outputIndex >= maxOutputRowCount;
+  }
+
+  private void updateMemoryManager(int inputIndex) {
+    // If all the previous input was consumed and sent with the previous output 
batch, and we are now building
+    // a new output batch with the new incoming batch, this will not cause any problem 
since outputIndex will be 0
+    final int newOutputRowCount = batchMemoryManager.update(inputIndex, 
outputIndex);
+
+    if (useMemoryManager) {
+      maxOutputRowCount = newOutputRowCount;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/769999ef/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index b237ef7..9d93cb3 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -1311,8 +1311,9 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
     final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, 
fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
 
-    int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE;
-    LateralJoinBatch.MAX_BATCH_SIZE = 2;
+    final int maxBatchSize = 2;
+    ljBatch.setUseMemoryManager(false);
+    ljBatch.setMaxOutputRowCount(maxBatchSize);
 
     try {
       int totalRecordCount = 0;
@@ -1321,11 +1322,11 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
       // 1st output batch
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
-      assertTrue(ljBatch.getRecordCount() == LateralJoinBatch.MAX_BATCH_SIZE);
+      assertTrue(ljBatch.getRecordCount() == maxBatchSize);
 
       // 2nd output batch
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
-      assertTrue(ljBatch.getRecordCount() == LateralJoinBatch.MAX_BATCH_SIZE);
+      assertTrue(ljBatch.getRecordCount() == maxBatchSize);
       totalRecordCount += ljBatch.getRecordCount();
 
       // 3rd output batch
@@ -1346,7 +1347,6 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
       rightMockBatch.close();
       leftRowSet2.clear();
       nonEmptyRightRowSet2.clear();
-      LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize;
     }
   }
 
@@ -1693,8 +1693,9 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, 
fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
 
-    int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE;
-    LateralJoinBatch.MAX_BATCH_SIZE = 2;
+    int originalMaxBatchSize = 2;
+    ljBatch.setUseMemoryManager(false);
+    ljBatch.setMaxOutputRowCount(originalMaxBatchSize);
 
     try {
       final int expectedOutputRecordCount = 7; // 3 for first left row and 1 
for second left row
@@ -1717,7 +1718,6 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
       ljBatch.close();
       leftMockBatch.close();
       rightMockBatch.close();
-      LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize;
     }
   }
 
@@ -1959,14 +1959,15 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new 
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE;
-    LateralJoinBatch.MAX_BATCH_SIZE = 2;
-
     final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, 
JoinRelType.FULL);
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, 
fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
+    // Use below api to enforce static output batch limit
+    lowerLateral.setUseMemoryManager(false);
+    lowerLateral.setMaxOutputRowCount(2);
+
     // ** Prepare second pair of left and right batch for upper LATERAL 
Lateral_2 **
 
     // Create left input schema
@@ -1997,6 +1998,10 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
     final LateralJoinBatch upperLateral = new LateralJoinBatch(popConfig_1, 
fixture.getFragmentContext(),
       leftMockBatch_2, lowerLateral);
 
+    // Use below api to enforce static output batch limit
+    upperLateral.setUseMemoryManager(false);
+    upperLateral.setMaxOutputRowCount(2);
+
     try {
       final int expectedOutputRecordCount = 6;
 
@@ -2021,7 +2026,6 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
       rightMockBatch_1.close();
       leftContainer2.clear();
       leftOutcomes2.clear();
-      LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize;
     }
   }
 

Reply via email to