DRILL-6123: Limit batch size for Merge Join based on memory (closes #1107)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/20185c9b Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/20185c9b Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/20185c9b Branch: refs/heads/master Commit: 20185c9bf0f4c94815fd2ab1eae1b98b3d4e4ff7 Parents: 58e4cec Author: Padma Penumarthy <[email protected]> Authored: Fri Feb 9 13:54:38 2018 -0800 Committer: Vitalii Diravka <[email protected]> Committed: Fri Feb 16 20:31:15 2018 +0000 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 4 +- .../impl/flatten/FlattenRecordBatch.java | 32 +-- .../exec/physical/impl/join/JoinStatus.java | 12 +- .../exec/physical/impl/join/MergeJoinBatch.java | 65 ++++- .../AbstractRecordBatchMemoryManager.java | 61 +++++ .../drill/exec/record/RecordBatchSizer.java | 6 +- .../drill/exec/record/RecordIterator.java | 13 +- .../exec/physical/unit/TestOutputBatchSize.java | 249 ++++++++++++++++++- .../drill/exec/record/TestRecordIterator.java | 4 +- .../exec/vector/complex/AbstractMapVector.java | 4 + .../vector/complex/BaseRepeatedValueVector.java | 3 + .../drill/exec/vector/complex/ListVector.java | 4 + .../exec/vector/complex/RepeatedListVector.java | 3 + 13 files changed, 425 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index f3572d8..a1a94fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -79,7 +79,9 @@ public final class 
ExecConstants { public static final String SPILL_DIRS = "drill.exec.spill.directories"; public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_size"; - public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024); + // Output Batch Size in Bytes. We have a small lower bound so we can test with unit tests without the + // need to produce very large batches that take up a lot of memory. + public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024); // External Sort Boot configuration http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 5f693cb..4a910ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -44,6 +44,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; @@ -70,7 +71,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { private boolean hasRemainder = false; private int remainderIndex = 0; private int
recordCount; - private long outputBatchSize; + private int outputBatchSize; + private final FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(); private final Flattener.Monitor monitor = new Flattener.Monitor() { @Override @@ -97,23 +99,19 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } } - private class FlattenMemoryManager { - private final int outputRowCount; - private static final int OFFSET_VECTOR_WIDTH = 4; - private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2; - private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT; - private static final int MIN_NUM_ROWS = 1; + private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager { - private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) { + @Override + public void update() { // Get sizing information for the batch. RecordBatchSizer sizer = new RecordBatchSizer(incoming); - final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn); + final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); // Get column size of flatten column. RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(), - typedFieldId.getFieldIds()).getValueVector(), field.getName()); + typedFieldId.getFieldIds()).getValueVector(), field.getName()); // Average rowWidth of flatten column final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount()); @@ -124,22 +122,18 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // Average rowWidth of single element in the flatten list. // subtract the offset vector size from column data size. 
final int avgRowWidthSingleFlattenEntry = - RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount); + RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount); // Average rowWidth of outgoing batch. final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry; // Number of rows in outgoing batch - outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS, - RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth))); + setOutputRowCount(outputBatchSize, avgOutgoingRowWidth); logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," + - "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, outputRowCount); + "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, getOutputRowCount()); } - public int getOutputRowCount() { - return outputRowCount; - } } @@ -147,7 +141,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { super(pop, context, incoming); // get the output batch size from config. 
- outputBatchSize = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); } @Override @@ -200,7 +194,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { @Override protected IterOutcome doWork() { - FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn()); + flattenMemoryManager.update(); flattener.setOutputCount(flattenMemoryManager.getOutputRowCount()); int incomingRecordCount = incoming.getRecordCount(); http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index 527c984..beae021 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -29,8 +29,6 @@ import org.apache.calcite.rel.core.JoinRelType; public final class JoinStatus { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class); - private static final int OUTPUT_BATCH_SIZE = 32*1024; - public final RecordIterator left; public final RecordIterator right; private boolean iteratorInitialized; @@ -44,6 +42,8 @@ public final class JoinStatus { public boolean ok = true; public boolean hasMoreData = false; + private int targetOutputRowCount; + public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch output) { this.left = left; this.right = right; @@ -101,8 +101,12 @@ public final class JoinStatus { } public final boolean isOutgoingBatchFull() { - 
Preconditions.checkArgument(outputPosition <= OUTPUT_BATCH_SIZE); - return outputPosition == OUTPUT_BATCH_SIZE; + Preconditions.checkArgument(outputPosition <= targetOutputRowCount); + return outputPosition >= targetOutputRowCount; + } + + public final void setTargetOutputRowCount(int outputRowCount) { + this.targetOutputRowCount = outputRowCount; } public final void incOutputPos() { http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 1ed4722..f612ae2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -33,6 +33,7 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; @@ -45,6 +46,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.common.Comparator; +import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -54,6 +56,7 @@ import 
org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -102,20 +105,75 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private final List<Comparator> comparators; private final JoinRelType joinType; private JoinWorker worker; + private final int outputBatchSize; private static final String LEFT_INPUT = "LEFT INPUT"; private static final String RIGHT_INPUT = "RIGHT INPUT"; + private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager { + private int leftRowWidth; + private int rightRowWidth; + + /** + * mergejoin operates on one record at a time from the left and right batches + * using RecordIterator abstraction. We have a callback mechanism to get notified + * when new batch is loaded in record iterator. + * This can get called in the middle of current output batch we are building. + * when this gets called, adjust number of output rows for the current batch and + * update the value to be used for subsequent batches. + */ + @Override + public void update(int inputIndex) { + switch(inputIndex) { + case 0: + final RecordBatchSizer leftSizer = new RecordBatchSizer(left); + leftRowWidth = leftSizer.netRowWidth(); + break; + case 1: + final RecordBatchSizer rightSizer = new RecordBatchSizer(right); + rightRowWidth = rightSizer.netRowWidth(); + default: + break; + } + + final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; + + // If outgoing row width is 0, just return. This is possible for empty batches or + // when first set of batches come with OK_NEW_SCHEMA and no data. 
+ if (newOutgoingRowWidth == 0) { + return; + } + + // update the value to be used for next batch(es) + setOutputRowCount(outputBatchSize, newOutgoingRowWidth); + + // Adjust for the current batch. + // calculate memory used so far based on previous outgoing row width and how many rows we already processed. + final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth(); + // This is the remaining memory. + final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0); + // These are number of rows we can fit in remaining memory based on new outgoing row width. + final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); + + status.setTargetOutputRowCount(status.getOutPosition() + numOutputRowsRemaining); + setOutgoingRowWidth(newOutgoingRowWidth); + } + } + + private final MergeJoinMemoryManager mergeJoinMemoryManager = new MergeJoinMemoryManager(); + protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { super(popConfig, context, true); + outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + if (popConfig.getConditions().size() == 0) { throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. 
This join operator was configured with 0 conditions"); } this.left = left; - this.leftIterator = new RecordIterator(left, this, oContext, 0, false); + this.leftIterator = new RecordIterator(left, this, oContext, 0, false, mergeJoinMemoryManager); this.right = right; - this.rightIterator = new RecordIterator(right, this, oContext, 1); + this.rightIterator = new RecordIterator(right, this, oContext, 1, mergeJoinMemoryManager); this.joinType = popConfig.getJoinType(); this.status = new JoinStatus(leftIterator, rightIterator, this); this.conditions = popConfig.getConditions(); @@ -171,10 +229,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { case BATCH_RETURNED: allocateBatch(false); status.resetOutputPos(); + status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount()); break; case SCHEMA_CHANGED: allocateBatch(true); status.resetOutputPos(); + status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount()); break; case NO_MORE_DATA: status.resetOutputPos(); @@ -272,6 +332,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException { final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); final ErrorCollector collector = new ErrorCollectorImpl(); // Generate members and initialization code http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java new file mode 
100644 index 0000000..b91ede0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.record; + +import org.apache.drill.exec.vector.ValueVector; + +public abstract class AbstractRecordBatchMemoryManager { + protected static final int OFFSET_VECTOR_WIDTH = 4; + protected static final int WORST_CASE_FRAGMENTATION_FACTOR = 2; + protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT; + protected static final int MIN_NUM_ROWS = 1; + private int outputRowCount = MAX_NUM_ROWS; + private int outgoingRowWidth; + + public void update(int inputIndex) {}; + + public void update() {}; + + public int getOutputRowCount() { + return outputRowCount; + } + + /** + * Given batchSize and rowWidth, this will set output rowCount taking into account + * the min and max that is allowed. 
+ */ + public void setOutputRowCount(int targetBatchSize, int rowWidth) { + this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, rowWidth)); + } + + /** + * This will adjust rowCount taking into account the min and max that is allowed. + */ + public static int adjustOutputRowCount(int rowCount) { + return (Math.min(MAX_NUM_ROWS, Math.max(rowCount, MIN_NUM_ROWS))); + } + + public void setOutgoingRowWidth(int outgoingRowWidth) { + this.outgoingRowWidth = outgoingRowWidth; + } + + public int getOutgoingRowWidth() { + return outgoingRowWidth; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java index f5c77ce..536c8bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java @@ -153,7 +153,7 @@ public class RecordBatchSizer { // that changes the value count of the contained vectors. UInt4Vector offsetVector = ((RepeatedValueVector) v).getOffsetVector(); - int childCount = offsetVector.getAccessor().get(valueCount); + int childCount = valueCount == 0 ? 0 : offsetVector.getAccessor().get(valueCount); if (metadata.getType().getMinorType() == MinorType.MAP) { // For map, the only data associated with the map vector @@ -305,8 +305,8 @@ public class RecordBatchSizer { public RecordBatchSizer(RecordBatch batch) { this(batch, - (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) ? - batch.getSelectionVector2() : null); + (batch.getSchema() == null ? 
null : (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE ? + batch.getSelectionVector2() : null))); } /** * Create empirical metadata for a record batch given a vector accessible http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java index 01acd7f..32c69ce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -57,18 +57,21 @@ public class RecordIterator implements VectorAccessible { private final VectorContainer container; // Holds VectorContainer of current record batch private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create(); + private final AbstractRecordBatchMemoryManager newBatchCallBack; + public RecordIterator(RecordBatch incoming, AbstractRecordBatch<?> outgoing, OperatorContext oContext, - int inputIndex) { - this(incoming, outgoing, oContext, inputIndex, true); + int inputIndex, AbstractRecordBatchMemoryManager callBack) { + this(incoming, outgoing, oContext, inputIndex, true, callBack); } public RecordIterator(RecordBatch incoming, AbstractRecordBatch<?> outgoing, OperatorContext oContext, int inputIndex, - boolean enableMarkAndReset) { + boolean enableMarkAndReset, + AbstractRecordBatchMemoryManager callBack) { this.incoming = incoming; this.outgoing = outgoing; this.inputIndex = inputIndex; @@ -78,6 +81,7 @@ public class RecordIterator implements VectorAccessible { resetIndices(); this.initialized = false; this.enableMarkAndReset = enableMarkAndReset; + this.newBatchCallBack = callBack; } private void resetIndices() { @@ -97,6 +101,9 @@ public class 
RecordIterator implements VectorAccessible { return; } lastOutcome = outgoing != null ? outgoing.next(inputIndex, incoming) : incoming.next(); + if ((lastOutcome == IterOutcome.OK || lastOutcome == IterOutcome.OK_NEW_SCHEMA) && newBatchCallBack != null) { + newBatchCallBack.update(inputIndex); + } } public void mark() { http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java index 9a4633d..99af4c2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java @@ -19,12 +19,14 @@ package org.apache.drill.exec.physical.unit; import com.google.common.collect.Lists; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.FlattenPOP; +import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatch; @@ -879,4 +881,249 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase { opTestBuilder.go(); } -} + + @Test + public void testMergeJoinMultipleOutputBatches() throws Exception { + MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); + mockOpContext(mergeJoin, initReservation, 
maxAllocation); + + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. + // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. 
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(mergeJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testMergeJoinSingleOutputBatch() throws Exception { + MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); + mockOpContext(mergeJoin, initReservation, maxAllocation); + + // create multiple batches from both sides. + numRows = 4096 * 2; + + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to twice of total size expected. + // We should get 1 batch. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(mergeJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(1) // verify number of batches + .expectedBatchSize(totalSize) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testMergeJoinUpperLimit() throws Exception { + // test the upper limit of 65535 records per batch. + MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT); + mockOpContext(mergeJoin, initReservation, maxAllocation); + + numRows = 100000; + + // create left input rows like this. + // "a1" : 5, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. + // "a2" : 6, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. 
+ // "a1" : 5, "c1" : 1, "a2":6, "c2": 1 + // "a1" : 5, "c1" : 2, "a2":6, "c2": 2 + // "a1" : 5, "c1" : 3, "a2":6, "c2": 3 + + // expect two batches, batch limited by 65535 records + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(mergeJoin) + .baselineColumns("a1", "c1", "a2", "c2") + .expectedNumBatches(2) // verify number of batches + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, i, 6l, i); + } + + opTestBuilder.go(); + } + + @Test + public void testMergeJoinLowerLimit() throws Exception { + // test the lower limit of at least one row per batch + MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT); + mockOpContext(mergeJoin, initReservation, maxAllocation); + + numRows = 10; + + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this.
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + + // set very low value of output batch size so we can do only one row per batch. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(mergeJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(10) // verify number of batches + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java index e88bb41..7e13dad 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java @@ -77,7 +77,7 @@ public class TestRecordIterator extends PopUnitTestBase { OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorUtilities.getChildCount(dummyPop)); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); - RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false); + RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false, null); int totalRecords = 0; List<ValueVector> vectors = null; @@ -133,7 +133,7 @@ public class TestRecordIterator extends PopUnitTestBase { OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorUtilities.getChildCount(dummyPop)); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); - RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0); + RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, null); List<ValueVector> vectors = null; // batche sizes // 1, 100, 10, 10000, 1, 1000 http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java index 5515b7a..3682397 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java +++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -297,6 +297,10 @@ public abstract class AbstractMapVector extends AbstractContainerVector { @Override public int getPayloadByteCount(int valueCount) { + if (valueCount == 0) { + return 0; + } + int count = 0; for (final ValueVector v : vectors.values()) { http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java index 4b0c1b5..02243c8 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java @@ -220,6 +220,9 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements @Override public int getPayloadByteCount(int valueCount) { + if (valueCount == 0) { + return 0; + } int entryCount = offsets.getAccessor().get(valueCount); return offsets.getPayloadByteCount(valueCount) + vector.getPayloadByteCount(entryCount); } http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java index 7de5ce6..45d9160 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java @@ -333,6 +333,10 @@ public class ListVector extends 
BaseRepeatedValueVector { @Override public int getPayloadByteCount(int valueCount) { + if (valueCount == 0) { + return 0; + } + return offsets.getPayloadByteCount(valueCount) + bits.getPayloadByteCount(valueCount) + super.getPayloadByteCount(valueCount); } http://git-wip-us.apache.org/repos/asf/drill/blob/20185c9b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index 6442417..4a7eda1 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -442,6 +442,9 @@ public class RepeatedListVector extends AbstractContainerVector @Override public int getPayloadByteCount(int valueCount) { + if (valueCount == 0) { + return 0; + } return delegate.getPayloadByteCount(valueCount); }
