DRILL-6323: Lateral Join - Lateral Join Batch Memory manager support using the record batch sizer
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/769999ef Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/769999ef Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/769999ef Branch: refs/heads/master Commit: 769999ef16c9d819d400c1407c5d919245b4a957 Parents: 7f19b50 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> Authored: Mon Mar 19 12:00:22 2018 -0700 Committer: Parth Chandra <par...@apache.org> Committed: Tue Apr 17 18:16:16 2018 -0700 ---------------------------------------------------------------------- .../drill/exec/ops/OperatorMetricRegistry.java | 1 + .../physical/impl/join/LateralJoinBatch.java | 470 +++++++++++-------- .../impl/join/TestLateralJoinCorrectness.java | 28 +- 3 files changed, 284 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/769999ef/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java index d9a5fdc..c1c0537 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java @@ -53,6 +53,7 @@ public class OperatorMetricRegistry { register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class); register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class); register(CoreOperatorType.MERGE_JOIN_VALUE, JoinBatchMemoryManager.Metric.class); + register(CoreOperatorType.LATERAL_JOIN_VALUE, JoinBatchMemoryManager.Metric.class); } private static void register(final int operatorType, final Class<? 
extends MetricDef> metricDef) { http://git-wip-us.apache.org/repos/asf/drill/blob/769999ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 70ac11b..295ee78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -17,10 +17,12 @@ */ package org.apache.drill.exec.physical.impl.join; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -28,11 +30,12 @@ import org.apache.drill.exec.physical.base.LateralContract; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; @@ -49,23 +52,22 @@ 
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class); - // Maximum number records in the outgoing batch - // Made public for testing - static int MAX_BATCH_SIZE = 4096; - // Input indexes to correctly update the stats private static final int LEFT_INPUT = 0; private static final int RIGHT_INPUT = 1; + // Maximum number records in the outgoing batch + private int maxOutputRowCount; + // Schema on the left side - private BatchSchema leftSchema = null; + private BatchSchema leftSchema; // Schema on the right side - private BatchSchema rightSchema = null; + private BatchSchema rightSchema; // Index in output batch to populate next row - private int outputIndex = 0; + private int outputIndex; // Current index of record in left incoming which is being processed private int leftJoinIndex = -1; @@ -74,18 +76,214 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> private int rightJoinIndex = -1; // flag to keep track if current left batch needs to be processed in future next call - private boolean processLeftBatchInFuture = false; + private boolean processLeftBatchInFuture; // Keep track if any matching right record was found for current left index record - private boolean matchedRecordFound = false; + private boolean matchedRecordFound; + + private boolean useMemoryManager = true; - protected LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, - RecordBatch left, RecordBatch right) throws OutOfMemoryException { + /* **************************************************************************************************************** + * Public Methods + * ****************************************************************************************************************/ + public 
LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, + RecordBatch left, RecordBatch right) throws OutOfMemoryException { super(popConfig, context, left, right); Preconditions.checkNotNull(left); Preconditions.checkNotNull(right); + final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right); + + // Initially it's set to default value of 64K and later for each new output row it will be set to the computed + // row count + maxOutputRowCount = batchMemoryManager.getOutputRowCount(); + } + + /** + * Method that gets the left and right incoming batches and produces the output batch. If the left incoming batch is + * empty then next on the right branch is not called and an empty batch with the correct outcome is returned. If a + * non-empty left incoming batch is received then it calls next on the right branch to get an incoming batch and + * finally produces the output. + * @return IterOutcome state of the lateral join batch + */ + @Override + public IterOutcome innerNext() { + + // We don't do anything special on FIRST state. Process left batch first and then right batch if need be + IterOutcome childOutcome = processLeftBatch(); + + // reset this state after calling processLeftBatch above.
+ processLeftBatchInFuture = false; + + // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned + // from left side is terminal state then just return the IterOutcome and don't call next() on right branch + if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) { + container.setRecordCount(0); + return childOutcome; + } + + // Left side has some records in the batch so let's process right batch + childOutcome = processRightBatch(); + + // reset the left & right outcomes to OK here and send the empty batch downstream + // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do + if (childOutcome == OK_NEW_SCHEMA) { + leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; + rightUpstream = OK; + return childOutcome; + } + + if (isTerminalOutcome(childOutcome)) { + return childOutcome; + } + + // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in + // output container based on new left schema and old right schema. If schema change failed then return STOP + // downstream + if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) { + return STOP; + } + + // Setup the references of left, right and outgoing container in generated operator + state = BatchState.NOT_FIRST; + + // Update the memory manager + updateMemoryManager(LEFT_INPUT); + updateMemoryManager(RIGHT_INPUT); + + // allocate space for the outgoing batch + allocateVectors(); + + return produceOutputBatch(); + } + + @Override + public void close() { + updateBatchMemoryManagerStats(); + super.close(); + } + + @Override + public int getRecordCount() { + return container.getRecordCount(); } + /** + * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral + * to process the records at leftJoinIndex. 
+ * + * @return - RecordBatch received as left side incoming + */ + @Override + public RecordBatch getIncoming() { + Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected since right side will only be " + + "called iff there is any valid left batch"); + return left; + } + + /** + * Returns the current row index which the calling operator should process in the current incoming left record batch. + * LATERAL should never return it as -1, since that indicates the current left batch is empty and LATERAL will never + * call next on the right side with an empty left batch. + * + * @return - int - index of row to process. + */ + @Override + public int getRecordIndex() { + Preconditions.checkState (leftJoinIndex < left.getRecordCount(), + String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount())); + return leftJoinIndex; + } + + /** + * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch + */ + @Override + public IterOutcome getLeftOutcome() { + return leftUpstream; + } + + /* **************************************************************************************************************** + * Protected Methods + * ****************************************************************************************************************/ + + /** + * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a + * failure outcome then we don't even call next on right branch, since there is no left incoming. + * @return true if both the left/right batch was received without failure outcome. + * false if either batch is received with failure outcome.
+ */ + @Override + protected boolean prefetchFirstBatchFromBothSides() { + // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch + leftUpstream = next(0, left); + + boolean validBatch = setBatchState(leftUpstream); + + if (validBatch) { + rightUpstream = next(1, right); + validBatch = setBatchState(rightUpstream); + } + + // EMIT outcome is not expected as part of first batch from either side + if (leftUpstream == EMIT || rightUpstream == EMIT) { + state = BatchState.STOP; + throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + + "buildSchema phase"); + } + return validBatch; + } + + /** + * Prefetch a batch from the left and right branches to learn the schema of each side. Then adds value vectors to the + * output container based on those schemas. For this phase LATERAL always expects an empty batch from the right side, + * which UNNEST should abide by. + * + * @throws SchemaChangeException if batch schema was changed during execution + */ + @Override + protected void buildSchema() throws SchemaChangeException { + // Prefetch a RecordBatch from both left and right branch + if (!prefetchFirstBatchFromBothSides()) { + return; + } + Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received"); + + // Update the batch memory manager + updateMemoryManager(LEFT_INPUT); + updateMemoryManager(RIGHT_INPUT); + + // Setup output container schema based on known left and right schema + setupNewSchema(); + + // Release the vectors received from right side + VectorAccessibleUtilities.clear(right); + + // Set join index as invalid (-1) if the left side is empty, else set it to 0 + leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0; + rightJoinIndex = -1; + + // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly + // by buildSchema caller and we should treat the batch as received with OK outcome.
+ leftUpstream = OK; + rightUpstream = OK; + } + + @Override + protected void killIncoming(boolean sendUpstream) { + this.left.kill(sendUpstream); + // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming + // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream + // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to + // UNNEST. + leftUpstream = STOP; + this.right.kill(sendUpstream); + } + + /* **************************************************************************************************************** + * Private Methods + * ****************************************************************************************************************/ + private boolean handleSchemaChange() { try { stats.startSetup(); @@ -268,60 +466,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } /** - * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is - * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty - * left incoming batch is received then it call's next on right branch to get an incoming and finally produces - * output. - * @return IterOutcome state of the lateral join batch - */ - @Override - public IterOutcome innerNext() { - - // We don't do anything special on FIRST state. Process left batch first and then right batch if need be - IterOutcome childOutcome = processLeftBatch(); - - // reset this state after calling processLeftBatch above. 
- processLeftBatchInFuture = false; - - // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned - // from left side is terminal state then just return the IterOutcome and don't call next() on right branch - if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) { - container.setRecordCount(0); - return childOutcome; - } - - // Left side has some records in the batch so let's process right batch - childOutcome = processRightBatch(); - - // reset the left & right outcomes to OK here and send the empty batch downstream - // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do - if (childOutcome == OK_NEW_SCHEMA) { - leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; - rightUpstream = OK; - return childOutcome; - } - - if (isTerminalOutcome(childOutcome)) { - return childOutcome; - } - - // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in - // output container based on new left schema and old right schema. If schema change failed then return STOP - // downstream - if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) { - return STOP; - } - - // Setup the references of left, right and outgoing container in generated operator - state = BatchState.NOT_FIRST; - - // allocate space for the outgoing batch - allocateVectors(); - - return produceOutputBatch(); - } - - /** * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the * records in the either or both the batches are consumed then it get's next batch from that branch depending upon * if output batch still has some space left. 
If output batch is full then the output if finalized to be sent @@ -335,10 +479,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> boolean isLeftProcessed = false; // Try to fully pack the outgoing container - while (outputIndex < LateralJoinBatch.MAX_BATCH_SIZE) { + while (!isOutgoingBatchFull()) { final int previousOutputCount = outputIndex; // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex - outputIndex = crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex, outputIndex); + crossJoinAndOutputRecords(); // We have produced some records in outgoing container, hence there must be a match found for left record if (outputIndex > previousOutputCount) { @@ -385,7 +529,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } // Check if output batch still has some space - if (outputIndex < MAX_BATCH_SIZE) { + if (!isOutgoingBatchFull()) { // Check if left side still has records or not if (isLeftProcessed) { // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before @@ -417,6 +561,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> isLeftProcessed = true; break; } + + // Update the batch memory manager to use new left incoming batch + updateMemoryManager(LEFT_INPUT); } } @@ -434,6 +581,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> finalizeOutputContainer(); return rightUpstream; } + + // Update the batch memory manager to use new right incoming batch + updateMemoryManager(RIGHT_INPUT); } } // output batch is full to its max capacity @@ -468,6 +618,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Set the record count in the container container.setRecordCount(outputIndex); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + batchMemoryManager.updateOutgoingStats(outputIndex); 
logger.debug("Number of records emitted: " + outputIndex); // Update the output index for next output batch to zero @@ -566,9 +718,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> /** * Simple method to allocate space for all the vectors in the container. */ - private void allocateVectors() { - for (final VectorWrapper<?> vw : container) { - AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE); + private void allocateVectors() {; + for (VectorWrapper w : container) { + RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); + colSize.allocateVector(w.getValueVector(), maxOutputRowCount); } } @@ -590,154 +743,32 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } /** - * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a - * failure outcome then we don't even call next on right branch, since there is no left incoming. - * @return true if both the left/right batch was received without failure outcome. - * false if either of batch is received with failure outcome. - */ - @Override - protected boolean prefetchFirstBatchFromBothSides() { - // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch - leftUpstream = next(0, left); - - boolean validBatch = setBatchState(leftUpstream); - - if (validBatch) { - rightUpstream = next(1, right); - validBatch = setBatchState(rightUpstream); - } - - // EMIT outcome is not expected as part of first batch from either side - if (leftUpstream == EMIT || rightUpstream == EMIT) { - state = BatchState.STOP; - throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + - "buildSchema phase"); - } - return validBatch; - } - - /** - * Prefetch a batch from left and right branch to know about the schema of each side. 
Then adds value vector in - * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side - * which UNNEST should abide by. - * - * @throws SchemaChangeException if batch schema was changed during execution - */ - @Override - protected void buildSchema() throws SchemaChangeException { - // Prefetch a RecordBatch from both left and right branch - if (!prefetchFirstBatchFromBothSides()) { - return; - } - Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received"); - - // Setup output container schema based on known left and right schema - setupNewSchema(); - - // Release the vectors received from right side - VectorAccessibleUtilities.clear(right); - - // Set join index as invalid (-1) if the left side is empty, else set it to 0 - leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0; - rightJoinIndex = -1; - - // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly - // by buildSchema caller and we should treat the batch as received with OK outcome. - leftUpstream = OK; - rightUpstream = OK; - } - - @Override - public void close() { - super.close(); - } - - @Override - protected void killIncoming(boolean sendUpstream) { - this.left.kill(sendUpstream); - // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming - // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream - // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to - // UNNEST. - leftUpstream = STOP; - this.right.kill(sendUpstream); - } - - @Override - public int getRecordCount() { - return container.getRecordCount(); - } - - /** - * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral - * to process the records at leftJoinIndex. 
- * - * @return - RecordBatch received as left side incoming - */ - @Override - public RecordBatch getIncoming() { - Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected since right side will only be " + - "called iff there is any valid left batch"); - return left; - } - - /** - * Returns the current row index which the calling operator should process in current incoming left record batch. - * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never - * call next on right side with empty left batch - * - * @return - int - index of row to process. - */ - @Override - public int getRecordIndex() { - Preconditions.checkState (leftJoinIndex < left.getRecordCount(), - String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount())); - return leftJoinIndex; - } - - /** - * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch - */ - @Override - public IterOutcome getLeftOutcome() { - return leftUpstream; - } - - /** * Main entry point for producing the output records. This method populates the output batch after cross join of * the record in a given left batch at left index and all the corresponding right batches produced for * this left index. The right container is copied starting from rightIndex until number of records in the container. 
- * - * @param leftIndex - row index in left incoming batch - * @param rightIndex - row index in right incoming batch - * @param outIndex - row index in output batch - * - * @return - final row index of output batch */ - private int crossJoinAndOutputRecords(final int leftIndex, final int rightIndex, final int outIndex) { + private void crossJoinAndOutputRecords() { logger.trace("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {} and outputIndex: {}", - leftIndex, rightIndex, right.getRecordCount(), outIndex); + leftJoinIndex, rightJoinIndex, right.getRecordCount(), outputIndex); final int rightRecordCount = right.getRecordCount(); - int outBatchIndex = outIndex; // If there is no record in right batch just return current index in output batch if (rightRecordCount <= 0) { - return outBatchIndex; + return; } // Check if right batch is empty since we have to handle left join case - Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 but index is -1"); + Preconditions.checkState(rightJoinIndex != -1, "Right batch record count is >0 but index is -1"); // For every record in right side just emit left and right records in output container - for (int i = rightIndex; i < rightRecordCount; ++i) { - emitLeft(leftIndex, outBatchIndex); - emitRight(i, outBatchIndex); - ++outBatchIndex; + for (int i = rightJoinIndex; i < rightRecordCount; ++i) { + emitLeft(leftJoinIndex, outputIndex); + emitRight(i, outputIndex); + ++outputIndex; - if (outBatchIndex >= LateralJoinBatch.MAX_BATCH_SIZE) { + if (isOutgoingBatchFull()) { break; } } - return outBatchIndex; } /** @@ -794,4 +825,37 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> private void emitRight(int rightIndex, int outIndex) { copyDataToOutputVectors(rightIndex, outIndex, right, 0, rightSchema.getFieldCount(), leftSchema.getFieldCount()); } + + /** + * Used only for testing for cases when multiple output batches are produced for same input set + * 
@param outputRowCount - Max rows that output batch can hold + */ + @VisibleForTesting + public void setMaxOutputRowCount(int outputRowCount) { + maxOutputRowCount = outputRowCount; + } + + /** + * Used only for testing to disable output batch calculation using memory manager and instead use the static max + * value set by {@link LateralJoinBatch#setMaxOutputRowCount(int)} + * @param useMemoryManager - false - disable memory manager update to take effect, true enable memory manager update + */ + @VisibleForTesting + public void setUseMemoryManager(boolean useMemoryManager) { + this.useMemoryManager = useMemoryManager; + } + + private boolean isOutgoingBatchFull() { + return outputIndex >= maxOutputRowCount; + } + + private void updateMemoryManager(int inputIndex) { + // For cases where all the previous input were consumed and send with previous output batch. But now we are building + // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 + final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex); + + if (useMemoryManager) { + maxOutputRowCount = newOutputRowCount; + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/769999ef/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java index b237ef7..9d93cb3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java @@ -1311,8 +1311,9 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { final LateralJoinBatch ljBatch = new 
LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), leftMockBatch, rightMockBatch); - int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE; - LateralJoinBatch.MAX_BATCH_SIZE = 2; + final int maxBatchSize = 2; + ljBatch.setUseMemoryManager(false); + ljBatch.setMaxOutputRowCount(maxBatchSize); try { int totalRecordCount = 0; @@ -1321,11 +1322,11 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { // 1st output batch assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); totalRecordCount += ljBatch.getRecordCount(); - assertTrue(ljBatch.getRecordCount() == LateralJoinBatch.MAX_BATCH_SIZE); + assertTrue(ljBatch.getRecordCount() == maxBatchSize); // 2nd output batch assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); - assertTrue(ljBatch.getRecordCount() == LateralJoinBatch.MAX_BATCH_SIZE); + assertTrue(ljBatch.getRecordCount() == maxBatchSize); totalRecordCount += ljBatch.getRecordCount(); // 3rd output batch @@ -1346,7 +1347,6 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { rightMockBatch.close(); leftRowSet2.clear(); nonEmptyRightRowSet2.clear(); - LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize; } } @@ -1693,8 +1693,9 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(), leftMockBatch, rightMockBatch); - int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE; - LateralJoinBatch.MAX_BATCH_SIZE = 2; + int originalMaxBatchSize = 2; + ljBatch.setUseMemoryManager(false); + ljBatch.setMaxOutputRowCount(originalMaxBatchSize); try { final int expectedOutputRecordCount = 7; // 3 for first left row and 1 for second left row @@ -1717,7 +1718,6 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { ljBatch.close(); leftMockBatch.close(); rightMockBatch.close(); - LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize; } } @@ -1959,14 +1959,15 @@ public class TestLateralJoinCorrectness 
extends SubOperatorTest { final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); - int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE; - LateralJoinBatch.MAX_BATCH_SIZE = 2; - final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL); final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), leftMockBatch_1, rightMockBatch_1); + // Use below api to enforce static output batch limit + lowerLateral.setUseMemoryManager(false); + lowerLateral.setMaxOutputRowCount(2); + // ** Prepare second pair of left and right batch for upper LATERAL Lateral_2 ** // Create left input schema @@ -1997,6 +1998,10 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { final LateralJoinBatch upperLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), leftMockBatch_2, lowerLateral); + // Use below api to enforce static output batch limit + upperLateral.setUseMemoryManager(false); + upperLateral.setMaxOutputRowCount(2); + try { final int expectedOutputRecordCount = 6; @@ -2021,7 +2026,6 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { rightMockBatch_1.close(); leftContainer2.clear(); leftOutcomes2.clear(); - LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize; } }