This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit e9ffb5bde37efad9f0b646773311f23ea2dbda5d
Author: Timothy Farkas <[email protected]>
AuthorDate: Mon Jul 30 11:21:02 2018 -0700

    DRILL-6644: Don't reserve space for incoming probe batches unnecessarily during the build phase.

    git closes #1409
---
 .../exec/physical/impl/join/HashJoinBatch.java     |  23 ++--
 .../join/HashJoinMechanicalMemoryCalculator.java   |   1 -
 .../impl/join/HashJoinMemoryCalculator.java        |   3 +-
 .../impl/join/HashJoinMemoryCalculatorImpl.java    | 106 ++++++++---------
 .../impl/join/TestBuildSidePartitioningImpl.java   | 104 +++++++++++++----
 .../impl/join/TestPostBuildCalculationsImpl.java   | 127 +++++++++++++++++----
 .../exec/physical/unit/TestNullInputMiniPlan.java  |   4 +-
 7 files changed, 243 insertions(+), 125 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a8339f8..dd9b0d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -287,7 +287,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         prefetchedBuild,
         buildSideIsEmpty,
         RIGHT_INDEX,
-        right,
+        buildBatch,
         () -> {
           batchMemoryManager.update(RIGHT_INDEX, 0, true);
           logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
@@ -302,7 +302,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         prefetchedProbe,
         probeSideIsEmpty,
         LEFT_INDEX,
-        left,
+        probeBatch,
         () -> {
           batchMemoryManager.update(LEFT_INDEX, 0);
           logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
@@ -348,7 +348,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       state = BatchState.STOP;
     } else {
       // Got our first batch(es)
-      memoryManagerUpdate.run();
+      if (cycleNum == 0) {
+        // Only collect stats for the first cycle
+        memoryManagerUpdate.run();
+      }
       state = BatchState.FIRST;
     }
@@ -558,6 +561,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {

       state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this

+      prefetchedBuild.setValue(false);
+      prefetchedProbe.setValue(false);
+
       return innerNext(); // start processing the next spilled partition "recursively"
     }
@@ -762,7 +768,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
           buildJoinColumns,
           leftUpstream == IterOutcome.NONE, // probeEmpty
           allocator.getLimit(),
-          maxIncomingBatchSize,
           numPartitions,
           RECORDS_PER_BATCH,
           RECORDS_PER_BATCH,
@@ -821,22 +826,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     { // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT : RECORDS_PER_BATCH;
-      boolean hasProbeData = leftUpstream != IterOutcome.NONE;
-      boolean doMemoryCalculation = canSpill && hasProbeData;
+      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+      boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();

       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();

-      // We've sniffed first non empty build and probe batches so we have enough information to create a calculator
       buildCalc.initialize(firstCycle,
         true, // TODO Fix after growing hash values bug fixed
         buildBatch,
         probeBatch,
         buildJoinColumns,
-        leftUpstream == IterOutcome.NONE, // probeEmpty
+        probeSideIsEmpty.booleanValue(),
         allocator.getLimit(),
-        maxIncomingBatchSize,
         numPartitions,
         RECORDS_PER_BATCH,
         RECORDS_PER_BATCH,
@@ -1093,7 +1095,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {

     this.allocator = oContext.getAllocator();

-    maxIncomingBatchSize = context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE);
     numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 ) { //
       disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index af6be8b..59496d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -73,7 +73,6 @@ public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalcula
                            Set<String> joinColumns,
                            boolean probeEmpty,
                            long memoryAvailable,
-                           long maxIncomingBatchSize,
                            int initialPartitions,
                            int recordsPerPartitionBatchBuild,
                            int recordsPerPartitionBatchProbe,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index 0ccd912..38c2a35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -96,14 +96,13 @@ public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJo
   * </ul>
   */
  interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations> {
-    void initialize(boolean autoTune,
+    void initialize(boolean firstCycle,
                     boolean reserveHash,
                     RecordBatch buildSideBatch,
                     RecordBatch probeSideBatch,
                     Set<String> joinColumns,
                     boolean probeEmpty,
                     long memoryAvailable,
-                    long maxIncomingBatchSize,
                     int initialPartitions,
                     int recordsPerPartitionBatchBuild,
                     int recordsPerPartitionBatchProbe,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 2ab42e5..35ff7ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -93,14 +93,13 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     private int recordsPerPartitionBatchProbe;

     @Override
-    public void initialize(boolean autoTune,
+    public void initialize(boolean firstCycle,
                            boolean reserveHash,
                            RecordBatch buildSideBatch,
                            RecordBatch probeSideBatch,
                            Set<String> joinColumns,
                            boolean probeEmpty,
                            long memoryAvailable,
-                           long maxIncomingBatchSize,
                            int initialPartitions,
                            int recordsPerPartitionBatchBuild,
                            int recordsPerPartitionBatchProbe,
@@ -169,7 +168,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
    * <h1>Life Cycle</h1>
    * <p>
    * <ul>
-   * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, long, int, int, int, int, int, int, double)}.
+   * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int, double)}.
    * This will initialize the StateCalculate with the additional information it needs.</li>
    * <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
    * <li><b>Step 2:</b> Call {@link #shouldSpill()} to determine if spilling needs to occur.</li>
@@ -190,7 +189,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     private int maxBatchNumRecordsProbe;
     private long memoryAvailable;
     private boolean probeEmpty;
-    private long maxIncomingBatchSize;
     private long maxBuildBatchSize;
     private long maxProbeBatchSize;
     private long maxOutputBatchSize;
@@ -200,7 +198,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     private int recordsPerPartitionBatchProbe;
     private int outputBatchSize;
     private Map<String, Long> keySizes;
-    private boolean autoTune;
+    private boolean firstCycle;
     private boolean reserveHash;
     private double loadFactor;
@@ -228,14 +226,13 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     }

     @Override
-    public void initialize(boolean autoTune,
+    public void initialize(boolean firstCycle,
                            boolean reserveHash,
                            RecordBatch buildBatch,
                            RecordBatch probeBatch,
                            Set<String> joinColumns,
                            boolean probeEmpty,
                            long memoryAvailable,
-                           long maxIncomingBatchSize,
                            int initialPartitions,
                            int recordsPerPartitionBatchBuild,
                            int recordsPerPartitionBatchProbe,
@@ -264,11 +261,10 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
         keySizes.put(joinColumn, (long)columnSize.getStdNetOrNetSizePerEntry());
       }

-      initialize(autoTune,
+      initialize(firstCycle,
         reserveHash,
         keySizes,
         memoryAvailable,
-        maxIncomingBatchSize,
         initialPartitions,
         probeEmpty,
         buildSizePredictor,
@@ -282,11 +278,10 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     }

     @VisibleForTesting
-    protected void initialize(boolean autoTune,
+    protected void initialize(boolean firstCycle,
                               boolean reserveHash,
                               CaseInsensitiveMap<Long> keySizes,
                               long memoryAvailable,
-                              long maxIncomingBatchSize,
                               int initialPartitions,
                               boolean probeEmpty,
                               BatchSizePredictor buildSizePredictor,
@@ -305,12 +300,11 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       firstInitialized = true;

       this.loadFactor = loadFactor;
-      this.autoTune = autoTune;
+      this.firstCycle = firstCycle;
       this.reserveHash = reserveHash;
       this.keySizes = Preconditions.checkNotNull(keySizes);
       this.memoryAvailable = memoryAvailable;
       this.probeEmpty = probeEmpty;
-      this.maxIncomingBatchSize = maxIncomingBatchSize;
       this.buildSizePredictor = buildSizePredictor;
       this.probeSizePredictor = probeSizePredictor;
       this.initialPartitions = initialPartitions;
@@ -358,22 +352,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       { // Adjust based on number of records
         maxBuildBatchSize = buildSizePredictor.predictBatchSize(maxBatchNumRecordsBuild, false);
-
-        if (probeSizePredictor.hadDataLastTime()) {
-          // We have probe data and we can compute the max incoming size.
-          maxProbeBatchSize = probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
-        } else {
-          // We don't have probe data
-          if (probeEmpty) {
-            // We know the probe has no data, so we don't need to reserve any space for the incoming probe
-            maxProbeBatchSize = 0;
-          } else {
-            // The probe side may have data, so assume it is the max incoming batch size. This assumption
-            // can fail in some cases since the batch sizing project is incomplete.
-            maxProbeBatchSize = maxIncomingBatchSize;
-          }
-        }
-
         partitionBuildBatchSize = buildSizePredictor.predictBatchSize(recordsPerPartitionBatchBuild, reserveHash);

         if (probeSizePredictor.hadDataLastTime()) {
@@ -389,15 +367,18 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
         long incompletePartitionsBatchSizes = ((long) partitions) * partitionBuildBatchSize;
         // We need to reserve all the space for incomplete batches, and the incoming batch as well as the
         // probe batch we sniffed.
-        // TODO when batch sizing project is complete we won't have to sniff probe batches since
-        // they will have a well defined size.
-        reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize + maxProbeBatchSize;
+        reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize;
+
+        if (!firstCycle) {
+          // If this is NOT the first cycle, the HashJoin operator owns the probe batch and we need to reserve space for it.
+          reservedMemory += probeSizePredictor.getBatchSize();
+        }

         if (probeSizePredictor.hadDataLastTime()) {
           // If we have probe data, use it in our memory reservation calculations.
           probeReservedMemory = PostBuildCalculationsImpl.calculateReservedMemory(
             partitions,
-            maxProbeBatchSize,
+            probeSizePredictor.getBatchSize(),
             maxOutputBatchSize,
             partitionProbeBatchSize);

@@ -407,7 +388,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
           maxReservedMemory = reservedMemory;
         }

-        if (!autoTune || maxReservedMemory <= memoryAvailable) {
+        if (!firstCycle || maxReservedMemory <= memoryAvailable) {
           // Stop the tuning loop if we are not doing auto tuning, or if we are living within our memory limit
           break;
         }
@@ -476,6 +457,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       Preconditions.checkState(initialized);

       return new PostBuildCalculationsImpl(
+        firstCycle,
         probeSizePredictor,
         memoryAvailable,
         maxOutputBatchSize,
@@ -580,10 +562,10 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {

     public static final int MIN_RECORDS_PER_PARTITION_BATCH_PROBE = 10;

+    private final boolean firstCycle;
     private final BatchSizePredictor probeSizePredictor;
     private final long memoryAvailable;
     private final long maxOutputBatchSize;
-    private final int maxBatchNumRecordsProbe;
     private final int recordsPerPartitionBatchProbe;
     private final PartitionStatSet buildPartitionStatSet;
     private final Map<String, Long> keySizes;
@@ -597,24 +579,25 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     private boolean initialized;
     private long consumedMemory;
     private boolean probeEmpty;
-    private long maxProbeBatchSize;
     private long partitionProbeBatchSize;
     private int computedProbeRecordsPerBatch;

     @VisibleForTesting
-    public PostBuildCalculationsImpl(final BatchSizePredictor probeSizePredictor,
-                                     final long memoryAvailable,
-                                     final long maxOutputBatchSize,
-                                     final int maxBatchNumRecordsProbe,
-                                     final int recordsPerPartitionBatchProbe,
-                                     final PartitionStatSet buildPartitionStatSet,
-                                     final Map<String, Long> keySizes,
-                                     final HashTableSizeCalculator hashTableSizeCalculator,
-                                     final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
-                                     final double fragmentationFactor,
-                                     final double safetyFactor,
-                                     final double loadFactor,
-                                     final boolean reserveHash) {
+    public PostBuildCalculationsImpl(final boolean firstCycle,
+                                     final BatchSizePredictor probeSizePredictor,
+                                     final long memoryAvailable,
+                                     final long maxOutputBatchSize,
+                                     final int maxBatchNumRecordsProbe,
+                                     final int recordsPerPartitionBatchProbe,
+                                     final PartitionStatSet buildPartitionStatSet,
+                                     final Map<String, Long> keySizes,
+                                     final HashTableSizeCalculator hashTableSizeCalculator,
+                                     final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
+                                     final double fragmentationFactor,
+                                     final double safetyFactor,
+                                     final double loadFactor,
+                                     final boolean reserveHash) {
+      this.firstCycle = firstCycle;
       this.probeSizePredictor = Preconditions.checkNotNull(probeSizePredictor);
       this.memoryAvailable = memoryAvailable;
       this.maxOutputBatchSize = maxOutputBatchSize;
@@ -626,7 +609,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       this.safetyFactor = safetyFactor;
       this.loadFactor = loadFactor;
       this.reserveHash = reserveHash;
-      this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
       this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
       this.computedProbeRecordsPerBatch = recordsPerPartitionBatchProbe;
     }
@@ -636,7 +618,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       Preconditions.checkState(!initialized);
       // If we had probe data before there should still be probe data now.
       // If we didn't have probe data before we could get some new data now.
-      Preconditions.checkState(probeSizePredictor.hadDataLastTime() && !probeEmpty || !probeSizePredictor.hadDataLastTime());
+      Preconditions.checkState(!(probeEmpty && probeSizePredictor.hadDataLastTime()));

       initialized = true;
       this.probeEmpty = probeEmpty;
@@ -650,12 +632,11 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
           probeSizePredictor.updateStats();
         }

-        maxProbeBatchSize = probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
         partitionProbeBatchSize = probeSizePredictor.predictBatchSize(recordsPerPartitionBatchProbe, reserveHash);

         long worstCaseProbeMemory = calculateReservedMemory(
           buildPartitionStatSet.getSize(),
-          maxProbeBatchSize,
+          getIncomingProbeBatchReservedSpace(),
           maxOutputBatchSize,
           partitionProbeBatchSize);

@@ -667,7 +648,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
             buildPartitionStatSet.getSize(),
             recordsPerPartitionBatchProbe,
             MIN_RECORDS_PER_PARTITION_BATCH_PROBE,
-            maxProbeBatchSize,
+            getIncomingProbeBatchReservedSpace(),
             maxOutputBatchSize,
             partitionProbeBatchSize);

@@ -681,9 +662,14 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       return computedProbeRecordsPerBatch;
     }

-    @VisibleForTesting
-    public long getMaxProbeBatchSize() {
-      return maxProbeBatchSize;
+    public long getIncomingProbeBatchReservedSpace() {
+      Preconditions.checkState(initialized);
+
+      if (firstCycle) {
+        return 0;
+      } else {
+        return probeSizePredictor.getBatchSize();
+      }
     }

     @VisibleForTesting
@@ -744,7 +730,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {

       long reservedMemory = calculateReservedMemory(
         buildPartitionStatSet.getNumSpilledPartitions(),
-        maxProbeBatchSize,
+        getIncomingProbeBatchReservedSpace(),
         maxOutputBatchSize,
         partitionProbeBatchSize);

@@ -804,11 +790,11 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
         "Mem calc stats:\n" +
         "memoryLimit = %s\n" +
         "consumedMemory = %s\n" +
-        "maxProbeBatchSize = %s\n" +
+        "maxIncomingProbeBatchReservedSpace = %s\n" +
         "maxOutputBatchSize = %s\n",
         PartitionStatSet.prettyPrintBytes(memoryAvailable),
         PartitionStatSet.prettyPrintBytes(consumedMemory),
-        PartitionStatSet.prettyPrintBytes(maxProbeBatchSize),
+        PartitionStatSet.prettyPrintBytes(getIncomingProbeBatchReservedSpace()),
         PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));

       StringBuilder hashJoinHelperSb = new StringBuilder("Partition Hash Join Helpers\n");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index af943ec..665bc48 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -25,7 +25,16 @@ import org.junit.Test;

 public class TestBuildSidePartitioningImpl {
   @Test
-  public void testSimpleReserveMemoryCalculationNoHash() {
+  public void testSimpleReserveMemoryCalculationNoHashFirstCycle() {
+    testSimpleReserveMemoryCalculationNoHashHelper(true);
+  }
+
+  @Test
+  public void testSimpleReserveMemoryCalculationNoHashNotFirstCycle() {
+    testSimpleReserveMemoryCalculationNoHashHelper(false);
+  }
+
+  private void testSimpleReserveMemoryCalculationNoHashHelper(final boolean firstCycle) {
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
@@ -39,12 +48,12 @@ public class TestBuildSidePartitioningImpl {
         safetyFactor);

     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+    final long accountedProbeBatchSize = firstCycle? 0: 10;

-    calc.initialize(true,
+    calc.initialize(firstCycle,
       false,
       keySizes,
-      200,
-      100,
+      190 + accountedProbeBatchSize,
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -62,7 +71,7 @@ public class TestBuildSidePartitioningImpl {

     long expectedReservedMemory = 60 // Max incoming batch size
       + 2 * 30 // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + accountedProbeBatchSize; // Max incoming probe batch size
     long actualReservedMemory = calc.getBuildReservedMemory();

     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -89,7 +98,6 @@ public class TestBuildSidePartitioningImpl {
       true,
       keySizes,
       350,
-      100, // Ignored for test
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -107,7 +115,7 @@ public class TestBuildSidePartitioningImpl {

     long expectedReservedMemory = 60 // Max incoming batch size
       + 2 * (/* data size for batch */ 30 + /* Space reserved for hash value vector */ 10 * 4 * 2) // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + 10; // Max incoming probe batch size
     long actualReservedMemory = calc.getBuildReservedMemory();

     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -135,7 +143,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       200,
-      100, // Ignored for test
       4,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -153,17 +160,61 @@ public class TestBuildSidePartitioningImpl {
     calc.setPartitionStatSet(partitionStatSet);

     long expectedReservedMemory = 60 // Max incoming batch size
-      + 2 * 30 // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + 2 * 30; // build side batch for each spilled partition
     long actualReservedMemory = calc.getBuildReservedMemory();

     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
     Assert.assertEquals(2, calc.getNumPartitions());
   }

+  @Test
+  public void testDontAdjustInitialPartitions() {
+    final int maxBatchNumRecords = 20;
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        BatchSizePredictorImpl.Factory.INSTANCE,
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        fragmentationFactor,
+        safetyFactor);
+
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(
+      false,
+      false,
+      keySizes,
+      200,
+      4,
+      false,
+      new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
+      new MockBatchSizePredictor(10, 10, fragmentationFactor, safetyFactor),
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      16000,
+      .75);
+
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl(),
+        new PartitionStatImpl(), new PartitionStatImpl());
+    calc.setPartitionStatSet(partitionStatSet);
+
+    long expectedReservedMemory = 60 // Max incoming batch size
+      + 4 * 30 // build side batch for each spilled partition
+      + 10; // Max incoming probe batch size
+    long actualReservedMemory = calc.getBuildReservedMemory();
+
+    Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+    Assert.assertEquals(4, calc.getNumPartitions());
+  }
+
   @Test(expected = IllegalStateException.class)
   public void testHasDataProbeEmpty() {
-    final int maxIncomingBatchSize = 100;
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
@@ -183,7 +234,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       240,
-      maxIncomingBatchSize,
       4,
       true,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -198,7 +248,6 @@ public class TestBuildSidePartitioningImpl {

   @Test
   public void testNoProbeDataForStats() {
-    final int maxIncomingBatchSize = 100;
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
@@ -218,7 +267,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       240,
-      maxIncomingBatchSize,
       4,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -235,12 +283,11 @@ public class TestBuildSidePartitioningImpl {
     calc.setPartitionStatSet(partitionStatSet);

     long expectedReservedMemory = 60 // Max incoming batch size
-      + 2 * 30 // build side batch for each spilled partition
-      + maxIncomingBatchSize;
+      + 4 * 30; // build side batch for each spilled partition
     long actualReservedMemory = calc.getBuildReservedMemory();

     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
-    Assert.assertEquals(2, calc.getNumPartitions());
+    Assert.assertEquals(4, calc.getNumPartitions());
   }

   @Test
@@ -264,7 +311,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       200,
-      100, // Ignored for test
       4,
       true,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -290,10 +336,20 @@ public class TestBuildSidePartitioningImpl {
   }

   @Test
-  public void testNoRoomInMemoryForBatch1() {
+  public void testNoRoomInMemoryForBatch1FirstCycle() {
+    testNoRoomInMemoryForBatch1Helper(true);
+  }
+
+  @Test
+  public void testNoRoomInMemoryForBatch1NotFirstCycle() {
+    testNoRoomInMemoryForBatch1Helper(false);
+  }
+
+  private void testNoRoomInMemoryForBatch1Helper(final boolean firstCycle) {
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
+    final long accountedProbeBatchSize = firstCycle? 0: 10;

     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);

     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();

     calc.initialize(
-      true,
+      firstCycle,
       false,
       keySizes,
-      180,
-      100, // Ignored for test
+      120 + accountedProbeBatchSize,
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -330,7 +385,7 @@ public class TestBuildSidePartitioningImpl {

     long expectedReservedMemory = 60 // Max incoming batch size
       + 2 * 30 // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + accountedProbeBatchSize; // Max incoming probe batch size
     long actualReservedMemory = calc.getBuildReservedMemory();

     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -360,8 +415,7 @@ public class TestBuildSidePartitioningImpl {
       true,
       false,
       keySizes,
-      210,
-      100, // Ignored for test
+      160,
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index aa7a435..fa3fdfb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -121,6 +121,7 @@ public class TestPostBuildCalculationsImpl {

     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        true,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
@@ -167,6 +168,7 @@ public class TestPostBuildCalculationsImpl {

     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        true,
         new ConditionalMockBatchSizePredictor(),
         50,
         1000,
@@ -188,7 +190,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testHasNoProbeDataButProbeNonEmpty() {
+  public void testHasNoProbeDataButProbeNonEmptyFirstCycle() {
+    testHasNoProbeDataButProbeNonEmptyHelper(true);
+  }
+
+  @Test
+  public void testHasNoProbeDataButProbeNonEmptyNotFirstCycle() {
+    testHasNoProbeDataButProbeNonEmptyHelper(false);
+  }
+
+  private void testHasNoProbeDataButProbeNonEmptyHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -210,14 +221,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           false),
-        290,
+        230 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -232,7 +245,7 @@ public class TestPostBuildCalculationsImpl {

     calc.initialize(false);

-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 160 // in memory partitions
       + 20 // max output batch size
       + 2 * 10 // Hash Table
       + 2 * 10; // Hash join helper
     Assert.assertFalse(calc.shouldSpill());
     Assert.assertEquals(expected, calc.getConsumedMemory());
     Assert.assertNotNull(calc.next());
@@ -243,7 +256,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testProbingAndPartitioningBuildAllInMemoryNoSpill() {
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -265,14 +287,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        290,
+        230 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -287,7 +311,7 @@ public class TestPostBuildCalculationsImpl {

     calc.initialize(false);

-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 160 // in memory partitions
       + 20 // max output batch size
       + 2 * 10 // Hash Table
       + 2 * 10; // Hash join helper
     Assert.assertFalse(calc.shouldSpill());
     Assert.assertEquals(expected, calc.getConsumedMemory());
     Assert.assertNotNull(calc.next());
@@ -298,7 +322,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testProbingAndPartitioningBuildAllInMemorySpill() {
+  public void testProbingAndPartitioningBuildAllInMemorySpillFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemorySpillHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemorySpillNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemorySpillHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemorySpillHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -320,16 +353,18 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        270,
+        210 + accountedProbeBatchSize,
         20,
-        maxBatchNumRecordsProbe,
+        maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
         buildPartitionStatSet,
         keySizes,
@@ -342,7 +377,7 @@ public class TestPostBuildCalculationsImpl {

     calc.initialize(false);

-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 160 // in memory partitions
       + 20 // max output batch size
       + 2 * 10 // Hash Table
       + 2 * 10; // Hash join helper
     Assert.assertFalse(calc.shouldSpill());
     Assert.assertEquals(expected, calc.getConsumedMemory());

     partition1.spill();

-    expected = 60 // maxProbeBatchSize
+    expected = accountedProbeBatchSize
       + 80 // in memory partitions
       + 20 // max output batch size
       + 10 // Hash Table
       + 10; // Hash join helper
     Assert.assertTrue(calc.shouldSpill());
     Assert.assertEquals(expected, calc.getConsumedMemory());
     Assert.assertNotNull(calc.next());
@@ -363,7 +398,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -381,14 +425,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        180,
+        120 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -403,7 +449,7 @@ public class TestPostBuildCalculationsImpl {

     calc.initialize(false);

-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize // probe batch
       + 2 * 5 * 3 // partition batches
       + 20; // max output batch size
     Assert.assertFalse(calc.shouldSpill());
@@ -412,7 +458,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testProbingAndPartitioningBuildAllInMemoryWithSpill() {
+  public void testProbingAndPartitioningBuildAllInMemoryWithSpillFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryWithSpillNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -434,14 +489,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        200,
+        140 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -456,7 +513,7 @@ public class TestPostBuildCalculationsImpl {

     calc.initialize(false);

-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 80 // in memory partition
       + 10 // hash table size
       + 10 // hash join helper size
       + 15; // partition batch size
     Assert.assertTrue(calc.shouldSpill());
     Assert.assertEquals(expected, calc.getConsumedMemory());
     Assert.assertNotNull(calc.next());
@@ -471,7 +528,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testProbingAndPartitioningBuildSomeInMemory() {
+  public void testProbingAndPartitioningBuildSomeInMemoryFirstCycle() {
+    testProbingAndPartitioningBuildSomeInMemoryHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildSomeInMemoryNotFirstCycle() {
+    testProbingAndPartitioningBuildSomeInMemoryHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildSomeInMemoryHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -497,14 +563,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        230,
+        170 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -519,7 +587,7 @@ public class TestPostBuildCalculationsImpl {

     calc.initialize(false);

-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 80 // in memory partition
       + 10 // hash table size
       + 10 // hash join helper size
       + 2 * 15; // partition batch size
     Assert.assertTrue(calc.shouldSpill());
     Assert.assertEquals(expected, calc.getConsumedMemory());
@@ -533,8 +601,16 @@ public class TestPostBuildCalculationsImpl {
   }

   @Test
-  public void testProbingAndPartitioningBuildNoneInMemory() {
+  public void testProbingAndPartitioningBuildNoneInMemoryFirstCycle() {
+    testProbingAndPartitioningBuildNoneInMemoryHelper(true);
+  }

+  @Test
+  public void testProbingAndPartitioningBuildNoneInMemoryNotFirstCycle() {
+    testProbingAndPartitioningBuildNoneInMemoryHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildNoneInMemoryHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();

     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -554,14 +630,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);

     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        110,
+        110 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -576,7 +654,7 @@ public class TestPostBuildCalculationsImpl {
     calc.initialize(false);

     Assert.assertFalse(calc.shouldSpill());
-    Assert.assertEquals(110, calc.getConsumedMemory());
+    Assert.assertEquals(50 + accountedProbeBatchSize, calc.getConsumedMemory());
     Assert.assertNotNull(calc.next());
   }

@@ -611,6 +689,7 @@ public class TestPostBuildCalculationsImpl {

     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        true,
         new ConditionalMockBatchSizePredictor(
           Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
@@ -708,7 +787,7 @@ public class TestPostBuildCalculationsImpl {

     @Override
     public long getBatchSize() {
-      return 0;
+      return batchSize.get(0);
     }

     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
index 9ad7a9b..e1609a5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
@@ -316,7 +316,7 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
         .build();

     RecordBatch joinBatch = new PopBuilder()
-        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.INNER, null))
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a2", "EQUALS", "a")), JoinRelType.INNER, null))
         .addInput(left)
         .addInput(rightScan)
         .build();
@@ -379,7 +379,7 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
         .build();

     RecordBatch joinBatch = new PopBuilder()
-        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.LEFT, null))
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a2", "EQUALS", "a")), JoinRelType.LEFT, null))
         .addInput(left)
         .addInput(rightScan)
         .build();
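For readers skimming the patch: the core of the change is the build-phase reservation formula in BuildSidePartitioningImpl. On the first cycle the probe side has not been sniffed yet and upstream still owns its batch, so reserving a worst-case incoming probe batch (the old maxIncomingBatchSize option value) only inflated reservedMemory; on spill cycles the operator itself holds the spilled probe batch, so its actual size, probeSizePredictor.getBatchSize(), is reserved instead. Below is a minimal standalone sketch of that rule with made-up sizes; ProbeReservationSketch and buildReservedMemory are hypothetical names for illustration, not Drill code.

public final class ProbeReservationSketch {
  // Mirrors the shape of BuildSidePartitioningImpl's reservation: space for each
  // partition's incomplete build batch, plus the incoming build batch, plus the
  // probe batch only when this is not the first cycle.
  static long buildReservedMemory(boolean firstCycle,
                                  int numPartitions,
                                  long partitionBuildBatchSize,
                                  long maxBuildBatchSize,
                                  long probeBatchSize) {
    long reserved = (long) numPartitions * partitionBuildBatchSize + maxBuildBatchSize;
    if (!firstCycle) {
      // Spill cycle: the operator owns a probe batch, so account for its actual size.
      reserved += probeBatchSize;
    }
    return reserved;
  }

  public static void main(String[] args) {
    // Same numbers as testSimpleReserveMemoryCalculationNoHashHelper above:
    // 2 partitions at 30 bytes per partition batch, a 60-byte incoming build
    // batch, and a 10-byte probe batch.
    System.out.println(buildReservedMemory(true, 2, 30, 60, 10));  // 120: probe not reserved
    System.out.println(buildReservedMemory(false, 2, 30, 60, 10)); // 130: probe reserved
  }
}

The same rule appears on the probe side as PostBuildCalculationsImpl.getIncomingProbeBatchReservedSpace(), which returns 0 in the first cycle and probeSizePredictor.getBatchSize() on later cycles.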
