Ben-Zvi closed pull request #1546: DRILL-6861: Hash-Join should not exit after
an empty probe-side spilled partition
URL: https://github.com/apache/drill/pull/1546
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c61816584..88eadf29115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public IterOutcome innerNext() {
       //
       // (recursively) Handle the spilled partitions, if any
       //
-      if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
-        // Get the next (previously) spilled partition to handle as incoming
-        HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
-
-        // Create a BUILD-side "incoming" out of the inner spill file of that partition
-        buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
-        // The above ctor call also got the first batch; need to update the outcome
-        rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
-
-        if (currSp.outerSpilledBatches > 0) {
-          // Create a PROBE-side "incoming" out of the outer spill file of that partition
-          probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+      if (!buildSideIsEmpty.booleanValue()) {
+        while (!spilledState.isEmpty()) { // "while" is only used for skipping; see "continue" below
+
+          // Get the next (previously) spilled partition to handle as incoming
+          HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
+
+          // If the outer is empty (and it's not a right/full join) - try the next spilled partition
+          if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+            continue;
+          }
+
+          // Create a BUILD-side "incoming" out of the inner spill file of that partition
+          buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
           // The above ctor call also got the first batch; need to update the outcome
-          leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
-        } else {
-          probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
-          leftUpstream = IterOutcome.NONE;
-          hashJoinProbe.changeToFinalProbeState();
-        }
+          rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
+
+          if (currSp.outerSpilledBatches > 0) {
+            // Create a PROBE-side "incoming" out of the outer spill file of that partition
+            probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+            // The above ctor call also got the first batch; need to update the outcome
+            leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
+          } else {
+            probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+            leftUpstream = IterOutcome.NONE;
+            hashJoinProbe.changeToFinalProbeState();
+          }
 
-        spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-        state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
+          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+          state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
 
-        prefetchedBuild.setValue(false);
-        prefetchedProbe.setValue(false);
+          prefetchedBuild.setValue(false);
+          prefetchedProbe.setValue(false);
 
-        return innerNext(); // start processing the next spilled partition "recursively"
+          return innerNext(); // start processing the next spilled partition "recursively"
+        }
       }
 
     } else {
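For context on the HashJoinBatch change: per the PR title, the join could exit after handling a spilled partition whose probe (outer) side was empty, leaving later spilled partitions unprocessed. The new while-loop instead skips such a partition, since it cannot produce output for an inner or left join, and moves on to the next one; only a RIGHT or FULL join must still process it to emit unmatched build-side rows. Below is a minimal, self-contained sketch of that skip rule only; the SpilledPartition class and nextPartitionToProcess helper are hypothetical stand-ins, not Drill's HashJoinSpilledPartition or SpilledState APIs.

import java.util.ArrayDeque;
import java.util.Deque;

public class SkipEmptyProbePartitionSketch {

  /** Stand-in for a spilled partition: only the batch counts the skip test needs. */
  static class SpilledPartition {
    final int innerSpilledBatches; // build side
    final int outerSpilledBatches; // probe side
    SpilledPartition(int inner, int outer) {
      this.innerSpilledBatches = inner;
      this.outerSpilledBatches = outer;
    }
  }

  /**
   * Returns the next spilled partition that actually needs processing, or null when none is left.
   * A partition with an empty probe side can produce no rows for an inner/left join, so it is
   * skipped instead of ending spilled-partition handling (the DRILL-6861 symptom); for RIGHT/FULL
   * joins it must still be processed to emit unmatched build-side rows.
   */
  static SpilledPartition nextPartitionToProcess(Deque<SpilledPartition> spilled, boolean joinIsRightOrFull) {
    while (!spilled.isEmpty()) {
      SpilledPartition currSp = spilled.poll();
      if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
        continue; // skip: nothing to probe with, and no unmatched build rows are needed
      }
      return currSp;
    }
    return null;
  }

  public static void main(String[] args) {
    Deque<SpilledPartition> spilled = new ArrayDeque<>();
    spilled.add(new SpilledPartition(3, 0)); // empty probe side: skipped for an inner join
    spilled.add(new SpilledPartition(2, 5)); // has probe batches: gets processed
    SpilledPartition next = nextPartitionToProcess(spilled, false /* inner join */);
    System.out.println(next.outerSpilledBatches); // prints 5
  }
}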
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286a485..7897c3b0096 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
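On the SystemOptionManager change: the second boolean passed to OptionMetaData appears to be the admin-only flag and the third marks the option as internal, so flipping it from true to false lets non-admin users set the HASHJOIN_MAX_BATCHES_IN_MEMORY option while keeping it system-scoped and internal. A toy model of that effect, assuming this reading of the flags; OptionMeta below is an illustrative stand-in, not Drill's OptionMetaData.

public class OptionAccessSketch {

  /** Illustrative stand-in mirroring the (adminOnly, internal) booleans shown in the diff. */
  static class OptionMeta {
    final boolean adminOnly; // true: only administrators may change the option
    final boolean internal;  // true: hidden from the regular option listing
    OptionMeta(boolean adminOnly, boolean internal) {
      this.adminOnly = adminOnly;
      this.internal = internal;
    }
    boolean canBeSetBy(boolean userIsAdmin) {
      return userIsAdmin || !adminOnly;
    }
  }

  public static void main(String[] args) {
    OptionMeta before = new OptionMeta(true, true);  // old metadata: admin-only, internal
    OptionMeta after = new OptionMeta(false, true);  // new metadata: any user, still internal
    System.out.println(before.canBeSetBy(false)); // false: a regular user was blocked
    System.out.println(after.canBeSetBy(false));  // true: a regular user may now set it
  }
}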
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services