Ben-Zvi closed pull request #1546: DRILL-6861: Hash-Join should not exit after 
an empty probe-side spilled partition
URL: https://github.com/apache/drill/pull/1546
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c61816584..88eadf29115 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public IterOutcome innerNext() {
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
-          // Get the next (previously) spilled partition to handle as incoming
-          HashJoinSpilledPartition currSp = 
spilledState.getNextSpilledPartition();
-
-          // Create a BUILD-side "incoming" out of the inner spill file of 
that partition
-          buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, 
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
-          // The above ctor call also got the first batch; need to update the 
outcome
-          rightUpstream = ((SpilledRecordbatch) 
buildBatch).getInitialOutcome();
-
-          if (currSp.outerSpilledBatches > 0) {
-            // Create a PROBE-side "incoming" out of the outer spill file of 
that partition
-            probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, 
currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+        if (!buildSideIsEmpty.booleanValue()) {
+          while (!spilledState.isEmpty()) {  // "while" is only used for 
skipping; see "continue" below
+
+            // Get the next (previously) spilled partition to handle as 
incoming
+            HashJoinSpilledPartition currSp = 
spilledState.getNextSpilledPartition();
+
+            // If the outer is empty (and it's not a right/full join) - try 
the next spilled partition
+            if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+              continue;
+            }
+
+            // Create a BUILD-side "incoming" out of the inner spill file of 
that partition
+            buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, 
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
             // The above ctor call also got the first batch; need to update 
the outcome
-            leftUpstream = ((SpilledRecordbatch) 
probeBatch).getInitialOutcome();
-          } else {
-            probeBatch = left; // if no outer batch then reuse left - needed 
for updateIncoming()
-            leftUpstream = IterOutcome.NONE;
-            hashJoinProbe.changeToFinalProbeState();
-          }
+            rightUpstream = ((SpilledRecordbatch) 
buildBatch).getInitialOutcome();
+
+            if (currSp.outerSpilledBatches > 0) {
+              // Create a PROBE-side "incoming" out of the outer spill file of 
that partition
+              probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, 
currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+              // The above ctor call also got the first batch; need to update 
the outcome
+              leftUpstream = ((SpilledRecordbatch) 
probeBatch).getInitialOutcome();
+            } else {
+              probeBatch = left; // if no outer batch then reuse left - needed 
for updateIncoming()
+              leftUpstream = IterOutcome.NONE;
+              hashJoinProbe.changeToFinalProbeState();
+            }
 
-          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-          state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
+            spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+            state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
 
-          prefetchedBuild.setValue(false);
-          prefetchedProbe.setValue(false);
+            prefetchedBuild.setValue(false);
+            prefetchedProbe.setValue(false);
 
-          return innerNext(); // start processing the next spilled partition 
"recursively"
+            return innerNext(); // start processing the next spilled partition 
"recursively"
+          }
         }
 
       } else {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286a485..7897c3b0096 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), 
// for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to