[ https://issues.apache.org/jira/browse/SPARK-34079 ]
Wenchen Fan resolved SPARK-34079.
---------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 32298
[https://github.com/apache/spark/pull/32298]

Improvement CTE table scan
--------------------------

                 Key: SPARK-34079
                 URL: https://issues.apache.org/jira/browse/SPARK-34079
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Yuming Wang
            Assignee: Peter Toth
            Priority: Major
             Fix For: 3.3.0

Prepare the tables:

{code:sql}
CREATE TABLE store_sales (
  ss_sold_date_sk       INT,
  ss_sold_time_sk       INT,
  ss_item_sk            INT,
  ss_customer_sk        INT,
  ss_cdemo_sk           INT,
  ss_hdemo_sk           INT,
  ss_addr_sk            INT,
  ss_store_sk           INT,
  ss_promo_sk           INT,
  ss_ticket_number      INT,
  ss_quantity           INT,
  ss_wholesale_cost     DECIMAL(7,2),
  ss_list_price         DECIMAL(7,2),
  ss_sales_price        DECIMAL(7,2),
  ss_ext_discount_amt   DECIMAL(7,2),
  ss_ext_sales_price    DECIMAL(7,2),
  ss_ext_wholesale_cost DECIMAL(7,2),
  ss_ext_list_price     DECIMAL(7,2),
  ss_ext_tax            DECIMAL(7,2),
  ss_coupon_amt         DECIMAL(7,2),
  ss_net_paid           DECIMAL(7,2),
  ss_net_paid_inc_tax   DECIMAL(7,2),
  ss_net_profit         DECIMAL(7,2));

CREATE TABLE reason (
  r_reason_sk   INT,
  r_reason_id   VARCHAR(255),
  r_reason_desc VARCHAR(255));
{code}

SQL:

{code:sql}
WITH bucket_result AS (
  SELECT
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_quantity END) > 62316685
         THEN avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_net_paid END)
    END bucket1,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_quantity END) > 19045798
         THEN avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_net_paid END)
    END bucket2,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_quantity END) > 365541424
         THEN avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_net_paid END)
    END bucket3,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_quantity END) > 19045798
         THEN avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_net_paid END)
    END bucket4,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_quantity END) > 365541424
         THEN avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_net_paid END)
    END bucket5
  FROM store_sales
)
SELECT
  (SELECT bucket1 FROM bucket_result) AS bucket1,
  (SELECT bucket2 FROM bucket_result) AS bucket2,
  (SELECT bucket3 FROM bucket_result) AS bucket3,
  (SELECT bucket4 FROM bucket_result) AS bucket4,
  (SELECT bucket5 FROM bucket_result) AS bucket5
FROM reason
WHERE r_reason_sk = 1;
{code}
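The duplicate scans can be observed directly with EXPLAIN. Below is a minimal sketch of the same pattern, reduced to two scalar subqueries over one CTE; the CTE name {{agg}} and the aliases {{avg_quantity}}/{{max_quantity}} are illustrative only, everything else uses the tables defined above. The full five-bucket query behaves the same way; its plan follows.

{code:sql}
-- Minimal reproduction of the pattern: one CTE, two scalar subqueries.
-- Plain EXPLAIN prints the physical plan; before this fix it shows one
-- separate scan of store_sales per scalar subquery.
EXPLAIN
WITH agg AS (
  SELECT avg(ss_quantity) AS avg_quantity, max(ss_quantity) AS max_quantity
  FROM store_sales
)
SELECT
  (SELECT avg_quantity FROM agg) AS avg_quantity,
  (SELECT max_quantity FROM agg) AS max_quantity
FROM reason
WHERE r_reason_sk = 1;
{code}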
Plan of Spark SQL:

{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Subquery subquery#0, [id=#23] AS bucket1#1, Subquery subquery#2, [id=#34] AS bucket2#3, Subquery subquery#4, [id=#45] AS bucket3#5, Subquery subquery#6, [id=#56] AS bucket4#7, Subquery subquery#8, [id=#67] AS bucket5#9]
   : :- Subquery subquery#0, [id=#23]
   : :  +- AdaptiveSparkPlan isFinalPlan=false
   : :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else null))])
   : :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#21]
   : :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else null))])
   : :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   : :- Subquery subquery#2, [id=#34]
   : :  +- AdaptiveSparkPlan isFinalPlan=false
   : :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
   : :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
   : :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
   : :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   : :- Subquery subquery#4, [id=#45]
   : :  +- AdaptiveSparkPlan isFinalPlan=false
   : :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
   : :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#43]
   : :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
   : :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   : :- Subquery subquery#6, [id=#56]
   : :  +- AdaptiveSparkPlan isFinalPlan=false
   : :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
   : :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#54]
   : :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
   : :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   : +- Subquery subquery#8, [id=#67]
   :    +- AdaptiveSparkPlan isFinalPlan=false
   :       +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
   :          +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#65]
   :             +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
   :                +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   +- Filter (isnotnull(r_reason_sk#15) AND (r_reason_sk#15 = 1))
      +- FileScan parquet default.reason[r_reason_sk#15] Batched: true, DataFilters: [isnotnull(r_reason_sk#15), (r_reason_sk#15 = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk), EqualTo(r_reason_sk,1)], ReadSchema: struct<r_reason_sk:int>
{noformat}
Plan of PostgreSQL:

{noformat}
                                       QUERY PLAN
--------------------------------------------------------------------------------------
 Seq Scan on reason  (cost=51.80..62.67 rows=1 width=160)
   Filter: (r_reason_sk = 1)
   CTE bucket_result
     ->  Aggregate  (cost=51.68..51.70 rows=1 width=160)
           ->  Seq Scan on store_sales  (cost=0.00..13.40 rows=340 width=32)
   InitPlan 2 (returns $1)
     ->  CTE Scan on bucket_result  (cost=0.00..0.02 rows=1 width=32)
   InitPlan 3 (returns $2)
     ->  CTE Scan on bucket_result bucket_result_1  (cost=0.00..0.02 rows=1 width=32)
   InitPlan 4 (returns $3)
     ->  CTE Scan on bucket_result bucket_result_2  (cost=0.00..0.02 rows=1 width=32)
   InitPlan 5 (returns $4)
     ->  CTE Scan on bucket_result bucket_result_3  (cost=0.00..0.02 rows=1 width=32)
   InitPlan 6 (returns $5)
     ->  CTE Scan on bucket_result bucket_result_4  (cost=0.00..0.02 rows=1 width=32)
(15 rows)
{noformat}
It seems Spark SQL scans store_sales five times, once per scalar subquery, while PostgreSQL scans store_sales only once and serves the remaining references through CTE scans of the materialized result.
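Until the fix is available, one way to avoid the repeated scans is to rewrite the query by hand so the one-row CTE is joined once instead of being referenced through several scalar subqueries. Below is a sketch of such a rewrite on the reduced two-aggregate pattern from above (a workaround for illustration, not the change made by the pull request; {{agg}} and its aliases are hypothetical names):

{code:sql}
-- Hypothetical manual rewrite: cross join the single-row aggregate once,
-- so store_sales is scanned once no matter how many aggregates are projected.
WITH agg AS (
  SELECT avg(ss_quantity) AS avg_quantity, max(ss_quantity) AS max_quantity
  FROM store_sales
)
SELECT agg.avg_quantity, agg.max_quantity
FROM reason CROSS JOIN agg
WHERE r_reason_sk = 1;
{code}

The same rewrite applies to the five-bucket query: compute all five bucket expressions in a single aggregation over store_sales and cross join the one-row result with the filtered reason table.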