diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 811c4791e82..729b5e4021f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -157,8 +157,17 @@ private ExecConstants() {
public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING
= new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null);
public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY =
"exec.hashjoin.runtime_filter.max.waiting.time";
public static final PositiveLongValidator
HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new
PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY,
Character.MAX_VALUE, null);
-
-
+ public static final String HASHJOIN_SEMI_SKIP_DUPLICATES_KEY =
"exec.hashjoin.semi_skip_duplicates";
+ public static final BooleanValidator HASHJOIN_SEMI_SKIP_DUPLICATES_VALIDATOR
= new BooleanValidator(HASHJOIN_SEMI_SKIP_DUPLICATES_KEY,
+ new OptionDescription("When TRUE, make Semi Hash Join check for incoming
duplicated and skip those (use more cpu, less memory)"));
+ public static final String HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_KEY =
"exec.hashjoin.semi_percent_duplicates_to_skip";
+ public static final IntegerValidator
HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR = new
IntegerValidator(HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_KEY,
+ 0, 100,
+ new OptionDescription("Semi join to skip duplicates only if initial check
finds duplicates in incoming as no less than this percentage"));
+ public static final String HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_KEY =
"exec.hashjoin.min_batches_in_available_memory";
+ public static final IntegerValidator
HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR = new
IntegerValidator(HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_KEY,
+ 1, Integer.MAX_VALUE,
+ new OptionDescription("Threshold: Start spilling if available memory is
less than this number of batches (only for semi join skipping duplicates"));
// Hash Aggregate Options
public static final String HASHAGG_NUM_PARTITIONS_KEY =
"exec.hashagg.num_partitions";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 275cf16572d..69ea3f34a66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -66,7 +66,7 @@
* After all the build/inner data is read for this partition - if all its
data is in memory, then
* a hash table and a helper are created, and later this data would be probed.
* If all this partition's build/inner data was spilled, then it begins to
work as an outer
- * partition (see the flag "processingOuter") -- reusing some of the fields
(e.g., currentBatch,
+ * partition (see the flag "processingOuter") -- reusing some of the fields
(e.g., currentVectorContainer,
* currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
* </p>
*/
@@ -94,8 +94,8 @@
// incoming batches, per each partition (these may be spilled at some point)
private List<VectorContainer> tmpBatchesList;
// A batch and HV vector to hold incoming rows - per each partition
- private VectorContainer currentBatch; // The current (newest) batch
- private IntVector currHVVector; // The HV vectors for the currentBatches
+ private VectorContainer currentVectorContainer; // The current (newest)
container
+ private IntVector currHVVector; // The HV vectors for the
currentVectorContainers
/* Helper class
* Maintains linked list of build side records with the same key
@@ -126,9 +126,10 @@
private long numInMemoryRecords;
private boolean updatedRecordsPerBatch = false;
private boolean semiJoin;
+ private boolean skipDuplicates; // only for semi
public HashPartition(FragmentContext context, BufferAllocator allocator,
ChainedHashTable baseHashTable,
- RecordBatch buildBatch, RecordBatch probeBatch, boolean
semiJoin,
+ RecordBatch buildBatch, RecordBatch probeBatch, boolean
semiJoin, boolean skipDuplicates,
int recordsPerBatch, SpillSet spillSet, int partNum,
int cycleNum, int numPartitions) {
this.allocator = allocator;
this.buildBatch = buildBatch;
@@ -139,6 +140,7 @@ public HashPartition(FragmentContext context,
BufferAllocator allocator, Chained
this.cycleNum = cycleNum;
this.numPartitions = numPartitions;
this.semiJoin = semiJoin;
+ this.skipDuplicates = semiJoin && skipDuplicates;
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -156,7 +158,7 @@ public HashPartition(FragmentContext context,
BufferAllocator allocator, Chained
this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
tmpBatchesList = new ArrayList<>();
if ( numPartitions > 1 ) {
- allocateNewCurrentBatchAndHV();
+ allocateNewCurrentVectorContainerAndHV();
}
}
@@ -218,19 +220,47 @@ private VectorContainer
allocateNewVectorContainer(RecordBatch rb) {
/**
* Allocate a new current Vector Container and current HV vector
*/
- public void allocateNewCurrentBatchAndHV() {
+ public void allocateNewCurrentVectorContainerAndHV() {
if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole
in memory
- currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch :
buildBatch);
+ currentVectorContainer = allocateNewVectorContainer(processingOuter ?
probeBatch : buildBatch);
currHVVector = new
IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
currHVVector.allocateNew(recordsPerBatch);
}
+ /**
+   * This method is only used for semi-join, when trying to skip rows with duplicate incoming keys.
+   * It adds the given row's key to the hash table, if needed, and returns true only if that
+   * key already existed in the hash table.
+ *
+ * @param buildContainer The container with the current row
+ * @param ind The index of the current row in the container
+ * @param hashCode The hash code for the key of this row
+ * @return True iff that key already exists in the hash table
+ */
+ public boolean insertKeyIntoHashTable(VectorContainer buildContainer, int
ind, int hashCode) throws SchemaChangeException {
+    hashTable.updateIncoming(buildContainer, probeBatch);
+ final IndexPointer htIndex = new IndexPointer();
+ HashTable.PutStatus status;
+
+ try {
+ status = hashTable.put(ind, htIndex, hashCode, BATCH_SIZE);
+ } catch (RetryAfterSpillException RE) {
+ if ( numPartitions == 1 ) { // if cannot spill
+ throw new OutOfMemoryException(RE);
+ }
+ spillThisPartition(); // free some memory
+ return false;
+ }
+
+ return status == HashTable.PutStatus.KEY_PRESENT;
+ }
+
/**
* Spills if needed
*/
- public void appendInnerRow(VectorContainer buildContainer, int ind, int
hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
+ public void appendInnerRow(VectorContainer buildContainer, int ind, int
hashCode, HashJoinMemoryCalculator.HashJoinSpillControl calc) {
- int pos = currentBatch.appendRow(buildContainer,ind);
+ int pos = currentVectorContainer.appendRow(buildContainer,ind);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash
value in the new column
if ( pos == recordsPerBatch ) {
boolean needsSpill = isSpilled || calc.shouldSpill();
@@ -243,7 +273,7 @@ public void appendInnerRow(VectorContainer buildContainer,
int ind, int hashCode
*
*/
public void appendOuterRow(int hashCode, int recordsProcessed) {
- int pos =
currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
+ int pos =
currentVectorContainer.appendRow(probeBatch.getContainer(),recordsProcessed);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash
value in the new column
if ( pos == recordsPerBatch ) {
completeAnOuterBatch(true);
@@ -262,27 +292,27 @@ public void completeAnInnerBatch(boolean toInitialize,
boolean needsSpill) {
* (that is, more rows are coming) - initialize with a new current batch for
that partition
* */
private void completeABatch(boolean toInitialize, boolean needsSpill) {
- if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
- currentBatch.add(currHVVector);
- currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- tmpBatchesList.add(currentBatch);
+ if ( currentVectorContainer.hasRecordCount() &&
currentVectorContainer.getRecordCount() > 0) {
+ currentVectorContainer.add(currHVVector);
+ currentVectorContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ tmpBatchesList.add(currentVectorContainer);
partitionBatchesCount++;
- long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
- inMemoryBatchStats.add(new
HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
+ long batchSize = new
RecordBatchSizer(currentVectorContainer).getActualSize();
+ inMemoryBatchStats.add(new
HashJoinMemoryCalculator.BatchStat(currentVectorContainer.getRecordCount(),
batchSize));
partitionInMemorySize += batchSize;
- numInMemoryRecords += currentBatch.getRecordCount();
+ numInMemoryRecords += currentVectorContainer.getRecordCount();
} else {
- freeCurrentBatchAndHVVector();
+ freeCurrentVectorContainerAndHVVector();
}
if ( needsSpill ) { // spill this batch/partition and free its memory
spillThisPartition();
}
if ( toInitialize ) { // allocate a new batch and HV vector
- allocateNewCurrentBatchAndHV();
+ allocateNewCurrentVectorContainerAndHV();
} else {
- currentBatch = null;
+ currentVectorContainer = null;
currHVVector = null;
}
}
@@ -327,6 +357,10 @@ public void spillThisPartition() {
if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing
to spill
logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size
{} batches", partitionNum, cycleNum, tmpBatchesList.size());
+ if ( skipDuplicates ) {
+ hashTable.reset();
+    } // deallocate and reinit the hash table in case of a semi join skipping duplicates
+
// If this is the first spill for this partition, create an output stream
if ( writer == null ) {
final String side = processingOuter ? "outer" : "inner";
@@ -492,6 +526,31 @@ private void closeWriterInternal(boolean doDeleteFile) {
partitionBatchesCount = 0;
}
+ /**
+   * Stop skipping duplicates (when there are too few of them),
+   * and stop maintaining the hash table as new rows arrive
+ */
+ public void stopSkippingDuplicates() {
+ assert skipDuplicates;
+ hashTable.reset();
+ skipDuplicates = false;
+ }
+
+ /**
+   * Builds the containers only, not the hash table or the helper.
+   * To be used when skipping duplicates (as the hash table already exists).
+ */
+ public void buildContainers() {
+ assert skipDuplicates;
+ if ( isSpilled ) { return; } // no building for spilled partitions
+ containers = new ArrayList<>();
+ for (int curr = 0; curr < partitionBatchesCount; curr++) {
+ VectorContainer nextBatch = tmpBatchesList.get(curr);
+ containers.add(nextBatch);
+ }
+ outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need
for an outer batch
+ }
+
/**
* Creates the hash table and join helper for this partition.
* This method should only be called after all the build side records
@@ -514,7 +573,7 @@ public void buildContainersHashTableAndHelper() throws
SchemaChangeException {
assert nextBatch != null;
assert probeBatch != null;
- hashTable.updateIncoming(nextBatch, probeBatch );
+ hashTable.updateIncoming(nextBatch, probeBatch);
IntVector HV_vector = (IntVector) nextBatch.getLast();
@@ -531,7 +590,6 @@ public void buildContainersHashTableAndHelper() throws
SchemaChangeException {
*/
if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /*
buildBatchIndex */, recInd); }
}
-
containers.add(nextBatch);
}
outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need
for an outer batch
@@ -555,10 +613,10 @@ private void clearHashTableAndHelper() {
}
}
- private void freeCurrentBatchAndHVVector() {
- if ( currentBatch != null ) {
- currentBatch.clear();
- currentBatch = null;
+ private void freeCurrentVectorContainerAndHVVector() {
+ if ( currentVectorContainer != null ) {
+ currentVectorContainer.clear();
+ currentVectorContainer = null;
}
if ( currHVVector != null ) {
currHVVector.clear();
@@ -571,7 +629,7 @@ private void freeCurrentBatchAndHVVector() {
* @param deleteFile - whether to delete the spill file or not
*/
public void cleanup(boolean deleteFile) {
- freeCurrentBatchAndHVVector();
+ freeCurrentVectorContainerAndHVVector();
if (containers != null && !containers.isEmpty()) {
for (VectorContainer vc : containers) {
vc.clear();
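
As context for the new insertKeyIntoHashTable() above: in this scheme the build side puts every key into the hash table up front, and a KEY_PRESENT result means the incoming row duplicates an earlier key and can be dropped. A minimal stand-alone sketch of that idea, using a plain HashSet in place of Drill's HashTable and made-up row/key types, could look like this:

```java
// Stand-alone model of the semi-join build side sketched above: keep a row only when
// its join key has not been seen before. A HashSet stands in for Drill's HashTable,
// and the row/key types here are made up for illustration.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SemiBuildSideSketch {
  private final Set<String> seenKeys = new HashSet<>();      // plays the role of HashTable.put()/KEY_PRESENT
  private final List<String[]> keptRows = new ArrayList<>();

  /** Returns true iff the key was already present, i.e. the row is a duplicate. */
  boolean insertKey(String key) {
    return !seenKeys.add(key);                                // add() returns false when the key existed
  }

  void appendInnerRow(String[] row, int keyColumn) {
    if (insertKey(row[keyColumn])) {
      return;                                                 // duplicate key: skip the row entirely
    }
    keptRows.add(row);                                        // unique key: keep the row for the probe phase
  }

  public static void main(String[] args) {
    SemiBuildSideSketch build = new SemiBuildSideSketch();
    String[][] incoming = { {"a", "1"}, {"b", "2"}, {"a", "3"}, {"c", "4"}, {"b", "5"} };
    for (String[] row : incoming) {
      build.appendInnerRow(row, 0);
    }
    System.out.println("rows kept: " + build.keptRows.size()); // prints 3 (keys a, b, c)
  }
}
```
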
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index fe329cc5e51..5ecf0447cc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -70,7 +70,7 @@ public HashTableConfig(
@JsonCreator
public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
- @JsonProperty("initialCapacity") boolean
initialSizeIsFinal,
+ @JsonProperty("initialSizeIsFinal") boolean
initialSizeIsFinal,
@JsonProperty("loadFactor") float loadFactor,
@JsonProperty("keyExprsBuild") List<NamedExpression>
keyExprsBuild,
@JsonProperty("keyExprsProbe") List<NamedExpression>
keyExprsProbe,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809d8f7..2761fff4f1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -214,6 +214,12 @@
private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new
HashMap<>();
private List<BloomFilter> bloomFilters = new ArrayList<>();
+ private long semiCountTotal;
+ private long semiCountDuplicates;
+  private long semiDupDecisionPoint; // The number of incoming rows at which to stop and make the "continue skipping" decision
+ private boolean semiSkipDuplicates; // optional, for semi join
+ private int semiSkipDuplicatesMinPercentage;
+
/**
* This holds information about the spilled partitions for the build and
probe side.
*/
@@ -320,7 +326,7 @@ public boolean hasPartitionLimit() {
AVG_OUTPUT_ROW_BYTES,
OUTPUT_RECORD_COUNT;
- // duplicate for hash ag
+ // duplicate for hash agg
@Override
public int metricId() { return ordinal(); }
@@ -719,7 +725,7 @@ private void setupHashTable() throws SchemaChangeException {
}
final HashTableConfig htConfig = new HashTableConfig((int)
context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
joinControl.asInt());
+ !semiSkipDuplicates, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr,
comparators, joinControl.asInt());
// Create the chained hash table
baseHashTable =
@@ -789,18 +795,16 @@ private void setupHash64(HashTableConfig htConfig) throws
SchemaChangeException
}
/**
- * Call only after num partitions is known
+ * Call only after the final 'num partitions' is known (See
partitionNumTuning())
*/
private void delayedSetup() {
- //
- // Find out the estimated max batch size, etc
- // and compute the max numPartitions possible
- // See partitionNumTuning()
- //
-
spilledState.initialize(numPartitions);
// Create array for the partitions
partitions = new HashPartition[numPartitions];
+    // Runtime stats for semi-join: Help decide early whether duplicate skipping is worthwhile (based on initial data, about 32K rows per partition)
+    semiCountTotal = semiCountDuplicates = 0;
+    semiDupDecisionPoint = // average each partition's hash table half full (the + 1 avoids zero when numPartitions == 1)
+      ((numPartitions + 1) / 2) * context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY);
}
/**
@@ -810,8 +814,8 @@ private void initializeBuild() {
baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we
process the spilled files
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
- partitions[part] = new HashPartition(context, allocator, baseHashTable,
buildBatch, probeBatch, semiJoin,
- RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(),
numPartitions);
+ partitions[part] = new HashPartition(context, allocator, baseHashTable,
buildBatch, probeBatch, semiJoin, semiSkipDuplicates, RECORDS_PER_BATCH,
spillSet, part, spilledState.getCycle(),
+ numPartitions);
}
spilledInners = new HashJoinSpilledPartition[numPartitions];
@@ -939,16 +943,17 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
+ HashJoinMemoryCalculator.BuildSidePartitioning currentCalc; // may be
either a spill control calc, or buildCalc
{
// Initializing build calculator
// Limit scope of these variables to this block
- int maxBatchSize = spilledState.isFirstCycle()?
RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+ int maxBatchRowCount = spilledState.isFirstCycle()?
RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
boolean doMemoryCalculation = canSpill &&
!probeSideIsEmpty.booleanValue();
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(doMemoryCalculation);
- buildCalc = calc.next();
+ currentCalc = buildCalc = calc.next();
buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix
after growing hash values bug fixed
buildBatch,
@@ -959,14 +964,40 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
- maxBatchSize,
- maxBatchSize,
+ maxBatchRowCount,
+ maxBatchRowCount,
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
if (spilledState.isFirstCycle() && doMemoryCalculation) {
// Do auto tuning
- buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
+ buildCalc = partitionNumTuning(maxBatchRowCount, buildCalc);
+ }
+ if ( semiSkipDuplicates ) {
+        // in case of a Semi Join skipping duplicates, use a "spill control" calc
+ // (may revert back to the buildCalc if the code decides to stop
skipping)
+ currentCalc = new HashJoinSpillControlImpl(allocator,
RECORDS_PER_BATCH,
+ (int)
context.getOptions().getOption(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR),
+ batchMemoryManager, context);
+
+ // calculates the max number of partitions possible
+ if ( spilledState.isFirstCycle() && doMemoryCalculation ) {
+ currentCalc.initialize(spilledState.isFirstCycle(), true, // TODO
Fix after growing hash values bug fixed
+ buildBatch,
+ probeBatch,
+ buildJoinColumns,
+ probeSideIsEmpty.booleanValue(),
+ allocator.getLimit(),
+ numPartitions,
+ RECORDS_PER_BATCH,
+ RECORDS_PER_BATCH,
+ maxBatchRowCount,
+ maxBatchRowCount,
+ batchMemoryManager.getOutputBatchSize(),
+ HashTable.DEFAULT_LOAD_FACTOR);
+
+ numPartitions = currentCalc.getNumPartitions();
+ }
}
}
@@ -981,7 +1012,7 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
// Make the calculator aware of our partitions
final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new
HashJoinMemoryCalculator.PartitionStatSet(partitions);
- buildCalc.setPartitionStatSet(partitionStatSet);
+ currentCalc.setPartitionStatSet(partitionStatSet);
boolean moreData = true;
while (moreData) {
@@ -1029,12 +1060,31 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
: read_right_HV_vector.getAccessor().get(ind); // get the hash
value from the HV column
int currPart = hashCode & spilledState.getPartitionMask();
hashCode >>>= spilledState.getBitsInMask();
- // semi-join skips join-key-duplicate rows
- if ( semiJoin ) {
-
+ // semi-join builds the hash-table, and skips join-key-duplicate rows
+ if (semiSkipDuplicates) {
+ semiCountTotal++;
+ boolean aDuplicate =
partitions[currPart].insertKeyIntoHashTable(buildBatch.getContainer(), ind,
hashCode);
+          // A heuristic: Make a decision once the decision point is reached - either continue skipping duplicates, or stop
+          // (skipping duplicates carries a cost, so better to avoid it when duplicates are too few)
+ if ( semiCountTotal == semiDupDecisionPoint) { // got enough
incoming rows to decide ?
+ long threshold = semiCountTotal *
semiSkipDuplicatesMinPercentage / 100;
+ if ( semiCountDuplicates < threshold ) { // when duplicates
found were less than the percentage threshold, stop skipping
+ for (HashPartition partn : partitions) {
+ partn.stopSkippingDuplicates();
+ }
+ semiSkipDuplicates = false;
+ currentCalc = buildCalc; // back to using the regular calc
+ }
+ logger.debug("Semi {} skipping duplicates after receiving {}
rows with {} percent duplicates",
+ semiSkipDuplicates ? "to continue" : "stopped",
semiCountTotal, (100 * semiCountDuplicates) / semiCountTotal);
+ }
+ if ( aDuplicate ) {
+ semiCountDuplicates++;
+ continue;
+ }
}
// Append the new inner row to the appropriate partition; spill
(that partition) if needed
- partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
hashCode, buildCalc); // may spill if needed
+ partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
hashCode, currentCalc); // may spill if needed
}
if ( read_right_HV_vector != null ) {
@@ -1054,12 +1104,15 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
}
}
+ int numPartitionsSpilled = 0;
+
// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
if ( numPartitions > 1 ) { // a single partition needs no completion
for (HashPartition partn : partitions) {
partn.completeAnInnerBatch(false, partn.isSpilled());
+ if ( partn.isSpilled() ) { numPartitionsSpilled++; }
}
}
@@ -1071,8 +1124,8 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
return leftUpstream;
}
- HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc =
buildCalc.next();
- postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
+ HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc =
currentCalc.next();
+ postBuildCalc.initialize(probeSideIsEmpty.booleanValue());
//
// Traverse all the in-memory partitions' incoming batches, and build
their hash tables
@@ -1090,6 +1143,10 @@ public IterOutcome executeBuildPhase() throws
SchemaChangeException {
if (postBuildCalc.shouldSpill()) {
// Spill this partition if we need to make room
partn.spillThisPartition();
+ } else if (semiSkipDuplicates) {
+ // All in memory, and already got the Hash Table - just build the
containers
+ // (No additional memory is needed, hence no need for any new spill)
+ partn.buildContainers();
} else {
// Only build hash tables for partitions that are not spilled
partn.buildContainersHashTableAndHelper();
@@ -1196,6 +1253,10 @@ public HashJoinBatch(HashJoinPOP popConfig,
FragmentContext context,
this.probeBatch = left;
joinType = popConfig.getJoinType();
semiJoin = popConfig.isSemiJoin();
+ semiSkipDuplicatesMinPercentage = (int)
context.getOptions().getOption(ExecConstants.HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR);
+ semiSkipDuplicates = semiJoin &&
+ semiSkipDuplicatesMinPercentage < 100 && // can't have 100 percent, at
least the first key is not a duplicate
+
context.getOptions().getBoolean(ExecConstants.HASHJOIN_SEMI_SKIP_DUPLICATES_KEY);
joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType ==
JoinRelType.FULL;
joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType ==
JoinRelType.FULL;
conditions = popConfig.getConditions();
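
The build loop above relies on a simple heuristic: count incoming rows and duplicates until semiDupDecisionPoint rows have been seen (roughly half a hash table per partition), then keep skipping only if the observed duplicate percentage reaches the configured minimum. A self-contained sketch of that decision, using illustrative constants rather than Drill's actual option values, might be:

```java
// Stand-alone model of the "keep skipping duplicates?" decision made in executeBuildPhase().
// The partition count, hash table size, and duplicate rate below are illustrative, not Drill defaults.
public class SkipDecisionSketch {
  static long decisionPoint(int numPartitions, long minHashTableSize) {
    // average of half a hash table per partition; the + 1 avoids zero when numPartitions == 1
    return ((numPartitions + 1) / 2) * minHashTableSize;
  }

  public static void main(String[] args) {
    final int minPercentDuplicates = 20;              // exec.hashjoin.semi_percent_duplicates_to_skip
    final long decideAt = decisionPoint(8, 65_536L);  // 262,144 rows before deciding

    long total = 0;
    long duplicates = 0;
    boolean skipDuplicates = true;

    // Simulated build stream where every 10th row repeats a key: ~10% duplicates, below the 20% bar.
    for (long row = 0; row < 1_000_000 && skipDuplicates; row++) {
      total++;
      if (row % 10 == 9) {
        duplicates++;
      }
      if (total == decideAt) {
        long threshold = total * minPercentDuplicates / 100;
        if (duplicates < threshold) {
          skipDuplicates = false;                     // too few duplicates to justify the hash-table cost
        }
        System.out.printf("after %d rows: %d%% duplicates, keep skipping = %b%n",
            total, 100 * duplicates / total, skipDuplicates);
      }
    }
  }
}
```
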
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index c262e3c8c27..d6108c6ce3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -95,7 +95,7 @@
* </li>
* </ul>
*/
- interface BuildSidePartitioning extends
HashJoinStateCalculator<PostBuildCalculations> {
+ interface BuildSidePartitioning extends
HashJoinStateCalculator<PostBuildCalculations>, HashJoinSpillControl {
void initialize(boolean firstCycle,
boolean reserveHash,
RecordBatch buildSideBatch,
@@ -119,8 +119,6 @@ void initialize(boolean firstCycle,
long getMaxReservedMemory();
- boolean shouldSpill();
-
String makeDebugString();
}
@@ -128,17 +126,16 @@ void initialize(boolean firstCycle,
* The interface representing the {@link HashJoinStateCalculator}
corresponding to the
* {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
*/
- interface PostBuildCalculations extends
HashJoinStateCalculator<HashJoinMemoryCalculator> {
+ interface PostBuildCalculations extends
HashJoinStateCalculator<HashJoinMemoryCalculator>, HashJoinSpillControl {
/**
* Initializes the calculator with additional information needed.
* @param probeEmty True if the probe is empty. False otherwise.
+ *
*/
void initialize(boolean probeEmty);
int getProbeRecordsPerBatch();
- boolean shouldSpill();
-
String makeDebugString();
}
@@ -154,6 +151,9 @@ void initialize(boolean firstCycle,
long getInMemorySize();
}
+ interface HashJoinSpillControl {
+ boolean shouldSpill();
+ }
/**
* This class represents the memory size statistics for an entire set of
partitions.
*/
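
The refactoring above extracts the shared shouldSpill() method into the new HashJoinSpillControl interface, so a single spill-control object can be used wherever either calculator is expected. A toy illustration of that interface-extraction pattern, with hypothetical names rather than Drill's real types, is:

```java
// Toy illustration of the interface extraction: shouldSpill() moves into a shared
// super-interface so one object can be used wherever only spill control is needed.
// All names here are hypothetical stand-ins for the Drill interfaces.
interface SpillControl {
  boolean shouldSpill();
}

interface BuildSideCalc extends SpillControl {
  int getNumPartitions();
}

interface PostBuildCalc extends SpillControl {
  int getProbeRecordsPerBatch();
}

public class SpillControlSketch {
  // A single implementation satisfies the narrow SpillControl view of either calculator.
  static void maybeSpill(SpillControl calc) {
    if (calc.shouldSpill()) {
      System.out.println("spilling a partition");
    }
  }

  public static void main(String[] args) {
    BuildSideCalc build = new BuildSideCalc() {
      @Override public boolean shouldSpill() { return true; }   // pretend memory is tight
      @Override public int getNumPartitions() { return 8; }
    };
    maybeSpill(build);                                           // works through SpillControl alone
  }
}
```
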
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 88f3ddc21de..dd1ede2e15a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -174,7 +174,7 @@ public HashJoinState getState() {
* <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean,
RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int,
double)}.
* This will initialize the StateCalculate with the additional
information it needs.</li>
* <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number
of partitions that fit in memory.</li>
- * <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if
spilling needs to occurr.</li>
+ * <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()} To
determine if spilling needs to occurr.</li>
* <li><b>Step 3:</b> Call {@link #next()} and get the next memory
calculator associated with your next state.</li>
* </ul>
* </p>
@@ -555,9 +555,9 @@ public String makeDebugString() {
* <h1>Lifecycle</h1>
* <p>
* <ul>
- * <li><b>Step 1:</b> Call {@link #initialize(boolean)}. This
+ * <li><b>Step 1:</b> Call {@link
PostBuildCalculations#initialize(boolean)}. This
* gives the {@link HashJoinStateCalculator} additional information it
needs to compute memory requirements.</li>
- * <li><b>Step 2:</b> Call {@link #shouldSpill()}. This tells
+ * <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()}.
This tells
* you which build side partitions need to be spilled in order to make
room for probing.</li>
* <li><b>Step 3:</b> Call {@link #next()}. After you are done probing
* and partitioning the probe side, get the next calculator.</li>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c549143d5f3..44e191d96d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -153,7 +153,7 @@ public void setupHashJoinProbe(RecordBatch probeBatch,
HashJoinBatch outgoing, J
// for those outer partitions that need spilling (cause their matching
inners spilled)
// initialize those partitions' current batches and hash-value vectors
for ( HashPartition partn : this.partitions) {
- partn.allocateNewCurrentBatchAndHV();
+ partn.allocateNewCurrentVectorContainerAndHV();
}
currRightPartition = 0; // In case it's a Right/Full outer join
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
new file mode 100644
index 00000000000..d4f22ac8a8e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Set;
+
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
+
+/**
+ * This class is currently used only for a Semi-Hash-Join that avoids duplicates by the use of a hash table.
+ * The method {@link HashJoinMemoryCalculator.HashJoinSpillControl#shouldSpill()} returns true if the memory
+ * currently available to the allocator is not enough to hold (a safety multiple of) a newly allocated batch.
+ */
+public class HashJoinSpillControlImpl implements
HashJoinMemoryCalculator.BuildSidePartitioning {
+ private static final Logger logger =
LoggerFactory.getLogger(HashJoinSpillControlImpl.class);
+
+ private BufferAllocator allocator;
+ private int recordsPerBatch;
+ private int minBatchesInAvailableMemory;
+ private RecordBatchMemoryManager batchMemoryManager;
+ private int initialPartitions;
+ private int numPartitions;
+ private int recordsPerPartitionBatchProbe;
+ private FragmentContext context;
+ HashJoinMemoryCalculator.PartitionStatSet partitionStatSet;
+
+ HashJoinSpillControlImpl(BufferAllocator allocator, int recordsPerBatch, int
minBatchesInAvailableMemory, RecordBatchMemoryManager batchMemoryManager,
FragmentContext context) {
+ this.allocator = allocator;
+ this.recordsPerBatch = recordsPerBatch;
+ this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+ this.batchMemoryManager = batchMemoryManager;
+ this.context = context;
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ // Expected new batch size like the current, plus the Hash Values vector
(4 bytes per HV)
+ long batchSize = (
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) *
recordsPerBatch;
+ long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+ long memoryAvailableNow = allocator.getLimit() -
allocator.getAllocatedMemory() - reserveForOutgoing;
+ boolean needsSpill = minBatchesInAvailableMemory * batchSize >
memoryAvailableNow;
+ if ( needsSpill ) {
+ logger.debug("should spill now - batch size {}, mem avail {}, reserved
for outgoing {}", batchSize, memoryAvailableNow, reserveForOutgoing);
+ }
+ return needsSpill; // go spill if too little memory is available
+ }
+
+ @Override
+ public void initialize(boolean firstCycle,
+ boolean reserveHash,
+ RecordBatch buildSideBatch,
+ RecordBatch probeSideBatch,
+ Set<String> joinColumns,
+ boolean probeEmpty,
+ long memoryAvailable,
+ int initialPartitions,
+ int recordsPerPartitionBatchBuild,
+ int recordsPerPartitionBatchProbe,
+ int maxBatchNumRecordsBuild,
+ int maxBatchNumRecordsProbe,
+ int outputBatchSize,
+ double loadFactor) {
+ this.initialPartitions = initialPartitions;
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+
+ calculateMemoryUsage();
+ }
+
+ @Override
+ public void setPartitionStatSet(HashJoinMemoryCalculator.PartitionStatSet
partitionStatSet) {
+ this.partitionStatSet = partitionStatSet;
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ /**
+  * Calculate the number of partitions possible for the given available memory;
+  * start at initialPartitions and adjust down (in powers of 2) as needed
+ */
+ private void calculateMemoryUsage() {
+ long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+ long memoryAvailableNow = allocator.getLimit() -
allocator.getAllocatedMemory() - reserveForOutgoing;
+
+ // Expected new batch size like the current, plus the Hash Values vector
(4 bytes per HV)
+ int buildBatchSize = (
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) *
recordsPerBatch;
+ int hashTableSize = buildBatchSize /* the keys in the HT */ +
+ 4 *
(int)context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY) /* the
initial hash table buckets */ +
+ (2 + 2) * recordsPerBatch; /* the hash-values and the links */
+ int probeBatchSize = (
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX).getRowAllocWidth() + 4 ) *
recordsPerBatch;
+
+ long memoryNeededPerPartition = Integer.max(buildBatchSize +
hashTableSize, probeBatchSize);
+
+ for ( numPartitions = initialPartitions; numPartitions > 2; numPartitions
/= 2 ) { // need at least 2
+ // each partition needs at least one internal build batch, and a minimum
hash-table
+ // ( or an internal probe batch, for a spilled partition during probe
phase )
+ // adding "min batches" to create some safety slack
+ if ( memoryAvailableNow >
+ ( numPartitions + minBatchesInAvailableMemory ) *
memoryNeededPerPartition ) {
+ break; // got enough memory
+ }
+ }
+ logger.debug("Spill control chosen to use {} partitions", numPartitions);
+ }
+
+ @Override
+ public long getBuildReservedMemory() {
+ return 0;
+ }
+
+ @Override
+ public long getMaxReservedMemory() {
+ return 0;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return "Spill Control " +
HashJoinMemoryCalculatorImpl.HashJoinSpillControl.class.getCanonicalName();
+ }
+
+ @Nullable
+ @Override
+ public HashJoinMemoryCalculator.PostBuildCalculations next() {
+ return new
SpillControlPostBuildCalculationsImpl(recordsPerPartitionBatchProbe,
+ allocator, recordsPerBatch, minBatchesInAvailableMemory,
batchMemoryManager, partitionStatSet);
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.BUILD_SIDE_PARTITIONING;
+ }
+
+
+ /**
+   * The purpose of this class is to provide the method {@link #shouldSpill}, which ensures that enough memory is available
+   * to hold the incoming probe batches for those partitions that spilled (else more partitions need to be spilled, to free more memory).
+ */
+ public static class SpillControlPostBuildCalculationsImpl implements
HashJoinMemoryCalculator.PostBuildCalculations {
+ private static final Logger logger =
LoggerFactory.getLogger(SpillControlPostBuildCalculationsImpl.class);
+
+ private final int recordsPerPartitionBatchProbe;
+ private BufferAllocator allocator;
+ private int recordsPerBatch;
+ private int minBatchesInAvailableMemory;
+ private RecordBatchMemoryManager batchMemoryManager;
+ private boolean probeEmpty;
+ private final HashJoinMemoryCalculator.PartitionStatSet
buildPartitionStatSet;
+
+
+ public SpillControlPostBuildCalculationsImpl(final int
recordsPerPartitionBatchProbe,
+ BufferAllocator allocator,
int recordsPerBatch, int minBatchesInAvailableMemory,
+ RecordBatchMemoryManager
batchMemoryManager,
+ final
HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet) {
+ this.allocator = allocator;
+ this.recordsPerBatch = recordsPerBatch;
+ this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+ this.batchMemoryManager = batchMemoryManager;
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+ this.buildPartitionStatSet = buildPartitionStatSet;
+ }
+
+ @Override
+    public void initialize(boolean probeEmpty) {
+      this.probeEmpty = probeEmpty;
+    }
+
+
+ @Override
+ public int getProbeRecordsPerBatch() {
+ return recordsPerPartitionBatchProbe;
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ int numPartitionsSpilled =
buildPartitionStatSet.getNumSpilledPartitions();
+ if ( numPartitionsSpilled == 0 ) { return false; } // no extra memory is
needed if all the build side is in memory
+ if ( probeEmpty ) { return false; } // no probe side data
+ // Expected new batch size like the current, plus the Hash Values vector
(4 bytes per HV)
+ long batchSize = (
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX).getRowAllocWidth() + 4 ) *
recordsPerBatch;
+ long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+ long memoryAvailableNow = allocator.getLimit() -
allocator.getAllocatedMemory() - reserveForOutgoing;
+ boolean needsSpill = (numPartitionsSpilled + minBatchesInAvailableMemory
) * batchSize > memoryAvailableNow;
+ if ( needsSpill ) {
+ logger.debug("Post build should spill now - batch size {}, mem avail
{}, reserved for outgoing {}, num partn spilled {}", batchSize,
+ memoryAvailableNow, reserveForOutgoing, numPartitionsSpilled);
+ }
+ return needsSpill; // go spill if too little memory is available
+ }
+
+ @Nullable
+ @Override
+ public HashJoinMemoryCalculator next() {
+ return null;
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.POST_BUILD_CALCULATIONS;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return "Spill Control " +
HashJoinMemoryCalculatorImpl.HashJoinSpillControl.class.getCanonicalName() + "
calculator.";
+ }
+ }
+
+
+
+}
+
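
The class comment above reduces the new spill-control rule to one comparison: spill when the allocator's remaining headroom (limit minus allocated memory, minus the reservation for the outgoing batch) cannot hold a safety multiple of one more internal batch. A plain-Java sketch of that arithmetic, with illustrative sizes in place of the allocator and batch sizer, could be:

```java
// Plain-Java model of HashJoinSpillControlImpl.shouldSpill(); all sizes are
// illustrative constants, not values taken from Drill's allocator or sizer.
public class SpillHeadroomSketch {
  static boolean shouldSpill(long allocatorLimit,
                             long allocatedMemory,
                             long reserveForOutgoing,
                             long rowAllocWidth,
                             int recordsPerBatch,
                             int minBatchesInAvailableMemory) {
    // expected size of one more internal batch: row width plus 4 bytes for the hash-value column
    long batchSize = (rowAllocWidth + 4) * recordsPerBatch;
    long memoryAvailableNow = allocatorLimit - allocatedMemory - reserveForOutgoing;
    return minBatchesInAvailableMemory * batchSize > memoryAvailableNow;
  }

  public static void main(String[] args) {
    long limit = 64L * 1024 * 1024;          // 64 MB allocator limit
    long reserveForOutgoing = 8L * 1024 * 1024;
    long rowWidth = 60;                      // bytes per build row (made up)
    int recordsPerBatch = 16 * 1024;         // one internal batch is then ~1 MB
    int minBatches = 3;                      // exec.hashjoin.min_batches_in_available_memory

    System.out.println(shouldSpill(limit, 40L * 1024 * 1024, reserveForOutgoing,
        rowWidth, recordsPerBatch, minBatches));   // false: ~16 MB free > 3 MB needed
    System.out.println(shouldSpill(limit, 54L * 1024 * 1024, reserveForOutgoing,
        rowWidth, recordsPerBatch, minBatches));   // true: ~2 MB free < 3 MB needed
  }
}
```
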
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index c97220cef98..aeba391c992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,13 +124,16 @@
new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
- new
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
+ new
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR),
// for enable/disable unbounded HashJoin
new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR),
new
OptionDefinition(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME),
new
OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING),
+ new
OptionDefinition(ExecConstants.HASHJOIN_SEMI_SKIP_DUPLICATES_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
+ new
OptionDefinition(ExecConstants.HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR,
new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
+ new
OptionDefinition(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR,
new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
// ------------------------------------------- Index planning related
options BEGIN --------------------------------------------------------------
new OptionDefinition(PlannerSettings.USE_SIMPLE_OPTIMIZER),
new OptionDefinition(PlannerSettings.INDEX_PLANNING),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index 3f8f8a4b4c9..fa8a0702ec0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -158,14 +158,21 @@ public DoubleValidator(String name, OptionDescription
description) {
}
public static class IntegerValidator extends LongValidator {
+ private int iMin = Integer.MIN_VALUE;
+ private int iMax = Integer.MAX_VALUE;
public IntegerValidator(String name, OptionDescription description) {
super(name, description);
}
+ public IntegerValidator(String name, int min, int max, OptionDescription
description) {
+ super(name, description);
+ iMin = min;
+ iMax = max;
+ }
@Override
public void validate(final OptionValue v, final OptionMetaData metaData,
final OptionSet manager) {
super.validate(v, metaData, manager);
- if (v.num_val > Integer.MAX_VALUE || v.num_val < Integer.MIN_VALUE) {
+ if (v.num_val > iMax || v.num_val < iMin) {
throw UserException.validationError()
.message(String.format("Option %s does not have a valid integer
value", getOptionName()))
.build(logger);
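
The IntegerValidator change above lets callers pass explicit bounds (0-100 for the duplicate percentage, at least 1 for the batch threshold) instead of accepting the full int range. A small stand-alone sketch of this kind of bounded option check, using a hypothetical class rather than Drill's validator hierarchy, might look like:

```java
// Minimal bounded-integer option check modeled on the new IntegerValidator(name, min, max, description);
// the class name and exception wording here are hypothetical, not Drill's.
public class BoundedIntOptionSketch {
  private final String name;
  private final long min;
  private final long max;

  BoundedIntOptionSketch(String name, long min, long max) {
    this.name = name;
    this.min = min;
    this.max = max;
  }

  void validate(long value) {
    if (value > max || value < min) {
      throw new IllegalArgumentException(
          String.format("Option %s must be between %d and %d, got %d", name, min, max, value));
    }
  }

  public static void main(String[] args) {
    BoundedIntOptionSketch percent =
        new BoundedIntOptionSketch("exec.hashjoin.semi_percent_duplicates_to_skip", 0, 100);
    percent.validate(20);          // ok: the shipped default
    try {
      percent.validate(101);       // rejected: outside the 0..100 range
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```
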
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4a5f07549ea..3d25b80d326 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -490,6 +490,9 @@ drill.exec.options: {
exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB
exec.hashjoin.runtime_filter.waiting.enable: true,
exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms
+ exec.hashjoin.semi_skip_duplicates: true,
+ exec.hashjoin.semi_percent_duplicates_to_skip: 20,
+ exec.hashjoin.min_batches_in_available_memory: 3,
exec.hashagg.mem_limit: 0,
exec.hashagg.min_batches_per_partition: 2,
exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index 93475acf6d4..a4377cb43ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -110,7 +110,7 @@ public void run(SpillSet spillSet,
context.getAllocator(),
baseHashTable,
buildBatch,
- probeBatch, false, 10,
+ probeBatch, false, false, 10,
spillSet,
0,
0,
@@ -209,7 +209,7 @@ public void run(SpillSet spillSet,
context.getAllocator(),
baseHashTable,
buildBatch,
- probeBatch, false, 10,
+ probeBatch, false, false, 10,
spillSet,
0,
0,