[ 
https://issues.apache.org/jira/browse/DRILL-6861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693760#comment-16693760
 ] 

ASF GitHub Bot commented on DRILL-6861:
---------------------------------------

Ben-Zvi closed pull request #1546: DRILL-6861: Hash-Join should not exit after 
an empty probe-side spilled partition
URL: https://github.com/apache/drill/pull/1546
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c61816584..88eadf29115 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public IterOutcome innerNext() {
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
-          // Get the next (previously) spilled partition to handle as incoming
-          HashJoinSpilledPartition currSp = 
spilledState.getNextSpilledPartition();
-
-          // Create a BUILD-side "incoming" out of the inner spill file of 
that partition
-          buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, 
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
-          // The above ctor call also got the first batch; need to update the 
outcome
-          rightUpstream = ((SpilledRecordbatch) 
buildBatch).getInitialOutcome();
-
-          if (currSp.outerSpilledBatches > 0) {
-            // Create a PROBE-side "incoming" out of the outer spill file of 
that partition
-            probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, 
currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+        if (!buildSideIsEmpty.booleanValue()) {
+          while (!spilledState.isEmpty()) {  // "while" is only used for 
skipping; see "continue" below
+
+            // Get the next (previously) spilled partition to handle as 
incoming
+            HashJoinSpilledPartition currSp = 
spilledState.getNextSpilledPartition();
+
+            // If the outer is empty (and it's not a right/full join) - try 
the next spilled partition
+            if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+              continue;
+            }
+
+            // Create a BUILD-side "incoming" out of the inner spill file of 
that partition
+            buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, 
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
             // The above ctor call also got the first batch; need to update 
the outcome
-            leftUpstream = ((SpilledRecordbatch) 
probeBatch).getInitialOutcome();
-          } else {
-            probeBatch = left; // if no outer batch then reuse left - needed 
for updateIncoming()
-            leftUpstream = IterOutcome.NONE;
-            hashJoinProbe.changeToFinalProbeState();
-          }
+            rightUpstream = ((SpilledRecordbatch) 
buildBatch).getInitialOutcome();
+
+            if (currSp.outerSpilledBatches > 0) {
+              // Create a PROBE-side "incoming" out of the outer spill file of 
that partition
+              probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, 
currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+              // The above ctor call also got the first batch; need to update 
the outcome
+              leftUpstream = ((SpilledRecordbatch) 
probeBatch).getInitialOutcome();
+            } else {
+              probeBatch = left; // if no outer batch then reuse left - needed 
for updateIncoming()
+              leftUpstream = IterOutcome.NONE;
+              hashJoinProbe.changeToFinalProbeState();
+            }
 
-          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-          state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
+            spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+            state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
 
-          prefetchedBuild.setValue(false);
-          prefetchedProbe.setValue(false);
+            prefetchedBuild.setValue(false);
+            prefetchedProbe.setValue(false);
 
-          return innerNext(); // start processing the next spilled partition 
"recursively"
+            return innerNext(); // start processing the next spilled partition 
"recursively"
+          }
         }
 
       } else {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286a485..7897c3b0096 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), 
// for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Hash-Join: Spilled partitions are skipped following an empty probe side
> -----------------------------------------------------------------------
>
>                 Key: DRILL-6861
>                 URL: https://issues.apache.org/jira/browse/DRILL-6861
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Relational Operators
>    Affects Versions: 1.14.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>            Priority: Blocker
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
>      Following DRILL-6755 (_Avoid building a hash table when the probe side 
> is empty_) - The special case of an empty spilled probe-partition was not 
> handled.  When such a case happens, the Hash-Join terminates early (returns 
> NONE) and the remaining partitions are not processed/returned (which may lead 
> to incorrect results).
>   A test case - force tpcds/query95 to spill (sf1) :
> {code:java}
> 0: jdbc:drill:zk=local> alter system set 
> `exec.hashjoin.max_batches_in_memory` = 40;
> +-------+-----------------------------------------------+
> |  ok   |                    summary                    |
> +-------+-----------------------------------------------+
> | true  | exec.hashjoin.max_batches_in_memory updated.  |
> +-------+-----------------------------------------------+
> 1 row selected (1.325 seconds)
> 0: jdbc:drill:zk=local> WITH ws_wh AS
> . . . . . . . . . . . > (
> . . . . . . . . . . . >        SELECT ws1.ws_order_number,
> . . . . . . . . . . . >               ws1.ws_warehouse_sk wh1,
> . . . . . . . . . . . >               ws2.ws_warehouse_sk wh2
> . . . . . . . . . . . >        FROM   dfs.`/data/tpcds/sf1/parquet/web_sales` 
> ws1,
> . . . . . . . . . . . >               dfs.`/data/tpcds/sf1/parquet/web_sales` 
> ws2
> . . . . . . . . . . . >        WHERE  ws1.ws_order_number = 
> ws2.ws_order_number
> . . . . . . . . . . . >        AND    ws1.ws_warehouse_sk <> 
> ws2.ws_warehouse_sk)
> . . . . . . . . . . . > SELECT
> . . . . . . . . . . . >          Count(DISTINCT ws1.ws_order_number) AS 
> `order count` ,
> . . . . . . . . . . . >          Sum(ws1.ws_ext_ship_cost)           AS 
> `total shipping cost` ,
> . . . . . . . . . . . >          Sum(ws1.ws_net_profit)              AS 
> `total net profit`
> . . . . . . . . . . . > FROM     dfs.`/data/tpcds/sf1/parquet/web_sales` ws1 ,
> . . . . . . . . . . . >          dfs.`/data/tpcds/sf1/parquet/date_dim` dd,
> . . . . . . . . . . . >          
> dfs.`/data/tpcds/sf1/parquet/customer_address` ca,
> . . . . . . . . . . . >          dfs.`/data/tpcds/sf1/parquet/web_site` wbst
> . . . . . . . . . . . > WHERE    dd.d_date BETWEEN '2000-04-01' AND      (
> . . . . . . . . . . . >                   Cast('2000-04-01' AS DATE) + 
> INTERVAL '60' day)
> . . . . . . . . . . . > AND      ws1.ws_ship_date_sk = dd.d_date_sk
> . . . . . . . . . . . > AND      ws1.ws_ship_addr_sk = ca.ca_address_sk
> . . . . . . . . . . . > AND      ca.ca_state = 'IN'
> . . . . . . . . . . . > AND      ws1.ws_web_site_sk = wbst.web_site_sk
> . . . . . . . . . . . > AND      wbst.web_company_name = 'pri'
> . . . . . . . . . . . > AND      ws1.ws_order_number IN
> . . . . . . . . . . . >          (
> . . . . . . . . . . . >                 SELECT ws_wh.ws_order_number
> . . . . . . . . . . . >                 FROM   ws_wh)
> . . . . . . . . . . . > AND      ws1.ws_order_number IN
> . . . . . . . . . . . >          (
> . . . . . . . . . . . >                 SELECT wr.wr_order_number
> . . . . . . . . . . . >                 FROM   
> dfs.`/data/tpcds/sf1/parquet/web_returns` wr,
> . . . . . . . . . . . >                        ws_wh
> . . . . . . . . . . . >                 WHERE  wr.wr_order_number = 
> ws_wh.ws_order_number)
> . . . . . . . . . . . > ORDER BY count(DISTINCT ws1.ws_order_number)
> . . . . . . . . . . . > LIMIT 100;
> +--------------+----------------------+---------------------+
> | order count  | total shipping cost  |  total net profit   |
> +--------------+----------------------+---------------------+
> | 17           | 38508.130000000005   | 20822.3           |
> +--------------+----------------------+---------------------+
> 1 row selected (105.621 seconds)
> {code}
> The correct results should be:
> {code:java}
> +--------------+----------------------+---------------------+
> | order count  | total shipping cost  |  total net profit   |
> +--------------+----------------------+---------------------+
> | 34           | 63754.72             | 15919.009999999998  |
> +--------------+----------------------+---------------------+
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to