asfgit closed pull request #1409: DRILL-6644: Don't reserve space for incoming
probe batches unnecessarily during the build phase.
URL: https://github.com/apache/drill/pull/1409
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a8339f86883..dd9b0d6be91 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -287,7 +287,7 @@ private void prefetchFirstBuildBatch() {
prefetchedBuild,
buildSideIsEmpty,
RIGHT_INDEX,
- right,
+ buildBatch,
() -> {
batchMemoryManager.update(RIGHT_INDEX, 0, true);
logger.debug("BATCH_STATS, incoming right: {}",
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
@@ -302,7 +302,7 @@ private void prefetchFirstProbeBatch() {
prefetchedProbe,
probeSideIsEmpty,
LEFT_INDEX,
- left,
+ probeBatch,
() -> {
batchMemoryManager.update(LEFT_INDEX, 0);
logger.debug("BATCH_STATS, incoming left: {}",
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
@@ -348,7 +348,10 @@ private IterOutcome prefetchFirstBatch(IterOutcome outcome,
state = BatchState.STOP;
} else {
// Got our first batch(es)
- memoryManagerUpdate.run();
+ if (cycleNum == 0) {
+ // Only collect stats for the first cylce
+ memoryManagerUpdate.run();
+ }
state = BatchState.FIRST;
}
@@ -558,6 +561,9 @@ public IterOutcome innerNext() {
state = BatchState.FIRST; // TODO need to determine if this is
still necessary since prefetchFirstBatchFromBothSides sets this
+ prefetchedBuild.setValue(false);
+ prefetchedProbe.setValue(false);
+
return innerNext(); // start processing the next spilled partition
"recursively"
}
@@ -762,7 +768,6 @@ private void initializeRuntimeFilter() {
buildJoinColumns,
leftUpstream == IterOutcome.NONE, // probeEmpty
allocator.getLimit(),
- maxIncomingBatchSize,
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
@@ -821,22 +826,19 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
{
// Initializing build calculator
// Limit scope of these variables to this block
- int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT :
RECORDS_PER_BATCH;
- boolean hasProbeData = leftUpstream != IterOutcome.NONE;
- boolean doMemoryCalculation = canSpill && hasProbeData;
+ int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT:
RECORDS_PER_BATCH;
+ boolean doMemoryCalculation = canSpill &&
!probeSideIsEmpty.booleanValue();
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
- // We've sniffed first non empty build and probe batches so we have
enough information to create a calculator
buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash
values bug fixed
buildBatch,
probeBatch,
buildJoinColumns,
- leftUpstream == IterOutcome.NONE, // probeEmpty
+ probeSideIsEmpty.booleanValue(),
allocator.getLimit(),
- maxIncomingBatchSize,
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
@@ -1093,7 +1095,6 @@ public HashJoinBatch(HashJoinPOP popConfig,
FragmentContext context,
this.allocator = oContext.getAllocator();
- maxIncomingBatchSize =
context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE);
numPartitions =
(int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 ) { //
disableSpilling("Spilling is disabled due to configuration setting of
num_partitions to 1");
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index af6be8bfe3c..59496d95268 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -73,7 +73,6 @@ public void initialize(boolean autoTune,
Set<String> joinColumns,
boolean probeEmpty,
long memoryAvailable,
- long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index 0ccd912d4ba..38c2a3546aa 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -96,14 +96,13 @@
* </ul>
*/
interface BuildSidePartitioning extends
HashJoinStateCalculator<PostBuildCalculations> {
- void initialize(boolean autoTune,
+ void initialize(boolean firstCycle,
boolean reserveHash,
RecordBatch buildSideBatch,
RecordBatch probeSideBatch,
Set<String> joinColumns,
boolean probeEmpty,
long memoryAvailable,
- long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 2ab42e5a046..35ff7eca121 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -93,14 +93,13 @@ public HashJoinState getState() {
private int recordsPerPartitionBatchProbe;
@Override
- public void initialize(boolean autoTune,
+ public void initialize(boolean firstCycle,
boolean reserveHash,
RecordBatch buildSideBatch,
RecordBatch probeSideBatch,
Set<String> joinColumns,
boolean probeEmpty,
long memoryAvailable,
- long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
@@ -169,7 +168,7 @@ public HashJoinState getState() {
* <h1>Life Cycle</h1>
* <p>
* <ul>
- * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean,
RecordBatch, RecordBatch, Set, boolean, long, long, int, int, int, int, int,
int, double)}.
+ * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean,
RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int,
double)}.
* This will initialize the StateCalculate with the additional
information it needs.</li>
* <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number
of partitions that fit in memory.</li>
* <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if
spilling needs to occur.</li>
@@ -190,7 +189,6 @@ public HashJoinState getState() {
private int maxBatchNumRecordsProbe;
private long memoryAvailable;
private boolean probeEmpty;
- private long maxIncomingBatchSize;
private long maxBuildBatchSize;
private long maxProbeBatchSize;
private long maxOutputBatchSize;
@@ -200,7 +198,7 @@ public HashJoinState getState() {
private int recordsPerPartitionBatchProbe;
private int outputBatchSize;
private Map<String, Long> keySizes;
- private boolean autoTune;
+ private boolean firstCycle;
private boolean reserveHash;
private double loadFactor;
@@ -228,14 +226,13 @@ public BuildSidePartitioningImpl(final
BatchSizePredictor.Factory batchSizePredi
}
@Override
- public void initialize(boolean autoTune,
+ public void initialize(boolean firstCycle,
boolean reserveHash,
RecordBatch buildBatch,
RecordBatch probeBatch,
Set<String> joinColumns,
boolean probeEmpty,
long memoryAvailable,
- long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
@@ -264,11 +261,10 @@ public void initialize(boolean autoTune,
keySizes.put(joinColumn,
(long)columnSize.getStdNetOrNetSizePerEntry());
}
- initialize(autoTune,
+ initialize(firstCycle,
reserveHash,
keySizes,
memoryAvailable,
- maxIncomingBatchSize,
initialPartitions,
probeEmpty,
buildSizePredictor,
@@ -282,11 +278,10 @@ public void initialize(boolean autoTune,
}
@VisibleForTesting
- protected void initialize(boolean autoTune,
+ protected void initialize(boolean firstCycle,
boolean reserveHash,
CaseInsensitiveMap<Long> keySizes,
long memoryAvailable,
- long maxIncomingBatchSize,
int initialPartitions,
boolean probeEmpty,
BatchSizePredictor buildSizePredictor,
@@ -305,12 +300,11 @@ protected void initialize(boolean autoTune,
firstInitialized = true;
this.loadFactor = loadFactor;
- this.autoTune = autoTune;
+ this.firstCycle = firstCycle;
this.reserveHash = reserveHash;
this.keySizes = Preconditions.checkNotNull(keySizes);
this.memoryAvailable = memoryAvailable;
this.probeEmpty = probeEmpty;
- this.maxIncomingBatchSize = maxIncomingBatchSize;
this.buildSizePredictor = buildSizePredictor;
this.probeSizePredictor = probeSizePredictor;
this.initialPartitions = initialPartitions;
@@ -358,22 +352,6 @@ private void calculateMemoryUsage()
{
// Adjust based on number of records
maxBuildBatchSize =
buildSizePredictor.predictBatchSize(maxBatchNumRecordsBuild, false);
-
- if (probeSizePredictor.hadDataLastTime()) {
- // We have probe data and we can compute the max incoming size.
- maxProbeBatchSize =
probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
- } else {
- // We don't have probe data
- if (probeEmpty) {
- // We know the probe has no data, so we don't need to reserve any
space for the incoming probe
- maxProbeBatchSize = 0;
- } else {
- // The probe side may have data, so assume it is the max incoming
batch size. This assumption
- // can fail in some cases since the batch sizing project is
incomplete.
- maxProbeBatchSize = maxIncomingBatchSize;
- }
- }
-
partitionBuildBatchSize =
buildSizePredictor.predictBatchSize(recordsPerPartitionBatchBuild, reserveHash);
if (probeSizePredictor.hadDataLastTime()) {
@@ -389,15 +367,18 @@ private void calculateMemoryUsage()
long incompletePartitionsBatchSizes = ((long) partitions) *
partitionBuildBatchSize;
// We need to reserve all the space for incomplete batches, and the
incoming batch as well as the
// probe batch we sniffed.
- // TODO when batch sizing project is complete we won't have to sniff
probe batches since
- // they will have a well defined size.
- reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize +
maxProbeBatchSize;
+ reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize;
+
+ if (!firstCycle) {
+ // If this is NOT the first cycle the HashJoin operator owns the
probe batch and we need to reserve space for it.
+ reservedMemory += probeSizePredictor.getBatchSize();
+ }
if (probeSizePredictor.hadDataLastTime()) {
// If we have probe data, use it in our memory reservation
calculations.
probeReservedMemory =
PostBuildCalculationsImpl.calculateReservedMemory(
partitions,
- maxProbeBatchSize,
+ probeSizePredictor.getBatchSize(),
maxOutputBatchSize,
partitionProbeBatchSize);
@@ -407,7 +388,7 @@ private void calculateMemoryUsage()
maxReservedMemory = reservedMemory;
}
- if (!autoTune || maxReservedMemory <= memoryAvailable) {
+ if (!firstCycle || maxReservedMemory <= memoryAvailable) {
// Stop the tuning loop if we are not doing auto tuning, or if we
are living within our memory limit
break;
}
@@ -476,6 +457,7 @@ public PostBuildCalculations next() {
Preconditions.checkState(initialized);
return new PostBuildCalculationsImpl(
+ firstCycle,
probeSizePredictor,
memoryAvailable,
maxOutputBatchSize,
@@ -580,10 +562,10 @@ public String makeDebugString() {
public static final int MIN_RECORDS_PER_PARTITION_BATCH_PROBE = 10;
+ private final boolean firstCycle;
private final BatchSizePredictor probeSizePredictor;
private final long memoryAvailable;
private final long maxOutputBatchSize;
- private final int maxBatchNumRecordsProbe;
private final int recordsPerPartitionBatchProbe;
private final PartitionStatSet buildPartitionStatSet;
private final Map<String, Long> keySizes;
@@ -597,24 +579,25 @@ public String makeDebugString() {
private boolean initialized;
private long consumedMemory;
private boolean probeEmpty;
- private long maxProbeBatchSize;
private long partitionProbeBatchSize;
private int computedProbeRecordsPerBatch;
@VisibleForTesting
- public PostBuildCalculationsImpl(final BatchSizePredictor
probeSizePredictor,
- final long memoryAvailable,
- final long maxOutputBatchSize,
- final int maxBatchNumRecordsProbe,
- final int recordsPerPartitionBatchProbe,
- final PartitionStatSet
buildPartitionStatSet,
- final Map<String, Long> keySizes,
- final HashTableSizeCalculator
hashTableSizeCalculator,
- final HashJoinHelperSizeCalculator
hashJoinHelperSizeCalculator,
- final double fragmentationFactor,
- final double safetyFactor,
- final double loadFactor,
- final boolean reserveHash) {
+ public PostBuildCalculationsImpl(final boolean firstCycle,
+ final BatchSizePredictor
probeSizePredictor,
+ final long memoryAvailable,
+ final long maxOutputBatchSize,
+ final int maxBatchNumRecordsProbe,
+ final int recordsPerPartitionBatchProbe,
+ final PartitionStatSet
buildPartitionStatSet,
+ final Map<String, Long> keySizes,
+ final HashTableSizeCalculator
hashTableSizeCalculator,
+ final HashJoinHelperSizeCalculator
hashJoinHelperSizeCalculator,
+ final double fragmentationFactor,
+ final double safetyFactor,
+ final double loadFactor,
+ final boolean reserveHash) {
+ this.firstCycle = firstCycle;
this.probeSizePredictor = Preconditions.checkNotNull(probeSizePredictor);
this.memoryAvailable = memoryAvailable;
this.maxOutputBatchSize = maxOutputBatchSize;
@@ -626,7 +609,6 @@ public PostBuildCalculationsImpl(final BatchSizePredictor
probeSizePredictor,
this.safetyFactor = safetyFactor;
this.loadFactor = loadFactor;
this.reserveHash = reserveHash;
- this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
this.computedProbeRecordsPerBatch = recordsPerPartitionBatchProbe;
}
@@ -636,7 +618,7 @@ public void initialize(boolean probeEmpty) {
Preconditions.checkState(!initialized);
// If we had probe data before there should still be probe data now.
// If we didn't have probe data before we could get some new data now.
- Preconditions.checkState(probeSizePredictor.hadDataLastTime() &&
!probeEmpty || !probeSizePredictor.hadDataLastTime());
+ Preconditions.checkState(!(probeEmpty &&
probeSizePredictor.hadDataLastTime()));
initialized = true;
this.probeEmpty = probeEmpty;
@@ -650,12 +632,11 @@ public void initialize(boolean probeEmpty) {
probeSizePredictor.updateStats();
}
- maxProbeBatchSize =
probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
partitionProbeBatchSize =
probeSizePredictor.predictBatchSize(recordsPerPartitionBatchProbe, reserveHash);
long worstCaseProbeMemory = calculateReservedMemory(
buildPartitionStatSet.getSize(),
- maxProbeBatchSize,
+ getIncomingProbeBatchReservedSpace(),
maxOutputBatchSize,
partitionProbeBatchSize);
@@ -667,7 +648,7 @@ public void initialize(boolean probeEmpty) {
buildPartitionStatSet.getSize(),
recordsPerPartitionBatchProbe,
MIN_RECORDS_PER_PARTITION_BATCH_PROBE,
- maxProbeBatchSize,
+ getIncomingProbeBatchReservedSpace(),
maxOutputBatchSize,
partitionProbeBatchSize);
@@ -681,9 +662,14 @@ public int getProbeRecordsPerBatch() {
return computedProbeRecordsPerBatch;
}
- @VisibleForTesting
- public long getMaxProbeBatchSize() {
- return maxProbeBatchSize;
+ public long getIncomingProbeBatchReservedSpace() {
+ Preconditions.checkState(initialized);
+
+ if (firstCycle) {
+ return 0;
+ } else {
+ return probeSizePredictor.getBatchSize();
+ }
}
@VisibleForTesting
@@ -744,7 +730,7 @@ public boolean shouldSpill() {
long reservedMemory = calculateReservedMemory(
buildPartitionStatSet.getNumSpilledPartitions(),
- maxProbeBatchSize,
+ getIncomingProbeBatchReservedSpace(),
maxOutputBatchSize,
partitionProbeBatchSize);
@@ -804,11 +790,11 @@ public String makeDebugString() {
"Mem calc stats:\n" +
"memoryLimit = %s\n" +
"consumedMemory = %s\n" +
- "maxProbeBatchSize = %s\n" +
+ "maxIncomingProbeBatchReservedSpace = %s\n" +
"maxOutputBatchSize = %s\n",
PartitionStatSet.prettyPrintBytes(memoryAvailable),
PartitionStatSet.prettyPrintBytes(consumedMemory),
- PartitionStatSet.prettyPrintBytes(maxProbeBatchSize),
+
PartitionStatSet.prettyPrintBytes(getIncomingProbeBatchReservedSpace()),
PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));
StringBuilder hashJoinHelperSb = new StringBuilder("Partition Hash Join
Helpers\n");
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index af943ecb7cc..665bc485bb0 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -25,7 +25,16 @@
public class TestBuildSidePartitioningImpl {
@Test
- public void testSimpleReserveMemoryCalculationNoHash() {
+ public void testSimpleReserveMemoryCalculationNoHashFirstCycle() {
+ testSimpleReserveMemoryCalculationNoHashHelper(true);
+ }
+
+ @Test
+ public void testSimpleReserveMemoryCalculationNoHashNotFirstCycle() {
+ testSimpleReserveMemoryCalculationNoHashHelper(false);
+ }
+
+ private void testSimpleReserveMemoryCalculationNoHashHelper(final boolean
firstCycle) {
final int maxBatchNumRecords = 20;
final double fragmentationFactor = 2.0;
final double safetyFactor = 1.5;
@@ -39,12 +48,12 @@ public void testSimpleReserveMemoryCalculationNoHash() {
safetyFactor);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+ final long accountedProbeBatchSize = firstCycle? 0: 10;
- calc.initialize(true,
+ calc.initialize(firstCycle,
false,
keySizes,
- 200,
- 100,
+ 190 + accountedProbeBatchSize,
2,
false,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -62,7 +71,7 @@ public void testSimpleReserveMemoryCalculationNoHash() {
long expectedReservedMemory = 60 // Max incoming batch size
+ 2 * 30 // build side batch for each spilled partition
- + 60; // Max incoming probe batch size
+ + accountedProbeBatchSize; // Max incoming probe batch size
long actualReservedMemory = calc.getBuildReservedMemory();
Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -89,7 +98,6 @@ public void testSimpleReserveMemoryCalculationHash() {
true,
keySizes,
350,
- 100, // Ignored for test
2,
false,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -107,7 +115,7 @@ public void testSimpleReserveMemoryCalculationHash() {
long expectedReservedMemory = 60 // Max incoming batch size
+ 2 * (/* data size for batch */ 30 + /* Space reserved for hash value
vector */ 10 * 4 * 2) // build side batch for each spilled partition
- + 60; // Max incoming probe batch size
+ + 10; // Max incoming probe batch size
long actualReservedMemory = calc.getBuildReservedMemory();
Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -135,7 +143,6 @@ public void testAdjustInitialPartitions() {
false,
keySizes,
200,
- 100, // Ignored for test
4,
false,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -153,17 +160,61 @@ public void testAdjustInitialPartitions() {
calc.setPartitionStatSet(partitionStatSet);
long expectedReservedMemory = 60 // Max incoming batch size
- + 2 * 30 // build side batch for each spilled partition
- + 60; // Max incoming probe batch size
+ + 2 * 30; // build side batch for each spilled partition
long actualReservedMemory = calc.getBuildReservedMemory();
Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
Assert.assertEquals(2, calc.getNumPartitions());
}
+ @Test
+ public void testDontAdjustInitialPartitions() {
+ final int maxBatchNumRecords = 20;
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+
+ final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+ new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+ BatchSizePredictorImpl.Factory.INSTANCE,
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ fragmentationFactor,
+ safetyFactor);
+
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ calc.initialize(
+ false,
+ false,
+ keySizes,
+ 200,
+ 4,
+ false,
+ new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
+ new MockBatchSizePredictor(10, 10, fragmentationFactor, safetyFactor),
+ 10,
+ 5,
+ maxBatchNumRecords,
+ maxBatchNumRecords,
+ 16000,
+ .75);
+
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(),
new PartitionStatImpl(),
+ new PartitionStatImpl(), new PartitionStatImpl());
+ calc.setPartitionStatSet(partitionStatSet);
+
+ long expectedReservedMemory = 60 // Max incoming batch size
+ + 4 * 30 // build side batch for each spilled partition
+ + 10; // Max incoming probe batch size
+ long actualReservedMemory = calc.getBuildReservedMemory();
+
+ Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+ Assert.assertEquals(4, calc.getNumPartitions());
+ }
+
@Test(expected = IllegalStateException.class)
public void testHasDataProbeEmpty() {
- final int maxIncomingBatchSize = 100;
final int maxBatchNumRecords = 20;
final double fragmentationFactor = 2.0;
final double safetyFactor = 1.5;
@@ -183,7 +234,6 @@ public void testHasDataProbeEmpty() {
false,
keySizes,
240,
- maxIncomingBatchSize,
4,
true,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -198,7 +248,6 @@ public void testHasDataProbeEmpty() {
@Test
public void testNoProbeDataForStats() {
- final int maxIncomingBatchSize = 100;
final int maxBatchNumRecords = 20;
final double fragmentationFactor = 2.0;
final double safetyFactor = 1.5;
@@ -218,7 +267,6 @@ public void testNoProbeDataForStats() {
false,
keySizes,
240,
- maxIncomingBatchSize,
4,
false,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -235,12 +283,11 @@ public void testNoProbeDataForStats() {
calc.setPartitionStatSet(partitionStatSet);
long expectedReservedMemory = 60 // Max incoming batch size
- + 2 * 30 // build side batch for each spilled partition
- + maxIncomingBatchSize;
+ + 4 * 30; // build side batch for each spilled partition
long actualReservedMemory = calc.getBuildReservedMemory();
Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
- Assert.assertEquals(2, calc.getNumPartitions());
+ Assert.assertEquals(4, calc.getNumPartitions());
}
@Test
@@ -264,7 +311,6 @@ public void testProbeEmpty() {
false,
keySizes,
200,
- 100, // Ignored for test
4,
true,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -290,10 +336,20 @@ public void testProbeEmpty() {
}
@Test
- public void testNoRoomInMemoryForBatch1() {
+ public void testNoRoomInMemoryForBatch1FirstCycle() {
+ testNoRoomInMemoryForBatch1Helper(true);
+ }
+
+ @Test
+ public void testNoRoomInMemoryForBatch1NotFirstCycle() {
+ testNoRoomInMemoryForBatch1Helper(false);
+ }
+
+ private void testNoRoomInMemoryForBatch1Helper(final boolean firstCycle) {
final int maxBatchNumRecords = 20;
final double fragmentationFactor = 2.0;
final double safetyFactor = 1.5;
+ final long accountedProbeBatchSize = firstCycle? 0: 10;
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
@@ -306,11 +362,10 @@ public void testNoRoomInMemoryForBatch1() {
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
calc.initialize(
- true,
+ firstCycle,
false,
keySizes,
- 180,
- 100, // Ignored for test
+ 120 + accountedProbeBatchSize,
2,
false,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -330,7 +385,7 @@ public void testNoRoomInMemoryForBatch1() {
long expectedReservedMemory = 60 // Max incoming batch size
+ 2 * 30 // build side batch for each spilled partition
- + 60; // Max incoming probe batch size
+ + accountedProbeBatchSize; // Max incoming probe batch size
long actualReservedMemory = calc.getBuildReservedMemory();
Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -360,8 +415,7 @@ public void testCompleteLifeCycle() {
true,
false,
keySizes,
- 210,
- 100, // Ignored for test
+ 160,
2,
false,
new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index aa7a4354ba6..fa3fdfb3a74 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -121,6 +121,7 @@ public void testHasProbeDataButProbeEmpty() {
final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ true,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
@@ -167,6 +168,7 @@ public void testProbeEmpty() {
final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ true,
new ConditionalMockBatchSizePredictor(),
50,
1000,
@@ -188,7 +190,16 @@ public void testProbeEmpty() {
}
@Test
- public void testHasNoProbeDataButProbeNonEmpty() {
+ public void testHasNoProbeDataButProbeNonEmptyFirstCycle() {
+ testHasNoProbeDataButProbeNonEmptyHelper(true);
+ }
+
+ @Test
+ public void testHasNoProbeDataButProbeNonEmptyNotFirstCycle() {
+ testHasNoProbeDataButProbeNonEmptyHelper(false);
+ }
+
+ private void testHasNoProbeDataButProbeNonEmptyHelper(final boolean
firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -210,14 +221,16 @@ public void testHasNoProbeDataButProbeNonEmpty() {
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
false),
- 290,
+ 230 + accountedProbeBatchSize,
20,
maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
@@ -232,7 +245,7 @@ public void testHasNoProbeDataButProbeNonEmpty() {
calc.initialize(false);
- long expected = 60 // maxProbeBatchSize
+ long expected = accountedProbeBatchSize
+ 160 // in memory partitions
+ 20 // max output batch size
+ 2 * 10 // Hash Table
@@ -243,7 +256,16 @@ public void testHasNoProbeDataButProbeNonEmpty() {
}
@Test
- public void testProbingAndPartitioningBuildAllInMemoryNoSpill() {
+ public void testProbingAndPartitioningBuildAllInMemoryNoSpillFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(true);
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildAllInMemoryNoSpillNotFirstCycle()
{
+ testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(false);
+ }
+
+ private void testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(final
boolean firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -265,14 +287,16 @@ public void
testProbingAndPartitioningBuildAllInMemoryNoSpill() {
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
true),
- 290,
+ 230 + accountedProbeBatchSize,
20,
maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
@@ -287,7 +311,7 @@ public void
testProbingAndPartitioningBuildAllInMemoryNoSpill() {
calc.initialize(false);
- long expected = 60 // maxProbeBatchSize
+ long expected = accountedProbeBatchSize
+ 160 // in memory partitions
+ 20 // max output batch size
+ 2 * 10 // Hash Table
@@ -298,7 +322,16 @@ public void
testProbingAndPartitioningBuildAllInMemoryNoSpill() {
}
@Test
- public void testProbingAndPartitioningBuildAllInMemorySpill() {
+ public void testProbingAndPartitioningBuildAllInMemorySpillFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemorySpillHelper(true);
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildAllInMemorySpillNotFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemorySpillHelper(false);
+ }
+
+ private void testProbingAndPartitioningBuildAllInMemorySpillHelper(final
boolean firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -320,16 +353,18 @@ public void
testProbingAndPartitioningBuildAllInMemorySpill() {
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
true),
- 270,
+ 210 + accountedProbeBatchSize,
20,
- maxBatchNumRecordsProbe,
+ maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
buildPartitionStatSet,
keySizes,
@@ -342,7 +377,7 @@ public void
testProbingAndPartitioningBuildAllInMemorySpill() {
calc.initialize(false);
- long expected = 60 // maxProbeBatchSize
+ long expected = accountedProbeBatchSize
+ 160 // in memory partitions
+ 20 // max output batch size
+ 2 * 10 // Hash Table
@@ -351,7 +386,7 @@ public void
testProbingAndPartitioningBuildAllInMemorySpill() {
Assert.assertEquals(expected, calc.getConsumedMemory());
partition1.spill();
- expected = 60 // maxProbeBatchSize
+ expected = accountedProbeBatchSize
+ 80 // in memory partitions
+ 20 // max output batch size
+ 10 // Hash Table
@@ -363,7 +398,16 @@ public void
testProbingAndPartitioningBuildAllInMemorySpill() {
}
@Test
- public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
+ public void
testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(true);
+ }
+
+ @Test
+ public void
testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashNotFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(false);
+ }
+
+ private void
testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(final boolean
firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -381,14 +425,16 @@ public void
testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
true),
- 180,
+ 120 + accountedProbeBatchSize,
20,
maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
@@ -403,7 +449,7 @@ public void
testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
calc.initialize(false);
- long expected = 60 // maxProbeBatchSize
+ long expected = accountedProbeBatchSize // probe batch
+ 2 * 5 * 3 // partition batches
+ 20; // max output batch size
Assert.assertFalse(calc.shouldSpill());
@@ -412,7 +458,16 @@ public void
testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
}
@Test
- public void testProbingAndPartitioningBuildAllInMemoryWithSpill() {
+ public void testProbingAndPartitioningBuildAllInMemoryWithSpillFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(true);
+ }
+
+ @Test
+ public void
testProbingAndPartitioningBuildAllInMemoryWithSpillNotFirstCycle() {
+ testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(false);
+ }
+
+ private void testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(final
boolean firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -434,14 +489,16 @@ public void
testProbingAndPartitioningBuildAllInMemoryWithSpill() {
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
true),
- 200,
+ 140 + accountedProbeBatchSize,
20,
maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
@@ -456,7 +513,7 @@ public void
testProbingAndPartitioningBuildAllInMemoryWithSpill() {
calc.initialize(false);
- long expected = 60 // maxProbeBatchSize
+ long expected = accountedProbeBatchSize
+ 80 // in memory partition
+ 10 // hash table size
+ 10 // hash join helper size
@@ -471,7 +528,16 @@ public void
testProbingAndPartitioningBuildAllInMemoryWithSpill() {
}
@Test
- public void testProbingAndPartitioningBuildSomeInMemory() {
+ public void testProbingAndPartitioningBuildSomeInMemoryFirstCycle() {
+ testProbingAndPartitioningBuildSomeInMemoryHelper(true);
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildSomeInMemoryNotFirstCycle() {
+ testProbingAndPartitioningBuildSomeInMemoryHelper(false);
+ }
+
+ private void testProbingAndPartitioningBuildSomeInMemoryHelper(final boolean
firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -497,14 +563,16 @@ public void testProbingAndPartitioningBuildSomeInMemory()
{
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
true),
- 230,
+ 170 + accountedProbeBatchSize,
20,
maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
@@ -519,7 +587,7 @@ public void testProbingAndPartitioningBuildSomeInMemory() {
calc.initialize(false);
- long expected = 60 // maxProbeBatchSize
+ long expected = accountedProbeBatchSize
+ 80 // in memory partition
+ 10 // hash table size
+ 10 // hash join helper size
@@ -533,8 +601,16 @@ public void testProbingAndPartitioningBuildSomeInMemory() {
}
@Test
- public void testProbingAndPartitioningBuildNoneInMemory() {
+ public void testProbingAndPartitioningBuildNoneInMemoryFirstCycle() {
+ testProbingAndPartitioningBuildNoneInMemoryHelper(true);
+ }
+ @Test
+ public void testProbingAndPartitioningBuildNoneInMemoryNotFirstCycle() {
+ testProbingAndPartitioningBuildNoneInMemoryHelper(false);
+ }
+
+ private void testProbingAndPartitioningBuildNoneInMemoryHelper(final boolean
firstCycle) {
final Map<String, Long> keySizes =
org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -554,14 +630,16 @@ public void testProbingAndPartitioningBuildNoneInMemory()
{
final int recordsPerPartitionBatchProbe = 5;
final long partitionProbeBatchSize = 15;
final long maxProbeBatchSize = 60;
+ final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ firstCycle,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
true),
- 110,
+ 110 + accountedProbeBatchSize,
20,
maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe,
@@ -576,7 +654,7 @@ public void testProbingAndPartitioningBuildNoneInMemory() {
calc.initialize(false);
Assert.assertFalse(calc.shouldSpill());
- Assert.assertEquals(110, calc.getConsumedMemory());
+ Assert.assertEquals(50 + accountedProbeBatchSize,
calc.getConsumedMemory());
Assert.assertNotNull(calc.next());
}
@@ -611,6 +689,7 @@ public void testMakeDebugString()
HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ true,
new ConditionalMockBatchSizePredictor(
Lists.newArrayList(maxBatchNumRecordsProbe,
recordsPerPartitionBatchProbe),
Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
@@ -708,7 +787,7 @@ public ConditionalMockBatchSizePredictor(final
List<Integer> recordsPerBatch,
@Override
public long getBatchSize() {
- return 0;
+ return batchSize.get(0);
}
@Override
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
index 9ad7a9b6ad1..e1609a501fe 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
@@ -316,7 +316,7 @@ public void testHashJoinLeftEmpty() throws Exception {
.build();
RecordBatch joinBatch = new PopBuilder()
- .physicalOperator(new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.INNER, null))
+ .physicalOperator(new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("a2", "EQUALS", "a")), JoinRelType.INNER, null))
.addInput(left)
.addInput(rightScan)
.build();
@@ -379,7 +379,7 @@ public void testLeftHashJoinLeftEmpty() throws Exception {
.build();
RecordBatch joinBatch = new PopBuilder()
- .physicalOperator(new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.LEFT, null))
+ .physicalOperator(new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("a2", "EQUALS", "a")), JoinRelType.LEFT, null))
.addInput(left)
.addInput(rightScan)
.build();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services