This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new b4c2dc23 fix: fix CometNativeExec.doCanonicalize for ReusedExchangeExec (#447)
b4c2dc23 is described below

commit b4c2dc2367296e286d08ecbbf35a00aec9821fbd
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat May 18 22:33:40 2024 -0700

    fix: fix CometNativeExec.doCanonicalize for ReusedExchangeExec (#447)
---
 .../org/apache/spark/sql/comet/operators.scala     |  20 +-
 .../approved-plans-v2_7/q5a/explain.txt            | 472 ++++-----------------
 .../approved-plans-v2_7/q5a/simplified.txt         |  88 +---
 .../org/apache/comet/exec/CometExecSuite.scala     |  30 ++
 4 files changed, 125 insertions(+), 485 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 63587af3..bb17442a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -363,7 +363,11 @@ abstract class CometNativeExec extends CometExec {
   }
 
   override protected def doCanonicalize(): SparkPlan = {
-    val canonicalizedPlan = 
super.doCanonicalize().asInstanceOf[CometNativeExec]
+    val canonicalizedPlan = super
+      .doCanonicalize()
+      .asInstanceOf[CometNativeExec]
+      .canonicalizePlans()
+
     if (serializedPlanOpt.isDefined) {
       // If the plan is a boundary node, we should remove the serialized plan.
       canonicalizedPlan.cleanBlock()
@@ -371,6 +375,20 @@ abstract class CometNativeExec extends CometExec {
       canonicalizedPlan
     }
   }
+
+  /**
+   * Canonicalizes the plans of Product parameters in Comet native operators.
+   */
+  protected def canonicalizePlans(): CometNativeExec = {
+    def transform(arg: Any): AnyRef = arg match {
+      case sparkPlan: SparkPlan => sparkPlan.canonicalized
+      case other: AnyRef => other
+      case null => null
+    }
+
+    val newArgs = mapProductIterator(transform)
+    makeCopy(newArgs).asInstanceOf[CometNativeExec]
+  }
 }
 
 abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode
diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
index 2345d02e..2b45472f 100644
--- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
+++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
@@ -1,9 +1,9 @@
 == Physical Plan ==
-TakeOrderedAndProject (137)
-+- * HashAggregate (136)
-   +- Exchange (135)
-      +- * HashAggregate (134)
-         +- Union (133)
+TakeOrderedAndProject (83)
++- * HashAggregate (82)
+   +- Exchange (81)
+      +- * HashAggregate (80)
+         +- Union (79)
             :- * HashAggregate (68)
             :  +- Exchange (67)
             :     +- * HashAggregate (66)
@@ -72,70 +72,16 @@ TakeOrderedAndProject (137)
             :                             +- * ColumnarToRow (58)
             :                                +- CometFilter (57)
             :                                   +- CometScan parquet 
spark_catalog.default.web_site (56)
-            :- * HashAggregate (100)
-            :  +- Exchange (99)
-            :     +- * HashAggregate (98)
-            :        +- * HashAggregate (97)
-            :           +- Exchange (96)
-            :              +- * HashAggregate (95)
-            :                 +- Union (94)
-            :                    :- * HashAggregate (70)
-            :                    :  +- ReusedExchange (69)
-            :                    :- * HashAggregate (72)
-            :                    :  +- ReusedExchange (71)
-            :                    +- * HashAggregate (93)
-            :                       +- Exchange (92)
-            :                          +- * HashAggregate (91)
-            :                             +- * Project (90)
-            :                                +- * BroadcastHashJoin Inner 
BuildRight (89)
-            :                                   :- * Project (87)
-            :                                   :  +- * BroadcastHashJoin 
Inner BuildRight (86)
-            :                                   :     :- * ColumnarToRow (84)
-            :                                   :     :  +- CometUnion (83)
-            :                                   :     :     :- CometProject 
(75)
-            :                                   :     :     :  +- CometFilter 
(74)
-            :                                   :     :     :     +- CometScan 
parquet spark_catalog.default.web_sales (73)
-            :                                   :     :     +- CometProject 
(82)
-            :                                   :     :        +- 
CometBroadcastHashJoin (81)
-            :                                   :     :           :- 
CometBroadcastExchange (77)
-            :                                   :     :           :  +- 
CometScan parquet spark_catalog.default.web_returns (76)
-            :                                   :     :           +- 
CometProject (80)
-            :                                   :     :              +- 
CometFilter (79)
-            :                                   :     :                 +- 
CometScan parquet spark_catalog.default.web_sales (78)
-            :                                   :     +- ReusedExchange (85)
-            :                                   +- ReusedExchange (88)
-            +- * HashAggregate (132)
-               +- Exchange (131)
-                  +- * HashAggregate (130)
-                     +- * HashAggregate (129)
-                        +- Exchange (128)
-                           +- * HashAggregate (127)
-                              +- Union (126)
-                                 :- * HashAggregate (102)
-                                 :  +- ReusedExchange (101)
-                                 :- * HashAggregate (104)
-                                 :  +- ReusedExchange (103)
-                                 +- * HashAggregate (125)
-                                    +- Exchange (124)
-                                       +- * HashAggregate (123)
-                                          +- * Project (122)
-                                             +- * BroadcastHashJoin Inner 
BuildRight (121)
-                                                :- * Project (119)
-                                                :  +- * BroadcastHashJoin 
Inner BuildRight (118)
-                                                :     :- * ColumnarToRow (116)
-                                                :     :  +- CometUnion (115)
-                                                :     :     :- CometProject 
(107)
-                                                :     :     :  +- CometFilter 
(106)
-                                                :     :     :     +- CometScan 
parquet spark_catalog.default.web_sales (105)
-                                                :     :     +- CometProject 
(114)
-                                                :     :        +- 
CometBroadcastHashJoin (113)
-                                                :     :           :- 
CometBroadcastExchange (109)
-                                                :     :           :  +- 
CometScan parquet spark_catalog.default.web_returns (108)
-                                                :     :           +- 
CometProject (112)
-                                                :     :              +- 
CometFilter (111)
-                                                :     :                 +- 
CometScan parquet spark_catalog.default.web_sales (110)
-                                                :     +- ReusedExchange (117)
-                                                +- ReusedExchange (120)
+            :- * HashAggregate (73)
+            :  +- Exchange (72)
+            :     +- * HashAggregate (71)
+            :        +- * HashAggregate (70)
+            :           +- ReusedExchange (69)
+            +- * HashAggregate (78)
+               +- Exchange (77)
+                  +- * HashAggregate (76)
+                     +- * HashAggregate (75)
+                        +- ReusedExchange (74)
 
 
 (1) Scan parquet spark_catalog.default.store_sales
@@ -177,7 +123,7 @@ Child 1 Input [6]: [store_sk#16, date_sk#17, 
sales_price#18, profit#19, return_a
 (8) ColumnarToRow [codegen id : 3]
 Input [6]: [store_sk#6, date_sk#7, sales_price#8, profit#9, return_amt#10, 
net_loss#11]
 
-(9) ReusedExchange [Reuses operator id: 142]
+(9) ReusedExchange [Reuses operator id: 88]
 Output [1]: [d_date_sk#22]
 
 (10) BroadcastHashJoin [codegen id : 3]
@@ -275,7 +221,7 @@ Child 1 Input [6]: [page_sk#57, date_sk#58, sales_price#59, 
profit#60, return_am
 (28) ColumnarToRow [codegen id : 7]
 Input [6]: [page_sk#47, date_sk#48, sales_price#49, profit#50, return_amt#51, 
net_loss#52]
 
-(29) ReusedExchange [Reuses operator id: 142]
+(29) ReusedExchange [Reuses operator id: 88]
 Output [1]: [d_date_sk#63]
 
 (30) BroadcastHashJoin [codegen id : 7]
@@ -392,7 +338,7 @@ Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, 
sales_price#105, profit#10
 (52) ColumnarToRow [codegen id : 11]
 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93]
 
-(53) ReusedExchange [Reuses operator id: 142]
+(53) ReusedExchange [Reuses operator id: 88]
 Output [1]: [d_date_sk#109]
 
 (54) BroadcastHashJoin [codegen id : 11]
@@ -471,381 +417,117 @@ Functions [3]: [sum(sales#39), sum(returns#40), 
sum(profit#41)]
 Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142, 
sum(profit#41)#143]
 Results [5]: [channel#37, id#38, cast(sum(sales#39)#141 as decimal(37,2)) AS 
sales#144, cast(sum(returns#40)#142 as decimal(37,2)) AS returns#145, 
cast(sum(profit#41)#143 as decimal(38,2)) AS profit#146]
 
-(69) ReusedExchange [Reuses operator id: 19]
-Output [5]: [s_store_id#24, sum#147, sum#148, sum#149, sum#150]
+(69) ReusedExchange [Reuses operator id: 67]
+Output [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
 
-(70) HashAggregate [codegen id : 18]
-Input [5]: [s_store_id#24, sum#147, sum#148, sum#149, sum#150]
-Keys [1]: [s_store_id#24]
-Functions [4]: [sum(UnscaledValue(sales_price#8)), 
sum(UnscaledValue(return_amt#10)), sum(UnscaledValue(profit#9)), 
sum(UnscaledValue(net_loss#11))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#8))#33, 
sum(UnscaledValue(return_amt#10))#34, sum(UnscaledValue(profit#9))#35, 
sum(UnscaledValue(net_loss#11))#36]
-Results [5]: [store channel AS channel#37, concat(store, s_store_id#24) AS 
id#38, MakeDecimal(sum(UnscaledValue(sales_price#8))#33,17,2) AS sales#39, 
MakeDecimal(sum(UnscaledValue(return_amt#10))#34,17,2) AS returns#40, 
(MakeDecimal(sum(UnscaledValue(profit#9))#35,17,2) - 
MakeDecimal(sum(UnscaledValue(net_loss#11))#36,17,2)) AS profit#41]
-
-(71) ReusedExchange [Reuses operator id: 39]
-Output [5]: [cp_catalog_page_id#65, sum#151, sum#152, sum#153, sum#154]
-
-(72) HashAggregate [codegen id : 22]
-Input [5]: [cp_catalog_page_id#65, sum#151, sum#152, sum#153, sum#154]
-Keys [1]: [cp_catalog_page_id#65]
-Functions [4]: [sum(UnscaledValue(sales_price#49)), 
sum(UnscaledValue(return_amt#51)), sum(UnscaledValue(profit#50)), 
sum(UnscaledValue(net_loss#52))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#49))#74, 
sum(UnscaledValue(return_amt#51))#75, sum(UnscaledValue(profit#50))#76, 
sum(UnscaledValue(net_loss#52))#77]
-Results [5]: [catalog channel AS channel#78, concat(catalog_page, 
cp_catalog_page_id#65) AS id#79, 
MakeDecimal(sum(UnscaledValue(sales_price#49))#74,17,2) AS sales#80, 
MakeDecimal(sum(UnscaledValue(return_amt#51))#75,17,2) AS returns#81, 
(MakeDecimal(sum(UnscaledValue(profit#50))#76,17,2) - 
MakeDecimal(sum(UnscaledValue(net_loss#52))#77,17,2)) AS profit#82]
-
-(73) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, 
ws_sold_date_sk#86]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(ws_sold_date_sk#86), 
dynamicpruningexpression(ws_sold_date_sk#86 IN dynamicpruning#155)]
-PushedFilters: [IsNotNull(ws_web_site_sk)]
-ReadSchema: 
struct<ws_web_site_sk:int,ws_ext_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
-
-(74) CometFilter
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, 
ws_sold_date_sk#86]
-Condition : isnotnull(ws_web_site_sk#83)
-
-(75) CometProject
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, 
ws_sold_date_sk#86]
-Arguments: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93], [ws_web_site_sk#83 AS wsr_web_site_sk#88, 
ws_sold_date_sk#86 AS date_sk#89, ws_ext_sales_price#84 AS sales_price#90, 
ws_net_profit#85 AS profit#91, 0.00 AS return_amt#92, 0.00 AS net_loss#93]
-
-(76) Scan parquet spark_catalog.default.web_returns
-Output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(wr_returned_date_sk#98), 
dynamicpruningexpression(wr_returned_date_sk#98 IN dynamicpruning#155)]
-ReadSchema: 
struct<wr_item_sk:int,wr_order_number:int,wr_return_amt:decimal(7,2),wr_net_loss:decimal(7,2)>
-
-(77) CometBroadcastExchange
-Input [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-Arguments: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-
-(78) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, 
ws_sold_date_sk#102]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/web_sales]
-PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), 
IsNotNull(ws_web_site_sk)]
-ReadSchema: struct<ws_item_sk:int,ws_web_site_sk:int,ws_order_number:int>
-
-(79) CometFilter
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, 
ws_sold_date_sk#102]
-Condition : ((isnotnull(ws_item_sk#99) AND isnotnull(ws_order_number#101)) AND 
isnotnull(ws_web_site_sk#100))
-
-(80) CometProject
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, 
ws_sold_date_sk#102]
-Arguments: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101], 
[ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-
-(81) CometBroadcastHashJoin
-Left output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-Right output [3]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-Arguments: [wr_item_sk#94, wr_order_number#95], [ws_item_sk#99, 
ws_order_number#101], Inner
-
-(82) CometProject
-Input [8]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98, ws_item_sk#99, ws_web_site_sk#100, 
ws_order_number#101]
-Arguments: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106, 
return_amt#107, net_loss#108], [ws_web_site_sk#100 AS wsr_web_site_sk#103, 
wr_returned_date_sk#98 AS date_sk#104, 0.00 AS sales_price#105, 0.00 AS 
profit#106, wr_return_amt#96 AS return_amt#107, wr_net_loss#97 AS net_loss#108]
-
-(83) CometUnion
-Child 0 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93]
-Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105, 
profit#106, return_amt#107, net_loss#108]
-
-(84) ColumnarToRow [codegen id : 25]
-Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93]
-
-(85) ReusedExchange [Reuses operator id: 142]
-Output [1]: [d_date_sk#109]
-
-(86) BroadcastHashJoin [codegen id : 25]
-Left keys [1]: [date_sk#89]
-Right keys [1]: [d_date_sk#109]
-Join type: Inner
-Join condition: None
-
-(87) Project [codegen id : 25]
-Output [5]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, 
net_loss#93]
-Input [7]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93, d_date_sk#109]
-
-(88) ReusedExchange [Reuses operator id: 59]
-Output [2]: [web_site_sk#110, web_site_id#111]
-
-(89) BroadcastHashJoin [codegen id : 25]
-Left keys [1]: [wsr_web_site_sk#88]
-Right keys [1]: [web_site_sk#110]
-Join type: Inner
-Join condition: None
-
-(90) Project [codegen id : 25]
-Output [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, 
web_site_id#111]
-Input [7]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, 
net_loss#93, web_site_sk#110, web_site_id#111]
-
-(91) HashAggregate [codegen id : 25]
-Input [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, 
web_site_id#111]
-Keys [1]: [web_site_id#111]
-Functions [4]: [partial_sum(UnscaledValue(sales_price#90)), 
partial_sum(UnscaledValue(return_amt#92)), 
partial_sum(UnscaledValue(profit#91)), partial_sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum#156, sum#157, sum#158, sum#159]
-Results [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163]
-
-(92) Exchange
-Input [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163]
-Arguments: hashpartitioning(web_site_id#111, 5), ENSURE_REQUIREMENTS, 
[plan_id=8]
-
-(93) HashAggregate [codegen id : 26]
-Input [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163]
-Keys [1]: [web_site_id#111]
-Functions [4]: [sum(UnscaledValue(sales_price#90)), 
sum(UnscaledValue(return_amt#92)), sum(UnscaledValue(profit#91)), 
sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#90))#120, 
sum(UnscaledValue(return_amt#92))#121, sum(UnscaledValue(profit#91))#122, 
sum(UnscaledValue(net_loss#93))#123]
-Results [5]: [web channel AS channel#124, concat(web_site, web_site_id#111) AS 
id#125, MakeDecimal(sum(UnscaledValue(sales_price#90))#120,17,2) AS sales#126, 
MakeDecimal(sum(UnscaledValue(return_amt#92))#121,17,2) AS returns#127, 
(MakeDecimal(sum(UnscaledValue(profit#91))#122,17,2) - 
MakeDecimal(sum(UnscaledValue(net_loss#93))#123,17,2)) AS profit#128]
-
-(94) Union
-
-(95) HashAggregate [codegen id : 27]
-Input [5]: [channel#37, id#38, sales#39, returns#40, profit#41]
-Keys [2]: [channel#37, id#38]
-Functions [3]: [partial_sum(sales#39), partial_sum(returns#40), 
partial_sum(profit#41)]
-Aggregate Attributes [6]: [sum#129, isEmpty#130, sum#131, isEmpty#132, 
sum#133, isEmpty#134]
-Results [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
-
-(96) Exchange
-Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
-Arguments: hashpartitioning(channel#37, id#38, 5), ENSURE_REQUIREMENTS, 
[plan_id=9]
-
-(97) HashAggregate [codegen id : 28]
+(70) HashAggregate [codegen id : 28]
 Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
 Keys [2]: [channel#37, id#38]
 Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)]
 Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142, 
sum(profit#41)#143]
-Results [4]: [channel#37, sum(sales#39)#141 AS sales#164, sum(returns#40)#142 
AS returns#165, sum(profit#41)#143 AS profit#166]
+Results [4]: [channel#37, sum(sales#39)#141 AS sales#147, sum(returns#40)#142 
AS returns#148, sum(profit#41)#143 AS profit#149]
 
-(98) HashAggregate [codegen id : 28]
-Input [4]: [channel#37, sales#164, returns#165, profit#166]
+(71) HashAggregate [codegen id : 28]
+Input [4]: [channel#37, sales#147, returns#148, profit#149]
 Keys [1]: [channel#37]
-Functions [3]: [partial_sum(sales#164), partial_sum(returns#165), 
partial_sum(profit#166)]
-Aggregate Attributes [6]: [sum#167, isEmpty#168, sum#169, isEmpty#170, 
sum#171, isEmpty#172]
-Results [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177, 
isEmpty#178]
+Functions [3]: [partial_sum(sales#147), partial_sum(returns#148), 
partial_sum(profit#149)]
+Aggregate Attributes [6]: [sum#150, isEmpty#151, sum#152, isEmpty#153, 
sum#154, isEmpty#155]
+Results [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160, 
isEmpty#161]
 
-(99) Exchange
-Input [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177, 
isEmpty#178]
-Arguments: hashpartitioning(channel#37, 5), ENSURE_REQUIREMENTS, [plan_id=10]
+(72) Exchange
+Input [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160, 
isEmpty#161]
+Arguments: hashpartitioning(channel#37, 5), ENSURE_REQUIREMENTS, [plan_id=8]
 
-(100) HashAggregate [codegen id : 29]
-Input [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177, 
isEmpty#178]
+(73) HashAggregate [codegen id : 29]
+Input [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160, 
isEmpty#161]
 Keys [1]: [channel#37]
-Functions [3]: [sum(sales#164), sum(returns#165), sum(profit#166)]
-Aggregate Attributes [3]: [sum(sales#164)#179, sum(returns#165)#180, 
sum(profit#166)#181]
-Results [5]: [channel#37, null AS id#182, sum(sales#164)#179 AS 
sum(sales)#183, sum(returns#165)#180 AS sum(returns)#184, sum(profit#166)#181 
AS sum(profit)#185]
-
-(101) ReusedExchange [Reuses operator id: 19]
-Output [5]: [s_store_id#24, sum#186, sum#187, sum#188, sum#189]
-
-(102) HashAggregate [codegen id : 33]
-Input [5]: [s_store_id#24, sum#186, sum#187, sum#188, sum#189]
-Keys [1]: [s_store_id#24]
-Functions [4]: [sum(UnscaledValue(sales_price#8)), 
sum(UnscaledValue(return_amt#10)), sum(UnscaledValue(profit#9)), 
sum(UnscaledValue(net_loss#11))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#8))#33, 
sum(UnscaledValue(return_amt#10))#34, sum(UnscaledValue(profit#9))#35, 
sum(UnscaledValue(net_loss#11))#36]
-Results [5]: [store channel AS channel#37, concat(store, s_store_id#24) AS 
id#38, MakeDecimal(sum(UnscaledValue(sales_price#8))#33,17,2) AS sales#39, 
MakeDecimal(sum(UnscaledValue(return_amt#10))#34,17,2) AS returns#40, 
(MakeDecimal(sum(UnscaledValue(profit#9))#35,17,2) - 
MakeDecimal(sum(UnscaledValue(net_loss#11))#36,17,2)) AS profit#41]
-
-(103) ReusedExchange [Reuses operator id: 39]
-Output [5]: [cp_catalog_page_id#65, sum#190, sum#191, sum#192, sum#193]
-
-(104) HashAggregate [codegen id : 37]
-Input [5]: [cp_catalog_page_id#65, sum#190, sum#191, sum#192, sum#193]
-Keys [1]: [cp_catalog_page_id#65]
-Functions [4]: [sum(UnscaledValue(sales_price#49)), 
sum(UnscaledValue(return_amt#51)), sum(UnscaledValue(profit#50)), 
sum(UnscaledValue(net_loss#52))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#49))#74, 
sum(UnscaledValue(return_amt#51))#75, sum(UnscaledValue(profit#50))#76, 
sum(UnscaledValue(net_loss#52))#77]
-Results [5]: [catalog channel AS channel#78, concat(catalog_page, 
cp_catalog_page_id#65) AS id#79, 
MakeDecimal(sum(UnscaledValue(sales_price#49))#74,17,2) AS sales#80, 
MakeDecimal(sum(UnscaledValue(return_amt#51))#75,17,2) AS returns#81, 
(MakeDecimal(sum(UnscaledValue(profit#50))#76,17,2) - 
MakeDecimal(sum(UnscaledValue(net_loss#52))#77,17,2)) AS profit#82]
-
-(105) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, 
ws_sold_date_sk#86]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(ws_sold_date_sk#86), 
dynamicpruningexpression(ws_sold_date_sk#86 IN dynamicpruning#194)]
-PushedFilters: [IsNotNull(ws_web_site_sk)]
-ReadSchema: 
struct<ws_web_site_sk:int,ws_ext_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
-
-(106) CometFilter
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, 
ws_sold_date_sk#86]
-Condition : isnotnull(ws_web_site_sk#83)
-
-(107) CometProject
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, 
ws_sold_date_sk#86]
-Arguments: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93], [ws_web_site_sk#83 AS wsr_web_site_sk#88, 
ws_sold_date_sk#86 AS date_sk#89, ws_ext_sales_price#84 AS sales_price#90, 
ws_net_profit#85 AS profit#91, 0.00 AS return_amt#92, 0.00 AS net_loss#93]
-
-(108) Scan parquet spark_catalog.default.web_returns
-Output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(wr_returned_date_sk#98), 
dynamicpruningexpression(wr_returned_date_sk#98 IN dynamicpruning#194)]
-ReadSchema: 
struct<wr_item_sk:int,wr_order_number:int,wr_return_amt:decimal(7,2),wr_net_loss:decimal(7,2)>
-
-(109) CometBroadcastExchange
-Input [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-Arguments: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-
-(110) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, 
ws_sold_date_sk#102]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/web_sales]
-PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), 
IsNotNull(ws_web_site_sk)]
-ReadSchema: struct<ws_item_sk:int,ws_web_site_sk:int,ws_order_number:int>
-
-(111) CometFilter
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, 
ws_sold_date_sk#102]
-Condition : ((isnotnull(ws_item_sk#99) AND isnotnull(ws_order_number#101)) AND 
isnotnull(ws_web_site_sk#100))
-
-(112) CometProject
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, 
ws_sold_date_sk#102]
-Arguments: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101], 
[ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-
-(113) CometBroadcastHashJoin
-Left output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98]
-Right output [3]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-Arguments: [wr_item_sk#94, wr_order_number#95], [ws_item_sk#99, 
ws_order_number#101], Inner
-
-(114) CometProject
-Input [8]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, 
wr_net_loss#97, wr_returned_date_sk#98, ws_item_sk#99, ws_web_site_sk#100, 
ws_order_number#101]
-Arguments: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106, 
return_amt#107, net_loss#108], [ws_web_site_sk#100 AS wsr_web_site_sk#103, 
wr_returned_date_sk#98 AS date_sk#104, 0.00 AS sales_price#105, 0.00 AS 
profit#106, wr_return_amt#96 AS return_amt#107, wr_net_loss#97 AS net_loss#108]
-
-(115) CometUnion
-Child 0 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93]
-Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105, 
profit#106, return_amt#107, net_loss#108]
-
-(116) ColumnarToRow [codegen id : 40]
-Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93]
-
-(117) ReusedExchange [Reuses operator id: 142]
-Output [1]: [d_date_sk#109]
-
-(118) BroadcastHashJoin [codegen id : 40]
-Left keys [1]: [date_sk#89]
-Right keys [1]: [d_date_sk#109]
-Join type: Inner
-Join condition: None
-
-(119) Project [codegen id : 40]
-Output [5]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, 
net_loss#93]
-Input [7]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, 
return_amt#92, net_loss#93, d_date_sk#109]
-
-(120) ReusedExchange [Reuses operator id: 59]
-Output [2]: [web_site_sk#110, web_site_id#111]
-
-(121) BroadcastHashJoin [codegen id : 40]
-Left keys [1]: [wsr_web_site_sk#88]
-Right keys [1]: [web_site_sk#110]
-Join type: Inner
-Join condition: None
-
-(122) Project [codegen id : 40]
-Output [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, 
web_site_id#111]
-Input [7]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, 
net_loss#93, web_site_sk#110, web_site_id#111]
-
-(123) HashAggregate [codegen id : 40]
-Input [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, 
web_site_id#111]
-Keys [1]: [web_site_id#111]
-Functions [4]: [partial_sum(UnscaledValue(sales_price#90)), 
partial_sum(UnscaledValue(return_amt#92)), 
partial_sum(UnscaledValue(profit#91)), partial_sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum#195, sum#196, sum#197, sum#198]
-Results [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202]
-
-(124) Exchange
-Input [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202]
-Arguments: hashpartitioning(web_site_id#111, 5), ENSURE_REQUIREMENTS, 
[plan_id=11]
-
-(125) HashAggregate [codegen id : 41]
-Input [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202]
-Keys [1]: [web_site_id#111]
-Functions [4]: [sum(UnscaledValue(sales_price#90)), 
sum(UnscaledValue(return_amt#92)), sum(UnscaledValue(profit#91)), 
sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#90))#120, 
sum(UnscaledValue(return_amt#92))#121, sum(UnscaledValue(profit#91))#122, 
sum(UnscaledValue(net_loss#93))#123]
-Results [5]: [web channel AS channel#124, concat(web_site, web_site_id#111) AS 
id#125, MakeDecimal(sum(UnscaledValue(sales_price#90))#120,17,2) AS sales#126, 
MakeDecimal(sum(UnscaledValue(return_amt#92))#121,17,2) AS returns#127, 
(MakeDecimal(sum(UnscaledValue(profit#91))#122,17,2) - 
MakeDecimal(sum(UnscaledValue(net_loss#93))#123,17,2)) AS profit#128]
+Functions [3]: [sum(sales#147), sum(returns#148), sum(profit#149)]
+Aggregate Attributes [3]: [sum(sales#147)#162, sum(returns#148)#163, 
sum(profit#149)#164]
+Results [5]: [channel#37, null AS id#165, sum(sales#147)#162 AS 
sum(sales)#166, sum(returns#148)#163 AS sum(returns)#167, sum(profit#149)#164 
AS sum(profit)#168]
 
-(126) Union
+(74) ReusedExchange [Reuses operator id: 67]
+Output [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
 
-(127) HashAggregate [codegen id : 42]
-Input [5]: [channel#37, id#38, sales#39, returns#40, profit#41]
-Keys [2]: [channel#37, id#38]
-Functions [3]: [partial_sum(sales#39), partial_sum(returns#40), 
partial_sum(profit#41)]
-Aggregate Attributes [6]: [sum#129, isEmpty#130, sum#131, isEmpty#132, 
sum#133, isEmpty#134]
-Results [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
-
-(128) Exchange
-Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
-Arguments: hashpartitioning(channel#37, id#38, 5), ENSURE_REQUIREMENTS, 
[plan_id=12]
-
-(129) HashAggregate [codegen id : 43]
+(75) HashAggregate [codegen id : 43]
 Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, 
sum#139, isEmpty#140]
 Keys [2]: [channel#37, id#38]
 Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)]
 Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142, 
sum(profit#41)#143]
-Results [3]: [sum(sales#39)#141 AS sales#164, sum(returns#40)#142 AS 
returns#165, sum(profit#41)#143 AS profit#166]
+Results [3]: [sum(sales#39)#141 AS sales#147, sum(returns#40)#142 AS 
returns#148, sum(profit#41)#143 AS profit#149]
 
-(130) HashAggregate [codegen id : 43]
-Input [3]: [sales#164, returns#165, profit#166]
+(76) HashAggregate [codegen id : 43]
+Input [3]: [sales#147, returns#148, profit#149]
 Keys: []
-Functions [3]: [partial_sum(sales#164), partial_sum(returns#165), 
partial_sum(profit#166)]
-Aggregate Attributes [6]: [sum#203, isEmpty#204, sum#205, isEmpty#206, 
sum#207, isEmpty#208]
-Results [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214]
+Functions [3]: [partial_sum(sales#147), partial_sum(returns#148), 
partial_sum(profit#149)]
+Aggregate Attributes [6]: [sum#169, isEmpty#170, sum#171, isEmpty#172, 
sum#173, isEmpty#174]
+Results [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180]
 
-(131) Exchange
-Input [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214]
-Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=13]
+(77) Exchange
+Input [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9]
 
-(132) HashAggregate [codegen id : 44]
-Input [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214]
+(78) HashAggregate [codegen id : 44]
+Input [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180]
 Keys: []
-Functions [3]: [sum(sales#164), sum(returns#165), sum(profit#166)]
-Aggregate Attributes [3]: [sum(sales#164)#215, sum(returns#165)#216, 
sum(profit#166)#217]
-Results [5]: [null AS channel#218, null AS id#219, sum(sales#164)#215 AS 
sum(sales)#220, sum(returns#165)#216 AS sum(returns)#221, sum(profit#166)#217 
AS sum(profit)#222]
+Functions [3]: [sum(sales#147), sum(returns#148), sum(profit#149)]
+Aggregate Attributes [3]: [sum(sales#147)#181, sum(returns#148)#182, 
sum(profit#149)#183]
+Results [5]: [null AS channel#184, null AS id#185, sum(sales#147)#181 AS 
sum(sales)#186, sum(returns#148)#182 AS sum(returns)#187, sum(profit#149)#183 
AS sum(profit)#188]
 
-(133) Union
+(79) Union
 
-(134) HashAggregate [codegen id : 45]
+(80) HashAggregate [codegen id : 45]
 Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 Keys [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 Functions: []
 Aggregate Attributes: []
 Results [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 
-(135) Exchange
+(81) Exchange
 Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
-Arguments: hashpartitioning(channel#37, id#38, sales#144, returns#145, 
profit#146, 5), ENSURE_REQUIREMENTS, [plan_id=14]
+Arguments: hashpartitioning(channel#37, id#38, sales#144, returns#145, 
profit#146, 5), ENSURE_REQUIREMENTS, [plan_id=10]
 
-(136) HashAggregate [codegen id : 46]
+(82) HashAggregate [codegen id : 46]
 Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 Keys [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 Functions: []
 Aggregate Attributes: []
 Results [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 
-(137) TakeOrderedAndProject
+(83) TakeOrderedAndProject
 Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
 Arguments: 100, [channel#37 ASC NULLS FIRST, id#38 ASC NULLS FIRST], 
[channel#37, id#38, sales#144, returns#145, profit#146]
 
 ===== Subqueries =====
 
 Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#4 IN 
dynamicpruning#5
-BroadcastExchange (142)
-+- * ColumnarToRow (141)
-   +- CometProject (140)
-      +- CometFilter (139)
-         +- CometScan parquet spark_catalog.default.date_dim (138)
+BroadcastExchange (88)
++- * ColumnarToRow (87)
+   +- CometProject (86)
+      +- CometFilter (85)
+         +- CometScan parquet spark_catalog.default.date_dim (84)
 
 
-(138) Scan parquet spark_catalog.default.date_dim
-Output [2]: [d_date_sk#22, d_date#223]
+(84) Scan parquet spark_catalog.default.date_dim
+Output [2]: [d_date_sk#22, d_date#189]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_date), GreaterThanOrEqual(d_date,1998-08-04), 
LessThanOrEqual(d_date,1998-08-18), IsNotNull(d_date_sk)]
 ReadSchema: struct<d_date_sk:int,d_date:date>
 
-(139) CometFilter
-Input [2]: [d_date_sk#22, d_date#223]
-Condition : (((isnotnull(d_date#223) AND (d_date#223 >= 1998-08-04)) AND 
(d_date#223 <= 1998-08-18)) AND isnotnull(d_date_sk#22))
+(85) CometFilter
+Input [2]: [d_date_sk#22, d_date#189]
+Condition : (((isnotnull(d_date#189) AND (d_date#189 >= 1998-08-04)) AND 
(d_date#189 <= 1998-08-18)) AND isnotnull(d_date_sk#22))
 
-(140) CometProject
-Input [2]: [d_date_sk#22, d_date#223]
+(86) CometProject
+Input [2]: [d_date_sk#22, d_date#189]
 Arguments: [d_date_sk#22], [d_date_sk#22]
 
-(141) ColumnarToRow [codegen id : 1]
+(87) ColumnarToRow [codegen id : 1]
 Input [1]: [d_date_sk#22]
 
-(142) BroadcastExchange
+(88) BroadcastExchange
 Input [1]: [d_date_sk#22]
-Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [plan_id=15]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [plan_id=11]
 
 Subquery:2 Hosting operator id = 4 Hosting Expression = sr_returned_date_sk#15 
IN dynamicpruning#5
 
@@ -857,12 +539,4 @@ Subquery:5 Hosting operator id = 41 Hosting Expression = 
ws_sold_date_sk#86 IN d
 
 Subquery:6 Hosting operator id = 44 Hosting Expression = 
wr_returned_date_sk#98 IN dynamicpruning#5
 
-Subquery:7 Hosting operator id = 73 Hosting Expression = ws_sold_date_sk#86 IN 
dynamicpruning#5
-
-Subquery:8 Hosting operator id = 76 Hosting Expression = 
wr_returned_date_sk#98 IN dynamicpruning#5
-
-Subquery:9 Hosting operator id = 105 Hosting Expression = ws_sold_date_sk#86 
IN dynamicpruning#5
-
-Subquery:10 Hosting operator id = 108 Hosting Expression = 
wr_returned_date_sk#98 IN dynamicpruning#5
-
 
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
index ff62cb5c..aaec304f 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
@@ -124,95 +124,13 @@ TakeOrderedAndProject [channel,id,sales,returns,profit]
                             HashAggregate [channel,sales,returns,profit] 
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
                               HashAggregate 
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] 
[sum(sales),sum(returns),sum(profit),sales,returns,profit,sum,isEmpty,sum,isEmpty,sum,isEmpty]
                                 InputAdapter
-                                  Exchange [channel,id] #12
-                                    WholeStageCodegen (27)
-                                      HashAggregate 
[channel,id,sales,returns,profit] 
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
-                                        InputAdapter
-                                          Union
-                                            WholeStageCodegen (18)
-                                              HashAggregate 
[s_store_id,sum,sum,sum,sum] 
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
-                                                InputAdapter
-                                                  ReusedExchange 
[s_store_id,sum,sum,sum,sum] #3
-                                            WholeStageCodegen (22)
-                                              HashAggregate 
[cp_catalog_page_id,sum,sum,sum,sum] 
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
-                                                InputAdapter
-                                                  ReusedExchange 
[cp_catalog_page_id,sum,sum,sum,sum] #6
-                                            WholeStageCodegen (26)
-                                              HashAggregate 
[web_site_id,sum,sum,sum,sum] 
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
-                                                InputAdapter
-                                                  Exchange [web_site_id] #13
-                                                    WholeStageCodegen (25)
-                                                      HashAggregate 
[web_site_id,sales_price,return_amt,profit,net_loss] 
[sum,sum,sum,sum,sum,sum,sum,sum]
-                                                        Project 
[sales_price,profit,return_amt,net_loss,web_site_id]
-                                                          BroadcastHashJoin 
[wsr_web_site_sk,web_site_sk]
-                                                            Project 
[wsr_web_site_sk,sales_price,profit,return_amt,net_loss]
-                                                              
BroadcastHashJoin [date_sk,d_date_sk]
-                                                                ColumnarToRow
-                                                                  InputAdapter
-                                                                    CometUnion
-                                                                      
CometProject [ws_web_site_sk,ws_sold_date_sk,ws_ext_sales_price,ws_net_profit] 
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-                                                                        
CometFilter [ws_web_site_sk]
-                                                                          
CometScan parquet spark_catalog.default.web_sales 
[ws_web_site_sk,ws_ext_sales_price,ws_net_profit,ws_sold_date_sk]
-                                                                            
ReusedSubquery [d_date_sk] #1
-                                                                      
CometProject [ws_web_site_sk,wr_returned_date_sk,wr_return_amt,wr_net_loss] 
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-                                                                        
CometBroadcastHashJoin [wr_item_sk,wr_order_number,ws_item_sk,ws_order_number]
-                                                                          
CometBroadcastExchange #14
-                                                                            
CometScan parquet spark_catalog.default.web_returns 
[wr_item_sk,wr_order_number,wr_return_amt,wr_net_loss,wr_returned_date_sk]
-                                                                              
ReusedSubquery [d_date_sk] #1
-                                                                          
CometProject [ws_item_sk,ws_web_site_sk,ws_order_number]
-                                                                            
CometFilter [ws_item_sk,ws_order_number,ws_web_site_sk]
-                                                                              
CometScan parquet spark_catalog.default.web_sales 
[ws_item_sk,ws_web_site_sk,ws_order_number,ws_sold_date_sk]
-                                                                InputAdapter
-                                                                  
ReusedExchange [d_date_sk] #4
-                                                            InputAdapter
-                                                              ReusedExchange 
[web_site_sk,web_site_id] #10
+                                  ReusedExchange 
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] #2
                   WholeStageCodegen (44)
                     HashAggregate [sum,isEmpty,sum,isEmpty,sum,isEmpty] 
[sum(sales),sum(returns),sum(profit),channel,id,sum(sales),sum(returns),sum(profit),sum,isEmpty,sum,isEmpty,sum,isEmpty]
                       InputAdapter
-                        Exchange #15
+                        Exchange #12
                           WholeStageCodegen (43)
                             HashAggregate [sales,returns,profit] 
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
                               HashAggregate 
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] 
[sum(sales),sum(returns),sum(profit),sales,returns,profit,sum,isEmpty,sum,isEmpty,sum,isEmpty]
                                 InputAdapter
-                                  Exchange [channel,id] #16
-                                    WholeStageCodegen (42)
-                                      HashAggregate 
[channel,id,sales,returns,profit] 
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
-                                        InputAdapter
-                                          Union
-                                            WholeStageCodegen (33)
-                                              HashAggregate 
[s_store_id,sum,sum,sum,sum] 
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
-                                                InputAdapter
-                                                  ReusedExchange 
[s_store_id,sum,sum,sum,sum] #3
-                                            WholeStageCodegen (37)
-                                              HashAggregate 
[cp_catalog_page_id,sum,sum,sum,sum] 
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
-                                                InputAdapter
-                                                  ReusedExchange 
[cp_catalog_page_id,sum,sum,sum,sum] #6
-                                            WholeStageCodegen (41)
-                                              HashAggregate 
[web_site_id,sum,sum,sum,sum] 
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
-                                                InputAdapter
-                                                  Exchange [web_site_id] #17
-                                                    WholeStageCodegen (40)
-                                                      HashAggregate 
[web_site_id,sales_price,return_amt,profit,net_loss] 
[sum,sum,sum,sum,sum,sum,sum,sum]
-                                                        Project 
[sales_price,profit,return_amt,net_loss,web_site_id]
-                                                          BroadcastHashJoin 
[wsr_web_site_sk,web_site_sk]
-                                                            Project 
[wsr_web_site_sk,sales_price,profit,return_amt,net_loss]
-                                                              
BroadcastHashJoin [date_sk,d_date_sk]
-                                                                ColumnarToRow
-                                                                  InputAdapter
-                                                                    CometUnion
-                                                                      
CometProject [ws_web_site_sk,ws_sold_date_sk,ws_ext_sales_price,ws_net_profit] 
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-                                                                        
CometFilter [ws_web_site_sk]
-                                                                          
CometScan parquet spark_catalog.default.web_sales 
[ws_web_site_sk,ws_ext_sales_price,ws_net_profit,ws_sold_date_sk]
-                                                                            
ReusedSubquery [d_date_sk] #1
-                                                                      
CometProject [ws_web_site_sk,wr_returned_date_sk,wr_return_amt,wr_net_loss] 
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-                                                                        
CometBroadcastHashJoin [wr_item_sk,wr_order_number,ws_item_sk,ws_order_number]
-                                                                          
CometBroadcastExchange #18
-                                                                            
CometScan parquet spark_catalog.default.web_returns 
[wr_item_sk,wr_order_number,wr_return_amt,wr_net_loss,wr_returned_date_sk]
-                                                                              
ReusedSubquery [d_date_sk] #1
-                                                                          
CometProject [ws_item_sk,ws_web_site_sk,ws_order_number]
-                                                                            
CometFilter [ws_item_sk,ws_order_number,ws_web_site_sk]
-                                                                              
CometScan parquet spark_catalog.default.web_sales 
[ws_item_sk,ws_web_site_sk,ws_order_number,ws_sold_date_sk]
-                                                                InputAdapter
-                                                                  
ReusedExchange [d_date_sk] #4
-                                                            InputAdapter
-                                                              ReusedExchange 
[web_site_sk,web_site_id] #10
+                                  ReusedExchange 
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] #2
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 2e144428..c5fef022 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -63,6 +63,36 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
+    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 
3.4+")
+    withSQLConf(
+      CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      withTable("td") {
+        testData
+          .withColumn("bucket", $"key" % 3)
+          .write
+          .mode(SaveMode.Overwrite)
+          .bucketBy(2, "bucket")
+          .format("parquet")
+          .saveAsTable("td")
+        val df = sql("""
+            |SELECT t1.key, t2.key, t3.key
+            |FROM td AS t1
+            |JOIN td AS t2 ON t2.key = t1.key
+            |JOIN td AS t3 ON t3.key = t2.key
+            |WHERE t1.bucket = 1 AND t2.bucket = 1 AND t3.bucket = 1
+            |""".stripMargin)
+        val reusedPlan = 
ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan)
+        val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec 
=>
+          r
+        }
+        assert(reusedExchanges.size == 1)
+        
assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec])
+      }
+    }
+  }
+
   test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
     assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 
3.4+")
     withSQLConf(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to