[
https://issues.apache.org/jira/browse/DRILL-6861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693760#comment-16693760
]
ASF GitHub Bot commented on DRILL-6861:
---------------------------------------
Ben-Zvi closed pull request #1546: DRILL-6861: Hash-Join should not exit after
an empty probe-side spilled partition
URL: https://github.com/apache/drill/pull/1546
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c61816584..88eadf29115 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public IterOutcome innerNext() {
//
// (recursively) Handle the spilled partitions, if any
//
- if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
- // Get the next (previously) spilled partition to handle as incoming
- HashJoinSpilledPartition currSp =
spilledState.getNextSpilledPartition();
-
- // Create a BUILD-side "incoming" out of the inner spill file of
that partition
- buildBatch = new SpilledRecordbatch(currSp.innerSpillFile,
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
- // The above ctor call also got the first batch; need to update the
outcome
- rightUpstream = ((SpilledRecordbatch)
buildBatch).getInitialOutcome();
-
- if (currSp.outerSpilledBatches > 0) {
- // Create a PROBE-side "incoming" out of the outer spill file of
that partition
- probeBatch = new SpilledRecordbatch(currSp.outerSpillFile,
currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+ if (!buildSideIsEmpty.booleanValue()) {
+ while (!spilledState.isEmpty()) { // "while" is only used for
skipping; see "continue" below
+
+ // Get the next (previously) spilled partition to handle as
incoming
+ HashJoinSpilledPartition currSp =
spilledState.getNextSpilledPartition();
+
+ // If the outer is empty (and it's not a right/full join) - try
the next spilled partition
+ if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+ continue;
+ }
+
+ // Create a BUILD-side "incoming" out of the inner spill file of
that partition
+ buildBatch = new SpilledRecordbatch(currSp.innerSpillFile,
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update
the outcome
- leftUpstream = ((SpilledRecordbatch)
probeBatch).getInitialOutcome();
- } else {
- probeBatch = left; // if no outer batch then reuse left - needed
for updateIncoming()
- leftUpstream = IterOutcome.NONE;
- hashJoinProbe.changeToFinalProbeState();
- }
+ rightUpstream = ((SpilledRecordbatch)
buildBatch).getInitialOutcome();
+
+ if (currSp.outerSpilledBatches > 0) {
+ // Create a PROBE-side "incoming" out of the outer spill file of
that partition
+ probeBatch = new SpilledRecordbatch(currSp.outerSpillFile,
currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+ // The above ctor call also got the first batch; need to update
the outcome
+ leftUpstream = ((SpilledRecordbatch)
probeBatch).getInitialOutcome();
+ } else {
+ probeBatch = left; // if no outer batch then reuse left - needed
for updateIncoming()
+ leftUpstream = IterOutcome.NONE;
+ hashJoinProbe.changeToFinalProbeState();
+ }
- spilledState.updateCycle(stats, currSp, spilledStateUpdater);
- state = BatchState.FIRST; // TODO need to determine if this is
still necessary since prefetchFirstBatchFromBothSides sets this
+ spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+ state = BatchState.FIRST; // TODO need to determine if this is
still necessary since prefetchFirstBatchFromBothSides sets this
- prefetchedBuild.setValue(false);
- prefetchedProbe.setValue(false);
+ prefetchedBuild.setValue(false);
+ prefetchedProbe.setValue(false);
- return innerNext(); // start processing the next spilled partition
"recursively"
+ return innerNext(); // start processing the next spilled partition
"recursively"
+ }
}
} else {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286a485..7897c3b0096 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@
new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
- new
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+ new
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR),
// for enable/disable unbounded HashJoin
new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Hash-Join: Spilled partitions are skipped following an empty probe side
> -----------------------------------------------------------------------
>
> Key: DRILL-6861
> URL: https://issues.apache.org/jira/browse/DRILL-6861
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Relational Operators
> Affects Versions: 1.14.0
> Reporter: Boaz Ben-Zvi
> Assignee: Boaz Ben-Zvi
> Priority: Blocker
> Labels: ready-to-commit
> Fix For: 1.15.0
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> Following DRILL-6755 (_Avoid building a hash table when the probe side
> is empty_), the special case of an empty spilled probe-partition was not
> handled. When this case occurs, the Hash-Join terminates early (returns
> NONE) and the remaining spilled partitions are never processed or returned,
> which may lead to incorrect results.
> A test case: force tpcds/query95 (sf1) to spill by lowering exec.hashjoin.max_batches_in_memory:
> {code:java}
> 0: jdbc:drill:zk=local> alter system set
> `exec.hashjoin.max_batches_in_memory` = 40;
> +-------+-----------------------------------------------+
> | ok | summary |
> +-------+-----------------------------------------------+
> | true | exec.hashjoin.max_batches_in_memory updated. |
> +-------+-----------------------------------------------+
> 1 row selected (1.325 seconds)
> 0: jdbc:drill:zk=local> WITH ws_wh AS
> . . . . . . . . . . . > (
> . . . . . . . . . . . > SELECT ws1.ws_order_number,
> . . . . . . . . . . . > ws1.ws_warehouse_sk wh1,
> . . . . . . . . . . . > ws2.ws_warehouse_sk wh2
> . . . . . . . . . . . > FROM dfs.`/data/tpcds/sf1/parquet/web_sales`
> ws1,
> . . . . . . . . . . . > dfs.`/data/tpcds/sf1/parquet/web_sales`
> ws2
> . . . . . . . . . . . > WHERE ws1.ws_order_number =
> ws2.ws_order_number
> . . . . . . . . . . . > AND ws1.ws_warehouse_sk <>
> ws2.ws_warehouse_sk)
> . . . . . . . . . . . > SELECT
> . . . . . . . . . . . > Count(DISTINCT ws1.ws_order_number) AS
> `order count` ,
> . . . . . . . . . . . > Sum(ws1.ws_ext_ship_cost) AS
> `total shipping cost` ,
> . . . . . . . . . . . > Sum(ws1.ws_net_profit) AS
> `total net profit`
> . . . . . . . . . . . > FROM dfs.`/data/tpcds/sf1/parquet/web_sales` ws1 ,
> . . . . . . . . . . . > dfs.`/data/tpcds/sf1/parquet/date_dim` dd,
> . . . . . . . . . . . >
> dfs.`/data/tpcds/sf1/parquet/customer_address` ca,
> . . . . . . . . . . . > dfs.`/data/tpcds/sf1/parquet/web_site` wbst
> . . . . . . . . . . . > WHERE dd.d_date BETWEEN '2000-04-01' AND (
> . . . . . . . . . . . > Cast('2000-04-01' AS DATE) +
> INTERVAL '60' day)
> . . . . . . . . . . . > AND ws1.ws_ship_date_sk = dd.d_date_sk
> . . . . . . . . . . . > AND ws1.ws_ship_addr_sk = ca.ca_address_sk
> . . . . . . . . . . . > AND ca.ca_state = 'IN'
> . . . . . . . . . . . > AND ws1.ws_web_site_sk = wbst.web_site_sk
> . . . . . . . . . . . > AND wbst.web_company_name = 'pri'
> . . . . . . . . . . . > AND ws1.ws_order_number IN
> . . . . . . . . . . . > (
> . . . . . . . . . . . > SELECT ws_wh.ws_order_number
> . . . . . . . . . . . > FROM ws_wh)
> . . . . . . . . . . . > AND ws1.ws_order_number IN
> . . . . . . . . . . . > (
> . . . . . . . . . . . > SELECT wr.wr_order_number
> . . . . . . . . . . . > FROM
> dfs.`/data/tpcds/sf1/parquet/web_returns` wr,
> . . . . . . . . . . . > ws_wh
> . . . . . . . . . . . > WHERE wr.wr_order_number =
> ws_wh.ws_order_number)
> . . . . . . . . . . . > ORDER BY count(DISTINCT ws1.ws_order_number)
> . . . . . . . . . . . > LIMIT 100;
> +--------------+----------------------+---------------------+
> | order count | total shipping cost | total net profit |
> +--------------+----------------------+---------------------+
> | 17 | 38508.130000000005 | 20822.3 |
> +--------------+----------------------+---------------------+
> 1 row selected (105.621 seconds)
> {code}
> The correct results should be:
> {code:java}
> +--------------+----------------------+---------------------+
> | order count | total shipping cost | total net profit |
> +--------------+----------------------+---------------------+
> | 34 | 63754.72 | 15919.009999999998 |
> +--------------+----------------------+---------------------+
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)