Yuming Wang created SPARK-34079:
-----------------------------------
Summary: Improvement CTE table scan
Key: SPARK-34079
URL: https://issues.apache.org/jira/browse/SPARK-34079
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.2.0
Reporter: Yuming Wang
Prepare tables:
{code:sql}
CREATE TABLE store_sales (
  ss_sold_date_sk INT, ss_sold_time_sk INT, ss_item_sk INT,
  ss_customer_sk INT, ss_cdemo_sk INT, ss_hdemo_sk INT,
  ss_addr_sk INT, ss_store_sk INT, ss_promo_sk INT,
  ss_ticket_number INT, ss_quantity INT,
  ss_wholesale_cost DECIMAL(7,2), ss_list_price DECIMAL(7,2),
  ss_sales_price DECIMAL(7,2), ss_ext_discount_amt DECIMAL(7,2),
  ss_ext_sales_price DECIMAL(7,2), ss_ext_wholesale_cost DECIMAL(7,2),
  ss_ext_list_price DECIMAL(7,2), ss_ext_tax DECIMAL(7,2),
  ss_coupon_amt DECIMAL(7,2), ss_net_paid DECIMAL(7,2),
  ss_net_paid_inc_tax DECIMAL(7,2), ss_net_profit DECIMAL(7,2)
);
CREATE TABLE reason (
  r_reason_sk INT, r_reason_id varchar(255), r_reason_desc varchar(255)
);
{code}
{code:sql}
WITH bucket_result AS (
  SELECT
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 1 AND 20
                         THEN ss_quantity END) > 62316685
         THEN avg(CASE WHEN ss_quantity BETWEEN 1 AND 20
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 1 AND 20
                       THEN ss_net_paid END)
    END AS bucket1,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 21 AND 40
                         THEN ss_quantity END) > 19045798
         THEN avg(CASE WHEN ss_quantity BETWEEN 21 AND 40
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 21 AND 40
                       THEN ss_net_paid END)
    END AS bucket2,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 41 AND 60
                         THEN ss_quantity END) > 365541424
         THEN avg(CASE WHEN ss_quantity BETWEEN 41 AND 60
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 41 AND 60
                       THEN ss_net_paid END)
    END AS bucket3,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 61 AND 80
                         THEN ss_quantity END) > 19045798
         THEN avg(CASE WHEN ss_quantity BETWEEN 61 AND 80
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 61 AND 80
                       THEN ss_net_paid END)
    END AS bucket4,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 81 AND 100
                         THEN ss_quantity END) > 365541424
         THEN avg(CASE WHEN ss_quantity BETWEEN 81 AND 100
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 81 AND 100
                       THEN ss_net_paid END)
    END AS bucket5
  FROM store_sales
)
SELECT
  (SELECT bucket1 FROM bucket_result) AS bucket1,
  (SELECT bucket2 FROM bucket_result) AS bucket2,
  (SELECT bucket3 FROM bucket_result) AS bucket3,
  (SELECT bucket4 FROM bucket_result) AS bucket4,
  (SELECT bucket5 FROM bucket_result) AS bucket5
FROM reason
WHERE r_reason_sk = 1;
{code}
Spark SQL:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Subquery subquery#0, [id=#23] AS bucket1#1, Subquery subquery#2,
[id=#34] AS bucket2#3, Subquery subquery#4, [id=#45] AS bucket3#5, Subquery
subquery#6, [id=#56] AS bucket4#7, Subquery subquery#8, [id=#67] AS bucket5#9]
: :- Subquery subquery#0, [id=#23]
: : +- AdaptiveSparkPlan isFinalPlan=false
: : +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >=
1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), avg(UnscaledValue(if
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32
else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28
<= 20))) ss_net_paid#38 else null))])
: : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#21]
: : +- HashAggregate(keys=[], functions=[partial_count(if
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null),
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <=
20))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else
null))])
: : +- FileScan parquet
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38]
Batched: true, DataFilters: [], Format: Parquet, Location:
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
: :- Subquery subquery#2, [id=#34]
: : +- AdaptiveSparkPlan isFinalPlan=false
: : +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >=
21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null),
avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40)))
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >=
21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
: : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
: : +- HashAggregate(keys=[], functions=[partial_count(if
(((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND
(ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)),
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <=
40))) ss_net_paid#38 else null))])
: : +- FileScan parquet
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38]
Batched: true, DataFilters: [], Format: Parquet, Location:
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
: :- Subquery subquery#4, [id=#45]
: : +- AdaptiveSparkPlan isFinalPlan=false
: : +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >=
41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null),
avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60)))
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >=
41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
: : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#43]
: : +- HashAggregate(keys=[], functions=[partial_count(if
(((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND
(ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)),
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <=
60))) ss_net_paid#38 else null))])
: : +- FileScan parquet
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38]
Batched: true, DataFilters: [], Format: Parquet, Location:
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
: :- Subquery subquery#6, [id=#56]
: : +- AdaptiveSparkPlan isFinalPlan=false
: : +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >=
61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null),
avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80)))
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >=
61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
: : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#54]
: : +- HashAggregate(keys=[], functions=[partial_count(if
(((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND
(ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)),
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <=
80))) ss_net_paid#38 else null))])
: : +- FileScan parquet
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38]
Batched: true, DataFilters: [], Format: Parquet, Location:
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
: +- Subquery subquery#8, [id=#67]
: +- AdaptiveSparkPlan isFinalPlan=false
: +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >=
81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null),
avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100)))
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >=
81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
: +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#65]
: +- HashAggregate(keys=[], functions=[partial_count(if
(((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND
(ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)),
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <=
100))) ss_net_paid#38 else null))])
: +- FileScan parquet
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38]
Batched: true, DataFilters: [], Format: Parquet, Location:
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
+- Filter (isnotnull(r_reason_sk#15) AND (r_reason_sk#15 = 1))
+- FileScan parquet default.reason[r_reason_sk#15] Batched: true,
DataFilters: [isnotnull(r_reason_sk#15), (r_reason_sk#15 = 1)], Format:
Parquet, Location:
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk),
EqualTo(r_reason_sk,1)], ReadSchema: struct<r_reason_sk:int>
{noformat}
PostgreSQL:
{noformat}
QUERY PLAN
--------------------------------------------------------------------------------------
Seq Scan on reason (cost=51.80..62.67 rows=1 width=160)
Filter: (r_reason_sk = 1)
CTE bucket_result
-> Aggregate (cost=51.68..51.70 rows=1 width=160)
-> Seq Scan on store_sales (cost=0.00..13.40 rows=340 width=32)
InitPlan 2 (returns $1)
-> CTE Scan on bucket_result (cost=0.00..0.02 rows=1 width=32)
InitPlan 3 (returns $2)
-> CTE Scan on bucket_result bucket_result_1 (cost=0.00..0.02 rows=1
width=32)
InitPlan 4 (returns $3)
-> CTE Scan on bucket_result bucket_result_2 (cost=0.00..0.02 rows=1
width=32)
InitPlan 5 (returns $4)
-> CTE Scan on bucket_result bucket_result_3 (cost=0.00..0.02 rows=1
width=32)
InitPlan 6 (returns $5)
-> CTE Scan on bucket_result bucket_result_4 (cost=0.00..0.02 rows=1
width=32)
(15 rows)
{noformat}
Spark SQL appears to scan store_sales five times, once per scalar subquery,
whereas PostgreSQL scans store_sales only once.
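A possible query-side workaround, given only as a sketch and not as the fix proposed in this issue: because bucket_result is a global aggregate with no GROUP BY, it always yields exactly one row, so the five scalar subqueries can be replaced by a single CROSS JOIN that references the CTE once. The alias b is illustrative. The rewrite should return the same rows as the original query; that it reduces the plan to a single store_sales scan is an expectation that has not been checked against the plan above.

{code:sql}
-- Sketch: reference the CTE once instead of through five scalar subqueries.
WITH bucket_result AS (
  SELECT
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 1 AND 20
                         THEN ss_quantity END) > 62316685
         THEN avg(CASE WHEN ss_quantity BETWEEN 1 AND 20
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 1 AND 20
                       THEN ss_net_paid END)
    END AS bucket1,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 21 AND 40
                         THEN ss_quantity END) > 19045798
         THEN avg(CASE WHEN ss_quantity BETWEEN 21 AND 40
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 21 AND 40
                       THEN ss_net_paid END)
    END AS bucket2,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 41 AND 60
                         THEN ss_quantity END) > 365541424
         THEN avg(CASE WHEN ss_quantity BETWEEN 41 AND 60
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 41 AND 60
                       THEN ss_net_paid END)
    END AS bucket3,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 61 AND 80
                         THEN ss_quantity END) > 19045798
         THEN avg(CASE WHEN ss_quantity BETWEEN 61 AND 80
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 61 AND 80
                       THEN ss_net_paid END)
    END AS bucket4,
    CASE WHEN count(CASE WHEN ss_quantity BETWEEN 81 AND 100
                         THEN ss_quantity END) > 365541424
         THEN avg(CASE WHEN ss_quantity BETWEEN 81 AND 100
                       THEN ss_ext_discount_amt END)
         ELSE avg(CASE WHEN ss_quantity BETWEEN 81 AND 100
                       THEN ss_net_paid END)
    END AS bucket5
  FROM store_sales
)
-- bucket_result has exactly one row, so the cross join keeps one output
-- row per matching reason row, as in the original query.
SELECT b.bucket1, b.bucket2, b.bucket3, b.bucket4, b.bucket5
FROM reason
CROSS JOIN bucket_result b
WHERE r_reason_sk = 1;
{code}

Prefixing either form of the query with EXPLAIN shows how many FileScan nodes on store_sales end up in the physical plan.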