[ 
https://issues.apache.org/jira/browse/SPARK-34079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-34079:
--------------------------------
    Description: 
Prepare tables:
{code:sql}
CREATE TABLE store_sales (  ss_sold_date_sk INT,  ss_sold_time_sk INT,  
ss_item_sk INT,  ss_customer_sk INT,  ss_cdemo_sk INT,  ss_hdemo_sk INT,  
ss_addr_sk INT,  ss_store_sk INT,  ss_promo_sk INT,  ss_ticket_number INT,  
ss_quantity INT,  ss_wholesale_cost DECIMAL(7,2),  ss_list_price DECIMAL(7,2),  
ss_sales_price DECIMAL(7,2),  ss_ext_discount_amt DECIMAL(7,2),  
ss_ext_sales_price DECIMAL(7,2),  ss_ext_wholesale_cost DECIMAL(7,2),  
ss_ext_list_price DECIMAL(7,2),  ss_ext_tax DECIMAL(7,2),  ss_coupon_amt 
DECIMAL(7,2),  ss_net_paid DECIMAL(7,2),  ss_net_paid_inc_tax 
DECIMAL(7,2),ss_net_profit DECIMAL(7,2));
CREATE TABLE reason (  r_reason_sk INT,  r_reason_id varchar(255),  
r_reason_desc varchar(255));
{code}

SQL:
{code:sql}
WITH bucket_result AS (
SELECT
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_quantity 
END)) > 62316685
      THEN (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_ext_discount_amt 
END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_net_paid END)) END 
bucket1,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_quantity 
END)) > 19045798
      THEN (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_net_paid END)) 
END bucket2,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_quantity 
END)) > 365541424
      THEN (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_net_paid END)) 
END bucket3,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_quantity 
END)) > 19045798
      THEN (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_net_paid END)) 
END bucket4,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_quantity 
END)) > 365541424
      THEN (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_net_paid END)) 
END bucket5
  FROM store_sales
)
SELECT
  (SELECT bucket1 FROM bucket_result) as bucket1,
  (SELECT bucket2 FROM bucket_result) as bucket2,
  (SELECT bucket3 FROM bucket_result) as bucket3,
  (SELECT bucket4 FROM bucket_result) as bucket4,
  (SELECT bucket5 FROM bucket_result) as bucket5
FROM reason
WHERE r_reason_sk = 1;

{code}

Plan of Spark SQL:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Subquery subquery#0, [id=#23] AS bucket1#1, Subquery subquery#2, 
[id=#34] AS bucket2#3, Subquery subquery#4, [id=#45] AS bucket3#5, Subquery 
subquery#6, [id=#56] AS bucket4#7, Subquery subquery#8, [id=#67] AS bucket5#9]
   :  :- Subquery subquery#0, [id=#23]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), avg(UnscaledValue(if 
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 
else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 
<= 20))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#21]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 
20))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if 
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else 
null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  :- Subquery subquery#2, [id=#34]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND 
(ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 
40))) ss_net_paid#38 else null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  :- Subquery subquery#4, [id=#45]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#43]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND 
(ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 
60))) ss_net_paid#38 else null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  :- Subquery subquery#6, [id=#56]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#54]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND 
(ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 
80))) ss_net_paid#38 else null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  +- Subquery subquery#8, [id=#67]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
   :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#65]
   :              +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND 
(ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 
100))) ss_net_paid#38 else null))])
   :                 +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   +- Filter (isnotnull(r_reason_sk#15) AND (r_reason_sk#15 = 1))
      +- FileScan parquet default.reason[r_reason_sk#15] Batched: true, 
DataFilters: [isnotnull(r_reason_sk#15), (r_reason_sk#15 = 1)], Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk), 
EqualTo(r_reason_sk,1)], ReadSchema: struct<r_reason_sk:int>
{noformat}

Plan of PostgreSQL:
{noformat}
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Seq Scan on reason  (cost=51.80..62.67 rows=1 width=160)
   Filter: (r_reason_sk = 1)
   CTE bucket_result
     ->  Aggregate  (cost=51.68..51.70 rows=1 width=160)
           ->  Seq Scan on store_sales  (cost=0.00..13.40 rows=340 width=32)
   InitPlan 2 (returns $1)
     ->  CTE Scan on bucket_result  (cost=0.00..0.02 rows=1 width=32)
   InitPlan 3 (returns $2)
     ->  CTE Scan on bucket_result bucket_result_1  (cost=0.00..0.02 rows=1 
width=32)
   InitPlan 4 (returns $3)
     ->  CTE Scan on bucket_result bucket_result_2  (cost=0.00..0.02 rows=1 
width=32)
   InitPlan 5 (returns $4)
     ->  CTE Scan on bucket_result bucket_result_3  (cost=0.00..0.02 rows=1 
width=32)
   InitPlan 6 (returns $5)
     ->  CTE Scan on bucket_result bucket_result_4  (cost=0.00..0.02 rows=1 
width=32)
(15 rows)
{noformat}

It seems that Spark SQL scans store_sales five times, whereas PostgreSQL scans 
store_sales only once.




  was:
Prepare tables:
{code:sql}
CREATE TABLE store_sales (  ss_sold_date_sk INT,  ss_sold_time_sk INT,  
ss_item_sk INT,  ss_customer_sk INT,  ss_cdemo_sk INT,  ss_hdemo_sk INT,  
ss_addr_sk INT,  ss_store_sk INT,  ss_promo_sk INT,  ss_ticket_number INT,  
ss_quantity INT,  ss_wholesale_cost DECIMAL(7,2),  ss_list_price DECIMAL(7,2),  
ss_sales_price DECIMAL(7,2),  ss_ext_discount_amt DECIMAL(7,2),  
ss_ext_sales_price DECIMAL(7,2),  ss_ext_wholesale_cost DECIMAL(7,2),  
ss_ext_list_price DECIMAL(7,2),  ss_ext_tax DECIMAL(7,2),  ss_coupon_amt 
DECIMAL(7,2),  ss_net_paid DECIMAL(7,2),  ss_net_paid_inc_tax 
DECIMAL(7,2),ss_net_profit DECIMAL(7,2));
CREATE TABLE reason (  r_reason_sk INT,  r_reason_id varchar(255),  
r_reason_desc varchar(255));
{code}


{code:sql}
WITH bucket_result AS (
SELECT
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_quantity 
END)) > 62316685
      THEN (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_ext_discount_amt 
END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_net_paid END)) END 
bucket1,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_quantity 
END)) > 19045798
      THEN (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_net_paid END)) 
END bucket2,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_quantity 
END)) > 365541424
      THEN (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_net_paid END)) 
END bucket3,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_quantity 
END)) > 19045798
      THEN (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_net_paid END)) 
END bucket4,
    CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_quantity 
END)) > 365541424
      THEN (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN 
ss_ext_discount_amt END))
    ELSE (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_net_paid END)) 
END bucket5
  FROM store_sales
)
SELECT
  (SELECT bucket1 FROM bucket_result) as bucket1,
  (SELECT bucket2 FROM bucket_result) as bucket2,
  (SELECT bucket3 FROM bucket_result) as bucket3,
  (SELECT bucket4 FROM bucket_result) as bucket4,
  (SELECT bucket5 FROM bucket_result) as bucket5
FROM reason
WHERE r_reason_sk = 1;

{code}

Spark SQL:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Subquery subquery#0, [id=#23] AS bucket1#1, Subquery subquery#2, 
[id=#34] AS bucket2#3, Subquery subquery#4, [id=#45] AS bucket3#5, Subquery 
subquery#6, [id=#56] AS bucket4#7, Subquery subquery#8, [id=#67] AS bucket5#9]
   :  :- Subquery subquery#0, [id=#23]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), avg(UnscaledValue(if 
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 
else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 
<= 20))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#21]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 
20))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if 
(((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else 
null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  :- Subquery subquery#2, [id=#34]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND 
(ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 
40))) ss_net_paid#38 else null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  :- Subquery subquery#4, [id=#45]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#43]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND 
(ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 
60))) ss_net_paid#38 else null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  :- Subquery subquery#6, [id=#56]
   :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#54]
   :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND 
(ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 
80))) ss_net_paid#38 else null))])
   :  :              +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   :  +- Subquery subquery#8, [id=#67]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 
81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), 
avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) 
ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
   :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#65]
   :              +- HashAggregate(keys=[], functions=[partial_count(if 
(((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else 
null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND 
(ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), 
partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 
100))) ss_net_paid#38 else null))])
   :                 +- FileScan parquet 
default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
   +- Filter (isnotnull(r_reason_sk#15) AND (r_reason_sk#15 = 1))
      +- FileScan parquet default.reason[r_reason_sk#15] Batched: true, 
DataFilters: [isnotnull(r_reason_sk#15), (r_reason_sk#15 = 1)], Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk), 
EqualTo(r_reason_sk,1)], ReadSchema: struct<r_reason_sk:int>
{noformat}

PostgreSQL:
{noformat}
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Seq Scan on reason  (cost=51.80..62.67 rows=1 width=160)
   Filter: (r_reason_sk = 1)
   CTE bucket_result
     ->  Aggregate  (cost=51.68..51.70 rows=1 width=160)
           ->  Seq Scan on store_sales  (cost=0.00..13.40 rows=340 width=32)
   InitPlan 2 (returns $1)
     ->  CTE Scan on bucket_result  (cost=0.00..0.02 rows=1 width=32)
   InitPlan 3 (returns $2)
     ->  CTE Scan on bucket_result bucket_result_1  (cost=0.00..0.02 rows=1 
width=32)
   InitPlan 4 (returns $3)
     ->  CTE Scan on bucket_result bucket_result_2  (cost=0.00..0.02 rows=1 
width=32)
   InitPlan 5 (returns $4)
     ->  CTE Scan on bucket_result bucket_result_3  (cost=0.00..0.02 rows=1 
width=32)
   InitPlan 6 (returns $5)
     ->  CTE Scan on bucket_result bucket_result_4  (cost=0.00..0.02 rows=1 
width=32)
(15 rows)
{noformat}

It seems that Spark SQL scans store_sales five times, whereas PostgreSQL scans 
store_sales only once.





> Improvement CTE table scan
> --------------------------
>
>                 Key: SPARK-34079
>                 URL: https://issues.apache.org/jira/browse/SPARK-34079
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> Prepare tables:
> {code:sql}
> CREATE TABLE store_sales (  ss_sold_date_sk INT,  ss_sold_time_sk INT,  
> ss_item_sk INT,  ss_customer_sk INT,  ss_cdemo_sk INT,  ss_hdemo_sk INT,  
> ss_addr_sk INT,  ss_store_sk INT,  ss_promo_sk INT,  ss_ticket_number INT,  
> ss_quantity INT,  ss_wholesale_cost DECIMAL(7,2),  ss_list_price 
> DECIMAL(7,2),  ss_sales_price DECIMAL(7,2),  ss_ext_discount_amt 
> DECIMAL(7,2),  ss_ext_sales_price DECIMAL(7,2),  ss_ext_wholesale_cost 
> DECIMAL(7,2),  ss_ext_list_price DECIMAL(7,2),  ss_ext_tax DECIMAL(7,2),  
> ss_coupon_amt DECIMAL(7,2),  ss_net_paid DECIMAL(7,2),  ss_net_paid_inc_tax 
> DECIMAL(7,2),ss_net_profit DECIMAL(7,2));
> CREATE TABLE reason (  r_reason_sk INT,  r_reason_id varchar(255),  
> r_reason_desc varchar(255));
> {code}
> SQL:
> {code:sql}
> WITH bucket_result AS (
> SELECT
>     CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_quantity 
> END)) > 62316685
>       THEN (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN 
> ss_ext_discount_amt END))
>     ELSE (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_net_paid END)) 
> END bucket1,
>     CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN 
> ss_quantity END)) > 19045798
>       THEN (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN 
> ss_ext_discount_amt END))
>     ELSE (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_net_paid END)) 
> END bucket2,
>     CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN 
> ss_quantity END)) > 365541424
>       THEN (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN 
> ss_ext_discount_amt END))
>     ELSE (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_net_paid END)) 
> END bucket3,
>     CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN 
> ss_quantity END)) > 19045798
>       THEN (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN 
> ss_ext_discount_amt END))
>     ELSE (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_net_paid END)) 
> END bucket4,
>     CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN 
> ss_quantity END)) > 365541424
>       THEN (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN 
> ss_ext_discount_amt END))
>     ELSE (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_net_paid END)) 
> END bucket5
>   FROM store_sales
> )
> SELECT
>   (SELECT bucket1 FROM bucket_result) as bucket1,
>   (SELECT bucket2 FROM bucket_result) as bucket2,
>   (SELECT bucket3 FROM bucket_result) as bucket3,
>   (SELECT bucket4 FROM bucket_result) as bucket4,
>   (SELECT bucket5 FROM bucket_result) as bucket5
> FROM reason
> WHERE r_reason_sk = 1;
> {code}
> Plan of Spark SQL:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [Subquery subquery#0, [id=#23] AS bucket1#1, Subquery subquery#2, 
> [id=#34] AS bucket2#3, Subquery subquery#4, [id=#45] AS bucket3#5, Subquery 
> subquery#6, [id=#56] AS bucket4#7, Subquery subquery#8, [id=#67] AS bucket5#9]
>    :  :- Subquery subquery#0, [id=#23]
>    :  :  +- AdaptiveSparkPlan isFinalPlan=false
>    :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 
> >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), 
> avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) 
> ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
> 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else null))])
>    :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#21]
>    :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
> (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else 
> null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND 
> (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 else null)), 
> partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 
> 20))) ss_net_paid#38 else null))])
>    :  :              +- FileScan parquet 
> default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
> Batched: true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
>    :  :- Subquery subquery#2, [id=#34]
>    :  :  +- AdaptiveSparkPlan isFinalPlan=false
>    :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 
> >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), 
> avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) 
> ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
> 21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
>    :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
>    :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
> (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else 
> null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND 
> (ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), 
> partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 
> 40))) ss_net_paid#38 else null))])
>    :  :              +- FileScan parquet 
> default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
> Batched: true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
>    :  :- Subquery subquery#4, [id=#45]
>    :  :  +- AdaptiveSparkPlan isFinalPlan=false
>    :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 
> >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), 
> avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) 
> ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
> 41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
>    :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#43]
>    :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
> (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else 
> null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND 
> (ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), 
> partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 
> 60))) ss_net_paid#38 else null))])
>    :  :              +- FileScan parquet 
> default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
> Batched: true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
>    :  :- Subquery subquery#6, [id=#56]
>    :  :  +- AdaptiveSparkPlan isFinalPlan=false
>    :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 
> >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), 
> avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) 
> ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
> 61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
>    :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#54]
>    :  :           +- HashAggregate(keys=[], functions=[partial_count(if 
> (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else 
> null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND 
> (ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), 
> partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 
> 80))) ss_net_paid#38 else null))])
>    :  :              +- FileScan parquet 
> default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
> Batched: true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
>    :  +- Subquery subquery#8, [id=#67]
>    :     +- AdaptiveSparkPlan isFinalPlan=false
>    :        +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 
> >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), 
> avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) 
> ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 
> 81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
>    :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#65]
>    :              +- HashAggregate(keys=[], functions=[partial_count(if 
> (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else 
> null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND 
> (ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), 
> partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 
> 100))) ss_net_paid#38 else null))])
>    :                 +- FileScan parquet 
> default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] 
> Batched: true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
>    +- Filter (isnotnull(r_reason_sk#15) AND (r_reason_sk#15 = 1))
>       +- FileScan parquet default.reason[r_reason_sk#15] Batched: true, 
> DataFilters: [isnotnull(r_reason_sk#15), (r_reason_sk#15 = 1)], Format: 
> Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk), 
> EqualTo(r_reason_sk,1)], ReadSchema: struct<r_reason_sk:int>
> {noformat}
> Plan of PostgreSQL:
> {noformat}
>                                       QUERY PLAN
> --------------------------------------------------------------------------------------
>  Seq Scan on reason  (cost=51.80..62.67 rows=1 width=160)
>    Filter: (r_reason_sk = 1)
>    CTE bucket_result
>      ->  Aggregate  (cost=51.68..51.70 rows=1 width=160)
>            ->  Seq Scan on store_sales  (cost=0.00..13.40 rows=340 width=32)
>    InitPlan 2 (returns $1)
>      ->  CTE Scan on bucket_result  (cost=0.00..0.02 rows=1 width=32)
>    InitPlan 3 (returns $2)
>      ->  CTE Scan on bucket_result bucket_result_1  (cost=0.00..0.02 rows=1 
> width=32)
>    InitPlan 4 (returns $3)
>      ->  CTE Scan on bucket_result bucket_result_2  (cost=0.00..0.02 rows=1 
> width=32)
>    InitPlan 5 (returns $4)
>      ->  CTE Scan on bucket_result bucket_result_3  (cost=0.00..0.02 rows=1 
> width=32)
>    InitPlan 6 (returns $5)
>      ->  CTE Scan on bucket_result bucket_result_4  (cost=0.00..0.02 rows=1 
> width=32)
> (15 rows)
> {noformat}
> It seems that Spark SQL scans store_sales five times, whereas PostgreSQL 
> scans store_sales only once.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

Reply via email to