This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ca1a1d0710d [SPARK-40628][SQL] Do not push complex left semi/anti join condition through project
ca1a1d0710d is described below

commit ca1a1d0710d7fdeac3ed6afa075b214e342fae08
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Thu Oct 6 22:50:31 2022 -0700

[SPARK-40628][SQL] Do not push complex left semi/anti join condition through project

### What changes were proposed in this pull request?

This PR makes `PushDownLeftSemiAntiJoin` stop pushing a complex left semi/anti join condition through a `Project`.

### Why are the changes needed?

Pushing such a condition hurts performance: when the join is planned as a `SortMergeJoin`, the complex expression ends up in the join keys and is evaluated three times (in the `Exchange`'s hash partitioning, the `Sort`'s ordering, and the join comparison itself). For example:

```sql
CREATE TABLE t1(item_id BIGINT, event_type STRING, dt STRING) USING parquet PARTITIONED BY (dt);
CREATE TABLE t2(item_id BIGINT, cal_dt DATE) USING parquet;

SET spark.sql.autoBroadcastJoinThreshold=-1;

SELECT item_id, event_type
FROM (SELECT *, To_date(t1.dt, 'yyyyMMdd') AS new_dt FROM t1) tmp
LEFT SEMI JOIN t2
  ON tmp.item_id = t2.item_id AND tmp.new_dt = t2.cal_dt;
```

Before this PR, `cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date)` is evaluated three times:

```
AdaptiveSparkPlan isFinalPlan=false
+- Project [item_id#28L, event_type#29]
   +- SortMergeJoin [item_id#28L, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date)], [item_id#31L, cal_dt#32], LeftSemi
      :- Sort [item_id#28L ASC NULLS FIRST, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(item_id#28L, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), 5), ENSURE_REQUIREMENTS, [plan_id=110]
      :     +- Filter isnotnull(item_id#28L)
      :        +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
      +- Sort [item_id#31L ASC NULLS FIRST, cal_dt#32 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(item_id#31L, cal_dt#32, 5), ENSURE_REQUIREMENTS, [plan_id=111]
            +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
               +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]
```

The task stack trace:

```
java.base@17.0.4.1/java.text.DecimalFormat.parse(DecimalFormat.java:2149)
java.base@17.0.4.1/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1935)
java.base@17.0.4.1/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1545)
java.base@17.0.4.1/java.text.DateFormat.parse(DateFormat.java:397)
app//org.apache.spark.sql.catalyst.util.LegacySimpleTimestampFormatter.parse(TimestampFormatter.scala:237)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.Cast_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown Source)
app//org.apache.spark.sql.catalyst.expressions.BaseOrdering.compare(ordering.scala:29)
...
```

After this PR:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [item_id#28L, event_type#29]
   +- SortMergeJoin [item_id#28L, new_dt#37], [item_id#31L, cal_dt#32], LeftSemi
      :- Sort [item_id#28L ASC NULLS FIRST, new_dt#37 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(item_id#28L, new_dt#37, 5), ENSURE_REQUIREMENTS, [plan_id=110]
      :     +- Project [item_id#28L, event_type#29, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) AS new_dt#37]
      :        +- Filter isnotnull(item_id#28L)
      :           +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
      +- Sort [item_id#31L ASC NULLS FIRST, cal_dt#32 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(item_id#31L, cal_dt#32, 5), ENSURE_REQUIREMENTS, [plan_id=111]
            +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
               +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #38069 from wangyum/SPARK-40628.

Authored-by: Yuming Wang <yumw...@ebay.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../optimizer/PushDownLeftSemiAntiJoin.scala       |  15 +-
 .../optimizer/LeftSemiAntiJoinPushDownSuite.scala  |   8 +
 .../approved-plans-v1_4/q8.sf100/explain.txt       | 202 ++++++++++-----------
 .../approved-plans-v1_4/q8.sf100/simplified.txt    |  66 +++----
 .../approved-plans-v1_4/q8/explain.txt             | 170 ++++++++---------
 .../approved-plans-v1_4/q8/simplified.txt          |  52 +++---
 6 files changed, 263 insertions(+), 250 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
index 31b9d604060..9f3c7ef9c28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
@@ -38,7 +38,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
     _.containsPattern(LEFT_SEMI_OR_ANTI_JOIN), ruleId) {
     // LeftSemi/LeftAnti over Project
-    case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
+    case j @ Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
         if pList.forall(_.deterministic) &&
         !pList.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
         canPushThroughCondition(Seq(gChild), joinCond, rightOp) =>
@@ -47,12 +47,17 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
         p.copy(child = Join(gChild, rightOp, joinType, joinCond, hint))
       } else {
         val aliasMap = getAliasMap(p)
-        val newJoinCond = if (aliasMap.nonEmpty) {
-          Option(replaceAlias(joinCond.get, aliasMap))
+        // Do not push complex join condition
+        if (aliasMap.forall(_._2.child.children.isEmpty)) {
+          val newJoinCond = if (aliasMap.nonEmpty) {
+            Option(replaceAlias(joinCond.get, aliasMap))
+          } else {
+            joinCond
+          }
+          p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
         } else {
-          joinCond
+          j
         }
-        p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
       }

     // LeftSemi/LeftAnti over Aggregate, only push down if join can be planned as broadcast join.
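For readers skimming the diff: the new guard `aliasMap.forall(_._2.child.children.isEmpty)` allows the rewrite only when every alias in the `Project` names a leaf expression (an attribute or a literal); an alias over any expression with children, such as the `To_date(t1.dt, 'yyyyMMdd')` above, now leaves the original join `j` in place. A minimal standalone sketch of that check, using toy expression classes rather than Catalyst's actual ones (all names below are illustrative only):

```scala
// Toy stand-ins for Catalyst expression trees; not Spark code.
sealed trait Expr { def children: Seq[Expr] }
case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil } // leaf
case class Lit(value: Int) extends Expr { val children: Seq[Expr] = Nil }    // leaf
case class Add(l: Expr, r: Expr) extends Expr { val children: Seq[Expr] = Seq(l, r) }
case class Alias(child: Expr, name: String) extends Expr {
  val children: Seq[Expr] = Seq(child)
}

// Mirrors the guard above: the join condition may be rewritten and pushed
// through the Project only if every aliased expression has no children.
def aliasesAreSimple(aliasMap: Map[String, Alias]): Boolean =
  aliasMap.forall { case (_, a) => a.child.children.isEmpty }

// `new_a` aliases `a + 1`, which has child expressions, so the condition
// is "complex" and the rule now keeps the join above the Project.
assert(!aliasesAreSimple(Map("new_a" -> Alias(Add(Attr("a"), Lit(1)), "new_a"))))
// A plain column rename is still pushable.
assert(aliasesAreSimple(Map("b" -> Alias(Attr("a"), "b"))))
```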
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
index 77f58746dfc..9f77f448d23 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
@@ -483,4 +483,12 @@ class LeftSemiPushdownSuite extends PlanTest {
     }
   }

+  test("SPARK-40628: Do not push complex left semi/anti join condition through project") {
+    val originalQuery = testRelation
+      .select(($"a" + 1).as("new_a"))
+      .join(testRelation1, joinType = LeftSemi, condition = Some($"new_a" === $"d"))
+      .analyze
+
+    comparePlans(Optimize.execute(originalQuery), originalQuery)
+  }
 }

diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
index ca6a5cc1ffb..d443723b063 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
@@ -24,30 +24,30 @@ TakeOrderedAndProject (49)
                +- * HashAggregate (41)
                   +- Exchange (40)
                      +- * HashAggregate (39)
-                        +- * Project (38)
-                           +- * BroadcastHashJoin LeftSemi BuildRight (37)
-                              :- * Filter (17)
-                              :  +- * ColumnarToRow (16)
-                              :     +- Scan parquet spark_catalog.default.customer_address (15)
-                              +- BroadcastExchange (36)
-                                 +- * Project (35)
-                                    +- * Filter (34)
-                                       +- * HashAggregate (33)
-                                          +- Exchange (32)
-                                             +- * HashAggregate (31)
-                                                +- * Project (30)
-                                                   +- * SortMergeJoin Inner (29)
-                                                      :- * Sort (22)
-                                                      :  +- Exchange (21)
-                                                      :     +- * Filter (20)
-                                                      :        +- * ColumnarToRow (19)
-                                                      :           +- Scan parquet spark_catalog.default.customer_address (18)
-                                                      +- * Sort (28)
-                                                         +- Exchange (27)
-                                                            +- * Project (26)
-                                                               +- * Filter (25)
-                                                                  +- * ColumnarToRow (24)
-                                                                     +- Scan parquet spark_catalog.default.customer (23)
+                        +- * BroadcastHashJoin LeftSemi BuildRight (38)
+                           :- * Project (18)
+                           :  +- * Filter (17)
+                           :     +- * ColumnarToRow (16)
+                           :        +- Scan parquet spark_catalog.default.customer_address (15)
+                           +- BroadcastExchange (37)
+                              +- * Project (36)
+                                 +- * Filter (35)
+                                    +- * HashAggregate (34)
+                                       +- Exchange (33)
+                                          +- * HashAggregate (32)
+                                             +- * Project (31)
+                                                +- * SortMergeJoin Inner (30)
+                                                   :- * Sort (23)
+                                                   :  +- Exchange (22)
+                                                   :     +- * Filter (21)
+                                                   :        +- * ColumnarToRow (20)
+                                                   :           +- Scan parquet spark_catalog.default.customer_address (19)
+                                                   +- * Sort (29)
+                                                      +- Exchange (28)
+                                                         +- * Project (27)
+                                                            +- * Filter (26)
+                                                               +- * ColumnarToRow (25)
+                                                                  +- Scan parquet spark_catalog.default.customer (24)


 (1) Scan parquet spark_catalog.default.store_sales
@@ -127,139 +127,139 @@ Input [1]: [ca_zip#9]
 Input [1]: [ca_zip#9]
 Condition : (substr(ca_zip#9, 1, 5) INSET 10144, 10336, 10390, 10445, 10516, 10567, 11101, 11356, 11376, 11489, 11634, 11928, 12305, 13354, 13375, 13376, 13394, 13595, 13695, 13955, 14060, 14089, 14171, 14328, 14663, 14867, 14922, 15126, 15146, 15371, 15455, 15559, 15723, 15734, 15765, 15798, 15882, 16021, 16725, 16807, 17043, 17183, 17871, 17879, 17920, 18119, 18270, 18376, 18383, 18426, 18652, 18767, 18799, 18840, 18842, 18845, 18906, 19430, 19505, 19512, 19515, 19736, 19769, 19849, 20 [...]

-(18) Scan parquet spark_catalog.default.customer_address
-Output [2]: [ca_address_sk#10, ca_zip#11]
+(18) Project [codegen id : 11]
+Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#10]
+Input [1]: [ca_zip#9]
+
+(19) Scan parquet spark_catalog.default.customer_address
+Output [2]: [ca_address_sk#11, ca_zip#12]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer_address]
 PushedFilters: [IsNotNull(ca_address_sk)]
 ReadSchema: struct<ca_address_sk:int,ca_zip:string>

-(19) ColumnarToRow [codegen id : 5]
-Input [2]: [ca_address_sk#10, ca_zip#11]
+(20) ColumnarToRow [codegen id : 5]
+Input [2]: [ca_address_sk#11, ca_zip#12]

-(20) Filter [codegen id : 5]
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Condition : isnotnull(ca_address_sk#10)
+(21) Filter [codegen id : 5]
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Condition : isnotnull(ca_address_sk#11)

-(21) Exchange
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Arguments: hashpartitioning(ca_address_sk#10, 5), ENSURE_REQUIREMENTS, [plan_id=3]
+(22) Exchange
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Arguments: hashpartitioning(ca_address_sk#11, 5), ENSURE_REQUIREMENTS, [plan_id=3]

-(22) Sort [codegen id : 6]
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Arguments: [ca_address_sk#10 ASC NULLS FIRST], false, 0
+(23) Sort [codegen id : 6]
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Arguments: [ca_address_sk#11 ASC NULLS FIRST], false, 0

-(23) Scan parquet spark_catalog.default.customer
-Output [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(24) Scan parquet spark_catalog.default.customer
+Output [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_preferred_cust_flag), EqualTo(c_preferred_cust_flag,Y), IsNotNull(c_current_addr_sk)]
 ReadSchema: struct<c_current_addr_sk:int,c_preferred_cust_flag:string>

-(24) ColumnarToRow [codegen id : 7]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(25) ColumnarToRow [codegen id : 7]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]

-(25) Filter [codegen id : 7]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
-Condition : ((isnotnull(c_preferred_cust_flag#13) AND (c_preferred_cust_flag#13 = Y)) AND isnotnull(c_current_addr_sk#12))
+(26) Filter [codegen id : 7]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
+Condition : ((isnotnull(c_preferred_cust_flag#14) AND (c_preferred_cust_flag#14 = Y)) AND isnotnull(c_current_addr_sk#13))

-(26) Project [codegen id : 7]
-Output [1]: [c_current_addr_sk#12]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(27) Project [codegen id : 7]
+Output [1]: [c_current_addr_sk#13]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]

-(27) Exchange
-Input [1]: [c_current_addr_sk#12]
-Arguments: hashpartitioning(c_current_addr_sk#12, 5), ENSURE_REQUIREMENTS, [plan_id=4]
+(28) Exchange
+Input [1]: [c_current_addr_sk#13]
+Arguments: hashpartitioning(c_current_addr_sk#13, 5), ENSURE_REQUIREMENTS, [plan_id=4]

-(28) Sort [codegen id : 8]
-Input [1]: [c_current_addr_sk#12]
-Arguments: [c_current_addr_sk#12 ASC NULLS FIRST], false, 0
+(29) Sort [codegen id : 8]
+Input [1]: [c_current_addr_sk#13]
+Arguments: [c_current_addr_sk#13 ASC NULLS FIRST], false, 0

-(29) SortMergeJoin [codegen id : 9]
-Left keys [1]: [ca_address_sk#10]
-Right keys [1]: [c_current_addr_sk#12]
+(30) SortMergeJoin [codegen id : 9]
+Left keys [1]: [ca_address_sk#11]
+Right keys [1]: [c_current_addr_sk#13]
 Join type: Inner
 Join condition: None

-(30) Project [codegen id : 9]
-Output [1]: [ca_zip#11]
-Input [3]: [ca_address_sk#10, ca_zip#11, c_current_addr_sk#12]
+(31) Project [codegen id : 9]
+Output [1]: [ca_zip#12]
+Input [3]: [ca_address_sk#11, ca_zip#12, c_current_addr_sk#13]

-(31) HashAggregate [codegen id : 9]
-Input [1]: [ca_zip#11]
-Keys [1]: [ca_zip#11]
+(32) HashAggregate [codegen id : 9]
+Input [1]: [ca_zip#12]
+Keys [1]: [ca_zip#12]
 Functions [1]: [partial_count(1)]
-Aggregate Attributes [1]: [count#14]
-Results [2]: [ca_zip#11, count#15]
+Aggregate Attributes [1]: [count#15]
+Results [2]: [ca_zip#12, count#16]

-(32) Exchange
-Input [2]: [ca_zip#11, count#15]
-Arguments: hashpartitioning(ca_zip#11, 5), ENSURE_REQUIREMENTS, [plan_id=5]
+(33) Exchange
+Input [2]: [ca_zip#12, count#16]
+Arguments: hashpartitioning(ca_zip#12, 5), ENSURE_REQUIREMENTS, [plan_id=5]

-(33) HashAggregate [codegen id : 10]
-Input [2]: [ca_zip#11, count#15]
-Keys [1]: [ca_zip#11]
+(34) HashAggregate [codegen id : 10]
+Input [2]: [ca_zip#12, count#16]
+Keys [1]: [ca_zip#12]
 Functions [1]: [count(1)]
-Aggregate Attributes [1]: [count(1)#16]
-Results [2]: [substr(ca_zip#11, 1, 5) AS ca_zip#17, count(1)#16 AS cnt#18]
+Aggregate Attributes [1]: [count(1)#17]
+Results [2]: [substr(ca_zip#12, 1, 5) AS ca_zip#18, count(1)#17 AS cnt#19]

-(34) Filter [codegen id : 10]
-Input [2]: [ca_zip#17, cnt#18]
-Condition : (cnt#18 > 10)
+(35) Filter [codegen id : 10]
+Input [2]: [ca_zip#18, cnt#19]
+Condition : (cnt#19 > 10)

-(35) Project [codegen id : 10]
-Output [1]: [ca_zip#17]
-Input [2]: [ca_zip#17, cnt#18]
+(36) Project [codegen id : 10]
+Output [1]: [ca_zip#18]
+Input [2]: [ca_zip#18, cnt#19]

-(36) BroadcastExchange
-Input [1]: [ca_zip#17]
+(37) BroadcastExchange
+Input [1]: [ca_zip#18]
 Arguments: HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), isnull(input[0, string, true])),false), [plan_id=6]

-(37) BroadcastHashJoin [codegen id : 11]
-Left keys [2]: [coalesce(substr(ca_zip#9, 1, 5), ), isnull(substr(ca_zip#9, 1, 5))]
-Right keys [2]: [coalesce(ca_zip#17, ), isnull(ca_zip#17)]
+(38) BroadcastHashJoin [codegen id : 11]
+Left keys [2]: [coalesce(ca_zip#10, ), isnull(ca_zip#10)]
+Right keys [2]: [coalesce(ca_zip#18, ), isnull(ca_zip#18)]
 Join type: LeftSemi
 Join condition: None

-(38) Project [codegen id : 11]
-Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#19]
-Input [1]: [ca_zip#9]
-
 (39) HashAggregate [codegen id : 11]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]

 (40) Exchange
-Input [1]: [ca_zip#19]
-Arguments: hashpartitioning(ca_zip#19, 5), ENSURE_REQUIREMENTS, [plan_id=7]
+Input [1]: [ca_zip#10]
+Arguments: hashpartitioning(ca_zip#10, 5), ENSURE_REQUIREMENTS, [plan_id=7]

 (41) HashAggregate [codegen id : 12]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]

 (42) Exchange
-Input [1]: [ca_zip#19]
-Arguments: hashpartitioning(substr(ca_zip#19, 1, 2), 5), ENSURE_REQUIREMENTS, [plan_id=8]
+Input [1]: [ca_zip#10]
+Arguments: hashpartitioning(substr(ca_zip#10, 1, 2), 5), ENSURE_REQUIREMENTS, [plan_id=8]

 (43) Sort [codegen id : 13]
-Input [1]: [ca_zip#19]
-Arguments: [substr(ca_zip#19, 1, 2) ASC NULLS FIRST], false, 0
+Input [1]: [ca_zip#10]
+Arguments: [substr(ca_zip#10, 1, 2) ASC NULLS FIRST], false, 0

 (44) SortMergeJoin [codegen id : 14]
 Left keys [1]: [substr(s_zip#8, 1, 2)]
-Right keys [1]: [substr(ca_zip#19, 1, 2)]
+Right keys [1]: [substr(ca_zip#10, 1, 2)]
 Join type: Inner
 Join condition: None

 (45) Project [codegen id : 14]
 Output [2]: [ss_net_profit#2, s_store_name#7]
-Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#19]
+Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#10]

 (46) HashAggregate [codegen id : 14]
 Input [2]: [ss_net_profit#2, s_store_name#7]

diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt
index 912904d5643..86dd6134fc2 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt
@@ -49,42 +49,42 @@ TakeOrderedAndProject [s_store_name,sum(ss_net_profit)]
             Exchange [ca_zip] #6
               WholeStageCodegen (11)
                 HashAggregate [ca_zip]
-                  Project [ca_zip]
-                    BroadcastHashJoin [ca_zip,ca_zip]
+                  BroadcastHashJoin [ca_zip,ca_zip]
+                    Project [ca_zip]
                       Filter [ca_zip]
                         ColumnarToRow
                           InputAdapter
                             Scan parquet spark_catalog.default.customer_address [ca_zip]
-                      InputAdapter
-                        BroadcastExchange #7
-                          WholeStageCodegen (10)
-                            Project [ca_zip]
-                              Filter [cnt]
-                                HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
-                                  InputAdapter
-                                    Exchange [ca_zip] #8
-                                      WholeStageCodegen (9)
-                                        HashAggregate [ca_zip] [count,count]
-                                          Project [ca_zip]
-                                            SortMergeJoin [ca_address_sk,c_current_addr_sk]
-                                              InputAdapter
-                                                WholeStageCodegen (6)
-                                                  Sort [ca_address_sk]
-                                                    InputAdapter
-                                                      Exchange [ca_address_sk] #9
-                                                        WholeStageCodegen (5)
-                                                          Filter [ca_address_sk]
+                    InputAdapter
+                      BroadcastExchange #7
+                        WholeStageCodegen (10)
+                          Project [ca_zip]
+                            Filter [cnt]
+                              HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
+                                InputAdapter
+                                  Exchange [ca_zip] #8
+                                    WholeStageCodegen (9)
+                                      HashAggregate [ca_zip] [count,count]
+                                        Project [ca_zip]
+                                          SortMergeJoin [ca_address_sk,c_current_addr_sk]
+                                            InputAdapter
+                                              WholeStageCodegen (6)
+                                                Sort [ca_address_sk]
+                                                  InputAdapter
+                                                    Exchange [ca_address_sk] #9
+                                                      WholeStageCodegen (5)
+                                                        Filter [ca_address_sk]
+                                                          ColumnarToRow
+                                                            InputAdapter
+                                                              Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
+                                            InputAdapter
+                                              WholeStageCodegen (8)
+                                                Sort [c_current_addr_sk]
+                                                  InputAdapter
+                                                    Exchange [c_current_addr_sk] #10
+                                                      WholeStageCodegen (7)
+                                                        Project [c_current_addr_sk]
+                                                          Filter [c_preferred_cust_flag,c_current_addr_sk]
                                                             ColumnarToRow
                                                               InputAdapter
-                                                                Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
-                                              InputAdapter
-                                                WholeStageCodegen (8)
-                                                  Sort [c_current_addr_sk]
-                                                    InputAdapter
-                                                      Exchange [c_current_addr_sk] #10
-                                                        WholeStageCodegen (7)
-                                                          Project [c_current_addr_sk]
-                                                            Filter [c_preferred_cust_flag,c_current_addr_sk]
-                                                              ColumnarToRow
-                                                                InputAdapter
-                                                                  Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]
+                                                                Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]

diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
index bc6b4781839..24c3a657ddd 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
@@ -21,27 +21,27 @@ TakeOrderedAndProject (43)
          +- * HashAggregate (36)
             +- Exchange (35)
                +- * HashAggregate (34)
-                  +- * Project (33)
-                     +- * BroadcastHashJoin LeftSemi BuildRight (32)
-                        :- * Filter (15)
-                        :  +- * ColumnarToRow (14)
-                        :     +- Scan parquet spark_catalog.default.customer_address (13)
-                        +- BroadcastExchange (31)
-                           +- * Project (30)
-                              +- * Filter (29)
-                                 +- * HashAggregate (28)
-                                    +- Exchange (27)
-                                       +- * HashAggregate (26)
-                                          +- * Project (25)
-                                             +- * BroadcastHashJoin Inner BuildRight (24)
-                                                :- * Filter (18)
-                                                :  +- * ColumnarToRow (17)
-                                                :     +- Scan parquet spark_catalog.default.customer_address (16)
-                                                +- BroadcastExchange (23)
-                                                   +- * Project (22)
-                                                      +- * Filter (21)
-                                                         +- * ColumnarToRow (20)
-                                                            +- Scan parquet spark_catalog.default.customer (19)
+                  +- * BroadcastHashJoin LeftSemi BuildRight (33)
+                     :- * Project (16)
+                     :  +- * Filter (15)
+                     :     +- * ColumnarToRow (14)
+                     :        +- Scan parquet spark_catalog.default.customer_address (13)
+                     +- BroadcastExchange (32)
+                        +- * Project (31)
+                           +- * Filter (30)
+                              +- * HashAggregate (29)
+                                 +- Exchange (28)
+                                    +- * HashAggregate (27)
+                                       +- * Project (26)
+                                          +- * BroadcastHashJoin Inner BuildRight (25)
+                                             :- * Filter (19)
+                                             :  +- * ColumnarToRow (18)
+                                             :     +- Scan parquet spark_catalog.default.customer_address (17)
+                                             +- BroadcastExchange (24)
+                                                +- * Project (23)
+                                                   +- * Filter (22)
+                                                      +- * ColumnarToRow (21)
+                                                         +- Scan parquet spark_catalog.default.customer (20)


 (1) Scan parquet spark_catalog.default.store_sales
@@ -113,123 +113,123 @@ Input [1]: [ca_zip#9]
 Input [1]: [ca_zip#9]
 Condition : (substr(ca_zip#9, 1, 5) INSET 10144, 10336, 10390, 10445, 10516, 10567, 11101, 11356, 11376, 11489, 11634, 11928, 12305, 13354, 13375, 13376, 13394, 13595, 13695, 13955, 14060, 14089, 14171, 14328, 14663, 14867, 14922, 15126, 15146, 15371, 15455, 15559, 15723, 15734, 15765, 15798, 15882, 16021, 16725, 16807, 17043, 17183, 17871, 17879, 17920, 18119, 18270, 18376, 18383, 18426, 18652, 18767, 18799, 18840, 18842, 18845, 18906, 19430, 19505, 19512, 19515, 19736, 19769, 19849, 20 [...]

-(16) Scan parquet spark_catalog.default.customer_address
-Output [2]: [ca_address_sk#10, ca_zip#11]
+(16) Project [codegen id : 6]
+Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#10]
+Input [1]: [ca_zip#9]
+
+(17) Scan parquet spark_catalog.default.customer_address
+Output [2]: [ca_address_sk#11, ca_zip#12]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer_address]
 PushedFilters: [IsNotNull(ca_address_sk)]
 ReadSchema: struct<ca_address_sk:int,ca_zip:string>

-(17) ColumnarToRow [codegen id : 4]
-Input [2]: [ca_address_sk#10, ca_zip#11]
+(18) ColumnarToRow [codegen id : 4]
+Input [2]: [ca_address_sk#11, ca_zip#12]

-(18) Filter [codegen id : 4]
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Condition : isnotnull(ca_address_sk#10)
+(19) Filter [codegen id : 4]
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Condition : isnotnull(ca_address_sk#11)

-(19) Scan parquet spark_catalog.default.customer
-Output [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(20) Scan parquet spark_catalog.default.customer
+Output [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_preferred_cust_flag), EqualTo(c_preferred_cust_flag,Y), IsNotNull(c_current_addr_sk)]
 ReadSchema: struct<c_current_addr_sk:int,c_preferred_cust_flag:string>

-(20) ColumnarToRow [codegen id : 3]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(21) ColumnarToRow [codegen id : 3]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]

-(21) Filter [codegen id : 3]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
-Condition : ((isnotnull(c_preferred_cust_flag#13) AND (c_preferred_cust_flag#13 = Y)) AND isnotnull(c_current_addr_sk#12))
+(22) Filter [codegen id : 3]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
+Condition : ((isnotnull(c_preferred_cust_flag#14) AND (c_preferred_cust_flag#14 = Y)) AND isnotnull(c_current_addr_sk#13))

-(22) Project [codegen id : 3]
-Output [1]: [c_current_addr_sk#12]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(23) Project [codegen id : 3]
+Output [1]: [c_current_addr_sk#13]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]

-(23) BroadcastExchange
-Input [1]: [c_current_addr_sk#12]
+(24) BroadcastExchange
+Input [1]: [c_current_addr_sk#13]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=2]

-(24) BroadcastHashJoin [codegen id : 4]
-Left keys [1]: [ca_address_sk#10]
-Right keys [1]: [c_current_addr_sk#12]
+(25) BroadcastHashJoin [codegen id : 4]
+Left keys [1]: [ca_address_sk#11]
+Right keys [1]: [c_current_addr_sk#13]
 Join type: Inner
 Join condition: None

-(25) Project [codegen id : 4]
-Output [1]: [ca_zip#11]
-Input [3]: [ca_address_sk#10, ca_zip#11, c_current_addr_sk#12]
+(26) Project [codegen id : 4]
+Output [1]: [ca_zip#12]
+Input [3]: [ca_address_sk#11, ca_zip#12, c_current_addr_sk#13]

-(26) HashAggregate [codegen id : 4]
-Input [1]: [ca_zip#11]
-Keys [1]: [ca_zip#11]
+(27) HashAggregate [codegen id : 4]
+Input [1]: [ca_zip#12]
+Keys [1]: [ca_zip#12]
 Functions [1]: [partial_count(1)]
-Aggregate Attributes [1]: [count#14]
-Results [2]: [ca_zip#11, count#15]
+Aggregate Attributes [1]: [count#15]
+Results [2]: [ca_zip#12, count#16]

-(27) Exchange
-Input [2]: [ca_zip#11, count#15]
-Arguments: hashpartitioning(ca_zip#11, 5), ENSURE_REQUIREMENTS, [plan_id=3]
+(28) Exchange
+Input [2]: [ca_zip#12, count#16]
+Arguments: hashpartitioning(ca_zip#12, 5), ENSURE_REQUIREMENTS, [plan_id=3]

-(28) HashAggregate [codegen id : 5]
-Input [2]: [ca_zip#11, count#15]
-Keys [1]: [ca_zip#11]
+(29) HashAggregate [codegen id : 5]
+Input [2]: [ca_zip#12, count#16]
+Keys [1]: [ca_zip#12]
 Functions [1]: [count(1)]
-Aggregate Attributes [1]: [count(1)#16]
-Results [2]: [substr(ca_zip#11, 1, 5) AS ca_zip#17, count(1)#16 AS cnt#18]
+Aggregate Attributes [1]: [count(1)#17]
+Results [2]: [substr(ca_zip#12, 1, 5) AS ca_zip#18, count(1)#17 AS cnt#19]

-(29) Filter [codegen id : 5]
-Input [2]: [ca_zip#17, cnt#18]
-Condition : (cnt#18 > 10)
+(30) Filter [codegen id : 5]
+Input [2]: [ca_zip#18, cnt#19]
+Condition : (cnt#19 > 10)

-(30) Project [codegen id : 5]
-Output [1]: [ca_zip#17]
-Input [2]: [ca_zip#17, cnt#18]
+(31) Project [codegen id : 5]
+Output [1]: [ca_zip#18]
+Input [2]: [ca_zip#18, cnt#19]

-(31) BroadcastExchange
-Input [1]: [ca_zip#17]
+(32) BroadcastExchange
+Input [1]: [ca_zip#18]
 Arguments: HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), isnull(input[0, string, true])),false), [plan_id=4]

-(32) BroadcastHashJoin [codegen id : 6]
-Left keys [2]: [coalesce(substr(ca_zip#9, 1, 5), ), isnull(substr(ca_zip#9, 1, 5))]
-Right keys [2]: [coalesce(ca_zip#17, ), isnull(ca_zip#17)]
+(33) BroadcastHashJoin [codegen id : 6]
+Left keys [2]: [coalesce(ca_zip#10, ), isnull(ca_zip#10)]
+Right keys [2]: [coalesce(ca_zip#18, ), isnull(ca_zip#18)]
 Join type: LeftSemi
 Join condition: None

-(33) Project [codegen id : 6]
-Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#19]
-Input [1]: [ca_zip#9]
-
 (34) HashAggregate [codegen id : 6]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]

 (35) Exchange
-Input [1]: [ca_zip#19]
-Arguments: hashpartitioning(ca_zip#19, 5), ENSURE_REQUIREMENTS, [plan_id=5]
+Input [1]: [ca_zip#10]
+Arguments: hashpartitioning(ca_zip#10, 5), ENSURE_REQUIREMENTS, [plan_id=5]

 (36) HashAggregate [codegen id : 7]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]

 (37) BroadcastExchange
-Input [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
 Arguments: HashedRelationBroadcastMode(List(substr(input[0, string, true], 1, 2)),false), [plan_id=6]

 (38) BroadcastHashJoin [codegen id : 8]
 Left keys [1]: [substr(s_zip#8, 1, 2)]
-Right keys [1]: [substr(ca_zip#19, 1, 2)]
+Right keys [1]: [substr(ca_zip#10, 1, 2)]
 Join type: Inner
 Join condition: None

 (39) Project [codegen id : 8]
 Output [2]: [ss_net_profit#2, s_store_name#7]
-Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#19]
+Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#10]

 (40) HashAggregate [codegen id : 8]
 Input [2]: [ss_net_profit#2, s_store_name#7]

diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt
index 4be906d4f50..6ea5a786125 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt
@@ -40,33 +40,33 @@ TakeOrderedAndProject [s_store_name,sum(ss_net_profit)]
             Exchange [ca_zip] #5
               WholeStageCodegen (6)
                 HashAggregate [ca_zip]
-                  Project [ca_zip]
-                    BroadcastHashJoin [ca_zip,ca_zip]
+                  BroadcastHashJoin [ca_zip,ca_zip]
+                    Project [ca_zip]
                       Filter [ca_zip]
                         ColumnarToRow
                           InputAdapter
                             Scan parquet spark_catalog.default.customer_address [ca_zip]
-                      InputAdapter
-                        BroadcastExchange #6
-                          WholeStageCodegen (5)
-                            Project [ca_zip]
-                              Filter [cnt]
-                                HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
-                                  InputAdapter
-                                    Exchange [ca_zip] #7
-                                      WholeStageCodegen (4)
-                                        HashAggregate [ca_zip] [count,count]
-                                          Project [ca_zip]
-                                            BroadcastHashJoin [ca_address_sk,c_current_addr_sk]
-                                              Filter [ca_address_sk]
-                                                ColumnarToRow
-                                                  InputAdapter
-                                                    Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
-                                              InputAdapter
-                                                BroadcastExchange #8
-                                                  WholeStageCodegen (3)
-                                                    Project [c_current_addr_sk]
-                                                      Filter [c_preferred_cust_flag,c_current_addr_sk]
-                                                        ColumnarToRow
-                                                          InputAdapter
-                                                            Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]
+                    InputAdapter
+                      BroadcastExchange #6
+                        WholeStageCodegen (5)
+                          Project [ca_zip]
+                            Filter [cnt]
+                              HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
+                                InputAdapter
+                                  Exchange [ca_zip] #7
+                                    WholeStageCodegen (4)
+                                      HashAggregate [ca_zip] [count,count]
+                                        Project [ca_zip]
+                                          BroadcastHashJoin [ca_address_sk,c_current_addr_sk]
+                                            Filter [ca_address_sk]
+                                              ColumnarToRow
+                                                InputAdapter
+                                                  Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
+                                            InputAdapter
+                                              BroadcastExchange #8
+                                                WholeStageCodegen (3)
+                                                  Project [c_current_addr_sk]
+                                                    Filter [c_preferred_cust_flag,c_current_addr_sk]
+                                                      ColumnarToRow
+                                                        InputAdapter
+                                                          Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org