This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 25e65c0fe2aed741b143d69c0ffbe23a3433bedc Author: Padma Penumarthy <[email protected]> AuthorDate: Thu Jun 28 03:50:36 2018 -0700 DRILL-6549: batch sizing for nested loop join closes #1363 --- .../exec/physical/impl/join/NestedLoopJoin.java | 3 + .../physical/impl/join/NestedLoopJoinBatch.java | 62 +++- .../physical/impl/join/NestedLoopJoinTemplate.java | 24 +- .../exec/physical/unit/TestOutputBatchSize.java | 342 ++++++++++++++++++++- 4 files changed, 407 insertions(+), 24 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java index f7d96ad..725c46d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java @@ -37,6 +37,9 @@ public interface NestedLoopJoin { ExpandableHyperContainer rightContainer, LinkedList<Integer> rightCounts, NestedLoopJoinBatch outgoing); + + void setTargetOutputCount(int targetOutputCount); + // Produce output records taking into account join type public int outputRecords(JoinRelType joinType); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index ae14fb3..e2532e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; +import java.util.HashSet; import java.util.LinkedList; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; import 
org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.sig.GeneratorMapping; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -50,8 +52,8 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.AllocationHelper; - +import org.apache.drill.exec.record.JoinBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchSizer; import com.google.common.base.Preconditions; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; @@ -65,9 +67,6 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector; public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class); - // Maximum number records in the outgoing batch - protected static final int MAX_BATCH_SIZE = 4096; - // Input indexes to correctly update the stats protected static final int LEFT_INPUT = 0; protected static final int RIGHT_INPUT = 1; @@ -130,6 +129,11 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi super(popConfig, context, left, right); Preconditions.checkNotNull(left); Preconditions.checkNotNull(right); + + // get the output batch size from config. 
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>()); + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); } /** @@ -162,6 +166,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi } // fall through case OK: + // For right side, use aggregate i.e. average row width across batches + batchMemoryManager.update(RIGHT_INDEX, 0, true); + logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); addBatchToHyperContainer(right); break; case OUT_OF_MEMORY: @@ -179,7 +186,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi } // allocate space for the outgoing batch - allocateVectors(); + batchMemoryManager.allocateVectors(container); + + nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); // invoke the runtime generated method to emit records in the output batch outputRecords = nljWorker.outputRecords(popConfig.getJoinType()); @@ -193,6 +202,10 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi container.setRecordCount(outputRecords); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); + } + logger.debug("Number of records emitted: " + outputRecords); return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE; @@ -332,15 +345,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi } /** - * Simple method to allocate space for all the vectors in the container. - */ - private void allocateVectors() { - for (final VectorWrapper<?> vw : container) { - AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE); - } - } - - /** * Builds the output container's schema. 
Goes over the left and the right * batch and adds the corresponding vectors to the output container. * @throws SchemaChangeException if batch schema was changed during execution @@ -352,6 +356,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi return; } + batchMemoryManager.update(RIGHT_INDEX, 0, true); + logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + if (leftUpstream != IterOutcome.NONE) { leftSchema = left.getSchema(); for (final VectorWrapper<?> vw : left) { @@ -380,7 +387,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi addBatchToHyperContainer(right); } - allocateVectors(); nljWorker = setupWorker(); // if left batch is empty, fetch next @@ -388,7 +394,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi leftUpstream = next(LEFT_INPUT, left); } - container.setRecordCount(0); + batchMemoryManager.update(LEFT_INDEX, 0); + logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); } catch (ClassTransformationException | IOException e) { @@ -412,6 +420,26 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi @Override public void close() { + updateBatchMemoryManagerStats(); + + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + 
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + } + rightContainer.clear(); rightCounts.clear(); super.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java index cdd02f4..adf681b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java @@ -29,12 +29,16 @@ import javax.inject.Named; import java.util.LinkedList; import java.util.List; +import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX; + /* * Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This * class contains the main nested loop join logic. 
*/ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class); + // Current left input batch being processed private RecordBatch left = null; @@ -50,6 +54,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { // Iteration status tracker private IterationStatusTracker tracker = new IterationStatusTracker(); + private int targetOutputRecords; + /** * Method initializes necessary state and invokes the doSetup() to set the * input and output value vector references. @@ -69,10 +75,14 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { this.leftRecordCount = left.getRecordCount(); this.rightCounts = rightCounts; this.outgoing = outgoing; - doSetup(context, rightContainer, left, outgoing); } + @Override + public void setTargetOutputCount(int targetOutputRecords) { + this.targetOutputRecords = targetOutputRecords; + } + /** * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one. 
@@ -84,11 +94,11 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { int outputIndex = 0; while (leftRecordCount != 0) { outputIndex = populateOutgoingBatch(joinType, outputIndex); - if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { + if (outputIndex >= targetOutputRecords) { break; } // reset state and get next left batch - resetAndGetNextLeft(); + resetAndGetNextLeft(outputIndex); } return outputIndex; } @@ -128,7 +138,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { outputIndex++; rightRecordMatched = true; - if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { + if (outputIndex >= targetOutputRecords) { nextRightRecordToProcess++; // no more space left in the batch, stop processing @@ -143,7 +153,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { // project records from the left side only, records from right will be null emitLeft(nextLeftRecordToProcess, outputIndex); outputIndex++; - if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { + if (outputIndex >= targetOutputRecords) { nextLeftRecordToProcess++; // no more space left in the batch, stop processing @@ -165,7 +175,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { * Resets some internal state which indicates the next records to process in the left and right batches, * also fetches the next left input batch. 
*/ - private void resetAndGetNextLeft() { + private void resetAndGetNextLeft(int outputIndex) { for (VectorWrapper<?> vw : left) { vw.getValueVector().clear(); } @@ -181,6 +191,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { leftRecordCount = 0; break; case OK: + setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex)); + logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX)); leftRecordCount = left.getRecordCount(); break; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java index 471f1b8..84a4fbc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java @@ -17,18 +17,23 @@ */ package org.apache.drill.exec.physical.unit; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.calcite.rel.core.JoinRelType; import org.apache.directory.api.util.Strings; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; - +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.MergeJoinPOP; +import 
org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -2288,6 +2293,341 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase { } @Test + public void testNestedLoopJoinMultipleOutputBatches() throws Exception { + LogicalExpression functionCallExpr = new FunctionCall("equal", + ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), + (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), + ExpressionPosition.UNKNOWN); + + NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); + mockOpContext(nestedLoopJoin, initReservation, maxAllocation); + + numRows = 4000 * 2; + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(nestedLoopJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows+1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + + } + + @Test + public void testNestedLoopJoinSingleOutputBatch() throws Exception { + LogicalExpression functionCallExpr = new FunctionCall("equal", + ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), + (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), + ExpressionPosition.UNKNOWN); + + NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); + + // create multiple batches from both sides. + numRows = 4096 * 2; + + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to twice of total size expected. + // We should get 1 batch. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(nestedLoopJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(1) // verify number of batches + .expectedBatchSize(totalSize) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testNestedLoopJoinUpperLimit() throws Exception { + // test the upper limit of 65535 records per batch. + LogicalExpression functionCallExpr = new FunctionCall("<", + ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), + (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), + ExpressionPosition.UNKNOWN); + + NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); + + numRows = 500; + + // create left input rows like this. + // "a1" : 5, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this (one row for every pair with c1 < c2). + // "a1" : 5, "c1" : 0, "a2":6, "c2": 1 + // "a1" : 5, "c1" : 0, "a2":6, "c2": 2 + // "a1" : 5, "c1" : 1, "a2":6, "c2": 2 + + // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250 + // expect two batches, batch limited by 65535 records + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(nestedLoopJoin) + .baselineColumns("a1", "c1", "a2", "c2") + .expectedNumBatches(2) // verify number of batches + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows+1; i++) { + for (long j = i+1; j < numRows+1; j++) { + opTestBuilder.baselineValues(5l, i, 6l, j); + } + } + + opTestBuilder.go(); + } + + @Test + public void testNestedLoopJoinLowerLimit() throws Exception { + // test the lower limit of at least one row per batch + LogicalExpression functionCallExpr = new FunctionCall("equal", + ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), + (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), + ExpressionPosition.UNKNOWN); + + NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); + + numRows = 10; + + // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. + // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + + // set very low value of output batch size so we can do only one row per batch. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(nestedLoopJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(10) // verify number of batches + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testLeftNestedLoopJoin() throws Exception { + LogicalExpression functionCallExpr = new FunctionCall("equal", + ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), + (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), + ExpressionPosition.UNKNOWN); + + NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.LEFT, functionCallExpr); + + numRows = 4000 * 2; + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(nestedLoopJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows+1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + + } + + @Test public void testSizerRepeatedList() throws Exception { List<String> inputJsonBatches = Lists.newArrayList(); StringBuilder batchString = new StringBuilder();
