This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 480ade960767cf138de184c7873792e96b0e9a9f Author: Padma Penumarthy <[email protected]> AuthorDate: Wed May 30 14:00:16 2018 -0700 DRILL-6236:Batch sizing for hash join This closes #1227 --- .../exec/physical/impl/join/HashJoinBatch.java | 125 ++++--- .../join/HashJoinMechanicalMemoryCalculator.java | 1 + .../impl/join/HashJoinMemoryCalculator.java | 1 + .../impl/join/HashJoinMemoryCalculatorImpl.java | 35 +- .../exec/physical/impl/join/HashJoinProbe.java | 2 + .../physical/impl/join/HashJoinProbeTemplate.java | 22 +- .../exec/record/AbstractBinaryRecordBatch.java | 4 + .../drill/exec/record/JoinBatchMemoryManager.java | 61 ++-- .../exec/record/RecordBatchMemoryManager.java | 26 +- .../apache/drill/exec/record/RecordBatchSizer.java | 49 ++- .../impl/join/TestBuildSidePartitioningImpl.java | 20 +- .../exec/physical/unit/TestOutputBatchSize.java | 386 +++++++++++++++++++++ 12 files changed, 603 insertions(+), 129 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index ee7a8a3..4267077 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -57,16 +57,19 @@ import org.apache.drill.exec.physical.impl.common.HashPartition; import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.IntVector; import 
org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; +import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX; +import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX; + /** * This class implements the runtime execution for the Hash-Join operator * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins @@ -95,11 +98,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { */ private int RECORDS_PER_BATCH; // internal batches - /** - * The maximum number of records in each outgoing batch. - */ - private static final int TARGET_RECORDS_PER_BATCH = 4000; - // Join type, INNER, LEFT, RIGHT or OUTER private final JoinRelType joinType; @@ -172,7 +170,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { public String outerSpillFile; int cycleNum; int origPartn; - int prevOrigPartn; } + int prevOrigPartn; + } /** * Queue of spilled partitions to process. @@ -181,7 +180,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private HJSpilledPartition spilledInners[]; // for the outer to find the partition public enum Metric implements MetricDef { - NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, @@ -190,8 +188,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { SPILLED_PARTITIONS, // number of original partitions spilled to disk SPILL_MB, // Number of MB of data spilled to disk. This amount is first written, // then later re-read. So, disk I/O is twice this amount. 
- SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY - ; + SPILL_CYCLE, // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY + LEFT_INPUT_BATCH_COUNT, + LEFT_AVG_INPUT_BATCH_BYTES, + LEFT_AVG_INPUT_ROW_BYTES, + LEFT_INPUT_RECORD_COUNT, + RIGHT_INPUT_BATCH_COUNT, + RIGHT_AVG_INPUT_BATCH_BYTES, + RIGHT_AVG_INPUT_ROW_BYTES, + RIGHT_INPUT_RECORD_COUNT, + OUTPUT_BATCH_COUNT, + AVG_OUTPUT_BATCH_BYTES, + AVG_OUTPUT_ROW_BYTES, + OUTPUT_RECORD_COUNT; // duplicate for hash ag @@ -221,12 +230,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { throw new SchemaChangeException(e); } - // Build the container schema and set the counts - for (final VectorWrapper<?> w : container) { - w.getValueVector().allocateNew(); - } container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - container.setRecordCount(outputRecords); } @Override @@ -234,6 +238,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { leftUpstream = sniffNonEmptyBatch(0, left); rightUpstream = sniffNonEmptyBatch(1, right); + // For build side, use aggregate i.e. 
average row width across batches + batchMemoryManager.update(LEFT_INDEX, 0); + batchMemoryManager.update(RIGHT_INDEX, 0, true); + + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, incoming left:\n {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); + logger.debug("BATCH_STATS, incoming right:\n {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + } + if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { state = BatchState.STOP; return false; @@ -333,10 +346,21 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { joinType != JoinRelType.INNER) { // or if this is a left/full outer join // Allocate the memory for the vectors in the output container - allocateVectors(); + batchMemoryManager.allocateVectors(container); + hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); outputRecords = hashJoinProbe.probeAndProject(); + for (final VectorWrapper<?> v : container) { + v.getValueVector().getMutator().setValueCount(outputRecords); + } + container.setRecordCount(outputRecords); + + batchMemoryManager.updateOutgoingStats(outputRecords); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + } + /* We are here because of one the following * 1. Completed processing of all the records and we are done * 2. 
We've filled up the outgoing batch to the maximum and we need to return upstream @@ -347,10 +371,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { state = BatchState.NOT_FIRST; } - for (final VectorWrapper<?> v : container) { - v.getValueVector().getMutator().setValueCount(outputRecords); - } - return IterOutcome.OK; } @@ -557,7 +577,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { RECORDS_PER_BATCH, maxBatchSize, maxBatchSize, - TARGET_RECORDS_PER_BATCH, + batchMemoryManager.getOutputRowCount(), + batchMemoryManager.getOutputBatchSize(), HashTable.DEFAULT_LOAD_FACTOR); disableSpilling(null); @@ -628,7 +649,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { RECORDS_PER_BATCH, maxBatchSize, maxBatchSize, - TARGET_RECORDS_PER_BATCH, + batchMemoryManager.getOutputRowCount(), + batchMemoryManager.getOutputBatchSize(), HashTable.DEFAULT_LOAD_FACTOR); if (firstCycle && doMemoryCalculation) { @@ -665,6 +687,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { for (HashPartition partn : partitions) { partn.updateBatches(); } // Fall through case OK: + batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true); // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy) if ( numPartitions == 1 ) { partitions[0].appendBatch(buildBatch); @@ -803,22 +826,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } - private void allocateVectors() { - for (final VectorWrapper<?> vectorWrapper : container) { - ValueVector valueVector = vectorWrapper.getValueVector(); - - if (valueVector instanceof FixedWidthVector) { - ((FixedWidthVector) valueVector).allocateNew(TARGET_RECORDS_PER_BATCH); - } else if (valueVector instanceof VariableWidthVector) { - ((VariableWidthVector) valueVector).allocateNew(8 * TARGET_RECORDS_PER_BATCH, TARGET_RECORDS_PER_BATCH); - } else { - 
valueVector.allocateNew(); - } - } - - container.setRecordCount(0); // reset container's counter back to zero records - } - // (After the inner side was read whole) - Has that inner partition spilled public boolean isSpilledInner(int part) { if ( spilledInners == null ) { return false; } // empty inner @@ -879,6 +886,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Create empty partitions (in the ctor - covers the case where right side is empty) partitions = new HashPartition[0]; + + // get the output batch size from config. + int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right); } /** @@ -966,6 +977,23 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { buildBatch.kill(sendUpstream); } + public void updateMetrics() { + stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX)); + stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX)); + stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX)); + stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX)); + + stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX)); + stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX)); + stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX)); + stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX)); + + stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, 
batchMemoryManager.getNumOutgoingBatches()); + stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize()); + stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth()); + stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords()); + } + @Override public void close() { if ( cycleNum > 0 ) { // spilling happened @@ -973,6 +1001,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // SpilledRecordBatch "scanners" as it only knows about the original left/right ops. killIncoming(false); } + + updateMetrics(); + + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + this.cleanup(); super.close(); } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java index 618e80e..fb087a0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java @@ -77,6 +77,7 @@ public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalcula int maxBatchNumRecordsBuild, int maxBatchNumRecordsProbe, int outputBatchNumRecords, + int outputBatchSize, double loadFactor) { this.initialPartitions = initialPartitions; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java index 71292a5..868fbfd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java @@ -100,6 +100,7 @@ public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJo int maxBatchNumRecordsBuild, int maxBatchNumRecordsProbe, int outputBatchNumRecords, + int outputBatchSize, double loadFactor); void setPartitionStatSet(PartitionStatSet partitionStatSet); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java index ed0adc5..37f3329 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java @@ -142,6 +142,7 @@ 
public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { int maxBatchNumRecordsBuild, int maxBatchNumRecordsProbe, int outputBatchNumRecords, + int outputBatchSize, double loadFactor) { this.initialPartitions = initialPartitions; } @@ -203,7 +204,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { * <h1>Life Cycle</h1> * <p> * <ul> - * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, double)}. + * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, int, double)}. * This will initialize the StateCalculate with the additional information it needs.</li> * <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li> * <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if spilling needs to occurr.</li> @@ -233,9 +234,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { private int partitions; private int recordsPerPartitionBatchBuild; private int recordsPerPartitionBatchProbe; - private int outputBatchNumRecords; - private Map<String, Long> buildValueSizes; - private Map<String, Long> probeValueSizes; + private int outputBatchSize; private Map<String, Long> keySizes; private boolean autoTune; private boolean reserveHash; @@ -273,6 +272,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { int maxBatchNumRecordsBuild, int maxBatchNumRecordsProbe, int outputBatchNumRecords, + int outputBatchSize, double loadFactor) { Preconditions.checkNotNull(buildSideBatch); Preconditions.checkNotNull(probeSideBatch); @@ -300,8 +300,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { initialize(autoTune, reserveHash, - buildValueSizes, - probeValueSizes, keySizes, memoryAvailable, initialPartitions, @@ -313,7 +311,7 @@ public 
class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { recordsPerPartitionBatchProbe, maxBatchNumRecordsBuild, maxBatchNumRecordsProbe, - outputBatchNumRecords, + outputBatchSize, loadFactor); } @@ -352,8 +350,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { @VisibleForTesting protected void initialize(boolean autoTune, boolean reserveHash, - CaseInsensitiveMap<Long> buildValueSizes, - CaseInsensitiveMap<Long> probeValueSizes, CaseInsensitiveMap<Long> keySizes, long memoryAvailable, int initialPartitions, @@ -365,7 +361,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { int recordsPerPartitionBatchProbe, int maxBatchNumRecordsBuild, int maxBatchNumRecordsProbe, - int outputBatchNumRecords, + int outputBatchSize, double loadFactor) { Preconditions.checkState(!firstInitialized); Preconditions.checkArgument(initialPartitions >= 1); @@ -374,8 +370,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { this.loadFactor = loadFactor; this.autoTune = autoTune; this.reserveHash = reserveHash; - this.buildValueSizes = Preconditions.checkNotNull(buildValueSizes); - this.probeValueSizes = Preconditions.checkNotNull(probeValueSizes); this.keySizes = Preconditions.checkNotNull(keySizes); this.memoryAvailable = memoryAvailable; this.buildBatchSize = buildBatchSize; @@ -387,7 +381,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe; this.maxBatchNumRecordsBuild = maxBatchNumRecordsBuild; this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe; - this.outputBatchNumRecords = outputBatchNumRecords; + this.outputBatchSize = outputBatchSize; calculateMemoryUsage(); @@ -448,8 +442,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { safetyFactor, reserveHash); - maxOutputBatchSize = computeMaxOutputBatchSize(buildValueSizes, probeValueSizes, 
keySizes, - outputBatchNumRecords, safetyFactor, fragmentationFactor); + maxOutputBatchSize = (long) ((double)outputBatchSize * fragmentationFactor * safetyFactor); long probeReservedMemory; @@ -519,18 +512,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator { } } - public static long computeMaxOutputBatchSize(Map<String, Long> buildValueSizes, - Map<String, Long> probeValueSizes, - Map<String, Long> keySizes, - int outputNumRecords, - double safetyFactor, - double fragmentationFactor) { - long outputSize = HashTableSizeCalculatorConservativeImpl.computeVectorSizes(keySizes, outputNumRecords, safetyFactor) - + HashTableSizeCalculatorConservativeImpl.computeVectorSizes(buildValueSizes, outputNumRecords, safetyFactor) - + HashTableSizeCalculatorConservativeImpl.computeVectorSizes(probeValueSizes, outputNumRecords, safetyFactor); - return RecordBatchSizer.multiplyByFactor(outputSize, fragmentationFactor); - } - @Override public boolean shouldSpill() { Preconditions.checkState(initialized); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index f212605..5059b18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -42,4 +42,6 @@ public interface HashJoinProbe { void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition); int probeAndProject() throws SchemaChangeException; void changeToFinalProbeState(); + void setTargetOutputCount(int targetOutputCount); + int getOutputCount(); } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 75c3073..46f2fa3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -31,6 +31,8 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; +import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX; + public abstract class HashJoinProbeTemplate implements HashJoinProbe { VectorContainer container; // the outgoing container @@ -45,8 +47,6 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { private HashJoinBatch outgoingJoinBatch = null; - private static final int TARGET_RECORDS_PER_BATCH = 4000; - // Number of records to process on the probe side private int recordsToProcess = 0; @@ -83,6 +83,16 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { private int partitionMask = 0; // numPartitions - 1 private int bitsInMask = 0; // number of bits in the MASK private int rightHVColPosition; + private int targetOutputRecords; + + @Override + public void setTargetOutputCount(int targetOutputRecords) { + this.targetOutputRecords = targetOutputRecords; + } + + public int getOutputCount() { + return outputRecords; + } /** * Setup the Hash Join Probe object @@ -209,7 +219,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { private void executeProjectRightPhase(int currBuildPart) { - while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { + while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) { outputRecords = outputRow(partitions[currBuildPart].getContainers(), 
unmatchedBuildIndexes.get(recordsProcessed), null /* no probeBatch */, 0 /* no probe index */ ); @@ -219,7 +229,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { private void executeProbePhase() throws SchemaChangeException { - while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { + while (outputRecords < targetOutputRecords && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { // Check if we have processed all records in this batch we need to invoke next if (recordsProcessed == recordsToProcess) { @@ -262,6 +272,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { probeBatch.getSchema()); } case OK: + setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords)); recordsToProcess = probeBatch.getRecordCount(); recordsProcessed = 0; // If we received an empty batch do nothing @@ -274,10 +285,9 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { } } - int probeIndex = -1; + int probeIndex = -1; // Check if we need to drain the next row in the probe side if (getNextRecord) { - if ( !buildSideIsEmpty ) { int hashCode = ( cycleNum == 0 ) ? 
partitions[0].getProbeHashCode(recordsProcessed) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java index 9052836..d75463b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java @@ -121,6 +121,10 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte return (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE); } + public RecordBatchMemoryManager getBatchMemoryManager() { + return batchMemoryManager; + } + protected void updateBatchMemoryManagerStats() { stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX)); stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java index c147cf7..16b06fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -20,42 +20,26 @@ package org.apache.drill.exec.record; public class JoinBatchMemoryManager extends RecordBatchMemoryManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class); - private int leftRowWidth; - - private int rightRowWidth; - - private RecordBatch leftIncoming; - - private RecordBatch rightIncoming; + private int rowWidth[]; + private RecordBatch recordBatch[]; private static final int numInputs = 2; - public static final int LEFT_INDEX = 0; - public static final int RIGHT_INDEX = 1; public JoinBatchMemoryManager(int 
outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { super(numInputs, outputBatchSize); - this.leftIncoming = leftBatch; - this.rightIncoming = rightBatch; + recordBatch = new RecordBatch[numInputs]; + recordBatch[LEFT_INDEX] = leftBatch; + recordBatch[RIGHT_INDEX] = rightBatch; + rowWidth = new int[numInputs]; } - @Override - public int update(int inputIndex, int outputPosition) { - switch (inputIndex) { - case LEFT_INDEX: - setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming)); - leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize(); - break; - case RIGHT_INDEX: - setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming)); - rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize(); - default: - break; - } - + private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) { updateIncomingStats(inputIndex); - final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; + rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocSize(); + + final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX]; // If outgoing row width is 0, just return. This is possible for empty batches or // when first set of batches come with OK_NEW_SCHEMA and no data. @@ -85,13 +69,24 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { } @Override - public RecordBatchSizer.ColumnSize getColumnSize(String name) { - RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX); - RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX); + public int update(int inputIndex, int outputPosition, boolean useAggregate) { + setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex])); + return updateInternal(inputIndex, outputPosition, useAggregate); + } - if (leftSizer != null && leftSizer.getColumn(name) != null) { - return leftSizer.getColumn(name); - } - return rightSizer == null ? 
null : rightSizer.getColumn(name); + @Override + public int update(int inputIndex, int outputPosition) { + return update(inputIndex, outputPosition, false); + } + + @Override + public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) { + setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch)); + return updateInternal(inputIndex, outputPosition, useAggregate); + } + + @Override + public int update(RecordBatch batch, int inputIndex, int outputPosition) { + return update(batch, inputIndex, outputPosition, false); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java index 759e597..993f3cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java @@ -163,6 +163,19 @@ public class RecordBatchMemoryManager { updateIncomingStats(index); } + public int update(int inputIndex, int outputPosition, boolean useAggregate) { + // by default just return the outputRowCount + return getOutputRowCount(); + } + + public int update(RecordBatch batch, int inputIndex, int outputPosition) { + return getOutputRowCount(); + } + + public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) { + return getOutputRowCount(); + } + public int getOutputRowCount() { return outputRowCount; } @@ -205,8 +218,7 @@ public class RecordBatchMemoryManager { } public void setRecordBatchSizer(RecordBatchSizer sizer) { - this.sizer[DEFAULT_INPUT_INDEX] = sizer; - inputBatchStats[DEFAULT_INPUT_INDEX] = new BatchStats(); + setRecordBatchSizer(DEFAULT_INPUT_INDEX, sizer); } public RecordBatchSizer getRecordBatchSizer(int index) { @@ -261,7 +273,6 @@ public class RecordBatchMemoryManager { return UInt4Vector.VALUE_WIDTH; } - public void 
allocateVectors(VectorContainer container, int recordCount) { // Allocate memory for the vectors. // This will iteratively allocate memory for all nested columns underneath. @@ -269,10 +280,7 @@ public class RecordBatchMemoryManager { RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName()); colSize.allocateVector(w.getValueVector(), recordCount); } - } - - public void allocateVectors(VectorContainer container) { - allocateVectors(container, outputRowCount); + container.setRecordCount(0); } public void allocateVectors(List<ValueVector> valueVectors, int recordCount) { @@ -284,6 +292,10 @@ public class RecordBatchMemoryManager { } } + public void allocateVectors(VectorContainer container) { + allocateVectors(container, outputRowCount); + } + public void allocateVectors(List<ValueVector> valueVectors) { allocateVectors(valueVectors, outputRowCount); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java index 7e531f8..a5cb05b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java @@ -251,12 +251,50 @@ public class RecordBatchSizer { } /** - * This returns actual entry size if rowCount > 0 or standard size otherwise. + * This returns actual entry size if rowCount > 0 or allocation size otherwise. * Use this for the cases when you might get empty batches with schema * and you still need to do memory calculations based on just schema. */ public int getAllocSizePerEntry() { - return rowCount() == 0 ? getStdNetSizePerEntry() : getNetSizePerEntry(); + if (rowCount() != 0) { + return getNetSizePerEntry(); + } + + int stdNetSize; + try { + stdNetSize = TypeHelper.getSize(metadata.getType()); + + // TypeHelper estimates 50 bytes for variable length. 
That is pretty high number + // to use as approximation for empty batches. Use 8 instead. + switch (metadata.getType().getMinorType()) { + case VARBINARY: + case VARCHAR: + case VAR16CHAR: + case VARDECIMAL: + stdNetSize = 4 + 8; + break; + } + } catch (Exception e) { + stdNetSize = 0; + } + + if (isOptional) { + stdNetSize += BIT_VECTOR_WIDTH; + } + + if (isRepeated) { + stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + OFFSET_VECTOR_WIDTH; + } + + for (ColumnSize columnSize : children.values()) { + stdNetSize += columnSize.getAllocSizePerEntry(); + } + + if (isRepeatedList()) { + stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + OFFSET_VECTOR_WIDTH; + } + + return stdNetSize; } /** @@ -777,6 +815,13 @@ public class RecordBatchSizer { return (int) Math.ceil((double) num / denom); } + public static int safeDivide(int num, double denom) { + if (denom == 0) { + return 0; + } + return (int) Math.ceil((double) num / denom); + } + public int rowCount() { return rowCount; } public int stdRowWidth() { return stdRowWidth; } public int grossRowWidth() { return grossRowWidth; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java index 30c0c73..2a44edb 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java @@ -39,8 +39,6 @@ public class TestBuildSidePartitioningImpl { calc.initialize(true, false, - buildValueSizes, - probeValueSizes, keySizes, 200, 2, @@ -52,7 +50,7 @@ public class TestBuildSidePartitioningImpl { 5, maxBatchNumRecords, maxBatchNumRecords, - 10, + 16000, .75); final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = @@ -84,8 +82,6 @@ public class TestBuildSidePartitioningImpl { calc.initialize(false, true, 
- buildValueSizes, - probeValueSizes, keySizes, 350, 2, @@ -97,7 +93,7 @@ public class TestBuildSidePartitioningImpl { 5, maxBatchNumRecords, maxBatchNumRecords, - 10, + 16000, .75); final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = @@ -130,8 +126,6 @@ public class TestBuildSidePartitioningImpl { calc.initialize( true, false, - buildValueSizes, - probeValueSizes, keySizes, 200, 4, @@ -143,7 +137,7 @@ public class TestBuildSidePartitioningImpl { 5, maxBatchNumRecords, maxBatchNumRecords, - 10, + 16000, .75); final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = @@ -178,8 +172,6 @@ public class TestBuildSidePartitioningImpl { calc.initialize( true, false, - buildValueSizes, - probeValueSizes, keySizes, 180, 2, @@ -191,7 +183,7 @@ public class TestBuildSidePartitioningImpl { 5, maxBatchNumRecords, maxBatchNumRecords, - 10, + 16000, .75); final PartitionStatImpl partition1 = new PartitionStatImpl(); @@ -229,8 +221,6 @@ public class TestBuildSidePartitioningImpl { calc.initialize( true, false, - buildValueSizes, - probeValueSizes, keySizes, 210, 2, @@ -242,7 +232,7 @@ public class TestBuildSidePartitioningImpl { 5, maxBatchNumRecords, maxBatchNumRecords, - 10, + 16000, .75); final PartitionStatImpl partition1 = new PartitionStatImpl(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java index 9838670..da83b00 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java @@ -25,6 +25,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.FlattenPOP; +import 
org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -1353,6 +1354,391 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase { } @Test + public void testHashJoinMultipleOutputBatches() throws Exception { + HashJoinPOP hashJoin = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); + mockOpContext(hashJoin, initReservation, maxAllocation); + + numRows = 4000 * 2; + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. + // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. 
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in hash join. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(hashJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows+1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testHashJoinSingleOutputBatch() throws Exception { + HashJoinPOP hashJoin = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); + mockOpContext(hashJoin, initReservation, maxAllocation); + + // create multiple batches from both sides. + numRows = 4096 * 2; + + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to twice of total size expected. + // We should get 1 batch. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(hashJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(1) // verify number of batches + .expectedBatchSize(totalSize) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testHashJoinUpperLimit() throws Exception { + // test the upper limit of 65535 records per batch. + HashJoinPOP hashJoin = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); + mockOpContext(hashJoin, initReservation, maxAllocation); + + numRows = 100000; + + // create left input rows like this. + // "a1" : 5, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. + // "a2" : 6, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. 
+ // "a1" : 5, "c1" : 1, "a2":6, "c2": 1 + // "a1" : 5, "c1" : 2, "a2":6, "c2": 2 + // "a1" : 5, "c1" : 3, "a2":6, "c2": 3 + + // expect two batches, batch limited by 65535 records + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(hashJoin) + .baselineColumns("a1", "c1", "a2", "c2") + .expectedNumBatches(2) // verify number of batches + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, i, 6l, i); + } + + opTestBuilder.go(); + } + + @Test + public void testHashJoinLowerLimit() throws Exception { + // test the lower limit of at least one batch + HashJoinPOP hashJoin = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); + mockOpContext(hashJoin, initReservation, maxAllocation); + + numRows = 10; + + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + + // set very low value of output batch size so we can do only one row per batch. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(hashJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(10) // verify number of batches + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testRightOuterHashJoin() throws Exception { + + HashJoinPOP hashJoin = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT); + mockOpContext(hashJoin, initReservation, maxAllocation); + + numRows = 4000 * 2; + // create left input rows like this. 
+ // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. + // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. 
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in hash join. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(hashJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + } + + @Test + public void testLeftOuterHashJoin() throws Exception { + + HashJoinPOP hashJoin = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT); + mockOpContext(hashJoin, initReservation, maxAllocation); + + numRows = 4000 * 2; + // create left input rows like this. + // "a1" : 5, "b1" : wideString, "c1" : <id> + List<String> leftJsonBatches = Lists.newArrayList(); + StringBuilder leftBatchString = new StringBuilder(); + leftBatchString.append("["); + for (int i = 0; i < numRows; i++) { + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); + } + leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); + leftBatchString.append("]"); + + leftJsonBatches.add(leftBatchString.toString()); + + // create right input rows like this. 
+ // "a2" : 6, "b2" : wideString, "c2" : <id> + List<String> rightJsonBatches = Lists.newArrayList(); + StringBuilder rightBatchString = new StringBuilder(); + rightBatchString.append("["); + for (int i = 0; i < numRows; i++) { + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + rightBatchString.append("]"); + rightJsonBatches.add(rightBatchString.toString()); + + // output rows will be like this. + // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 + // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 + // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); + } + expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); + expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in hash join. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(hashJoin) + .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2) // verify batch size + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); + + for (long i = 0; i < numRows+1; i++) { + opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); + } + + opTestBuilder.go(); + + } + + @Test public void testSizerRepeatedList() throws Exception { List<String> inputJsonBatches = Lists.newArrayList(); StringBuilder batchString = new StringBuilder(); -- To stop receiving notification emails like this one, please contact [email protected].
