This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new b4c2dc23 fix: fix CometNativeExec.doCanonicalize for
ReusedExchangeExec (#447)
b4c2dc23 is described below
commit b4c2dc2367296e286d08ecbbf35a00aec9821fbd
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat May 18 22:33:40 2024 -0700
fix: fix CometNativeExec.doCanonicalize for ReusedExchangeExec (#447)
---
.../org/apache/spark/sql/comet/operators.scala | 20 +-
.../approved-plans-v2_7/q5a/explain.txt | 472 ++++-----------------
.../approved-plans-v2_7/q5a/simplified.txt | 88 +---
.../org/apache/comet/exec/CometExecSuite.scala | 30 ++
4 files changed, 125 insertions(+), 485 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 63587af3..bb17442a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -363,7 +363,11 @@ abstract class CometNativeExec extends CometExec {
}
override protected def doCanonicalize(): SparkPlan = {
- val canonicalizedPlan =
super.doCanonicalize().asInstanceOf[CometNativeExec]
+ val canonicalizedPlan = super
+ .doCanonicalize()
+ .asInstanceOf[CometNativeExec]
+ .canonicalizePlans()
+
if (serializedPlanOpt.isDefined) {
// If the plan is a boundary node, we should remove the serialized plan.
canonicalizedPlan.cleanBlock()
@@ -371,6 +375,20 @@ abstract class CometNativeExec extends CometExec {
canonicalizedPlan
}
}
+
+ /**
+ * Returns a copy of this operator in which every direct `SparkPlan` product element (constructor argument) is replaced by its canonicalized form; all other elements, including plans nested inside collections, are kept unchanged.
+ */
+ protected def canonicalizePlans(): CometNativeExec = {
+ def transform(arg: Any): AnyRef = arg match {
+ case sparkPlan: SparkPlan => sparkPlan.canonicalized
+ case other: AnyRef => other
+ case null => null // reachable: a typed pattern such as `other: AnyRef` never matches null
+ }
+
+ val newArgs = mapProductIterator(transform)
+ makeCopy(newArgs).asInstanceOf[CometNativeExec]
+ }
}
abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
index 2345d02e..2b45472f 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
@@ -1,9 +1,9 @@
== Physical Plan ==
-TakeOrderedAndProject (137)
-+- * HashAggregate (136)
- +- Exchange (135)
- +- * HashAggregate (134)
- +- Union (133)
+TakeOrderedAndProject (83)
++- * HashAggregate (82)
+ +- Exchange (81)
+ +- * HashAggregate (80)
+ +- Union (79)
:- * HashAggregate (68)
: +- Exchange (67)
: +- * HashAggregate (66)
@@ -72,70 +72,16 @@ TakeOrderedAndProject (137)
: +- * ColumnarToRow (58)
: +- CometFilter (57)
: +- CometScan parquet
spark_catalog.default.web_site (56)
- :- * HashAggregate (100)
- : +- Exchange (99)
- : +- * HashAggregate (98)
- : +- * HashAggregate (97)
- : +- Exchange (96)
- : +- * HashAggregate (95)
- : +- Union (94)
- : :- * HashAggregate (70)
- : : +- ReusedExchange (69)
- : :- * HashAggregate (72)
- : : +- ReusedExchange (71)
- : +- * HashAggregate (93)
- : +- Exchange (92)
- : +- * HashAggregate (91)
- : +- * Project (90)
- : +- * BroadcastHashJoin Inner
BuildRight (89)
- : :- * Project (87)
- : : +- * BroadcastHashJoin
Inner BuildRight (86)
- : : :- * ColumnarToRow (84)
- : : : +- CometUnion (83)
- : : : :- CometProject
(75)
- : : : : +- CometFilter
(74)
- : : : : +- CometScan
parquet spark_catalog.default.web_sales (73)
- : : : +- CometProject
(82)
- : : : +-
CometBroadcastHashJoin (81)
- : : : :-
CometBroadcastExchange (77)
- : : : : +-
CometScan parquet spark_catalog.default.web_returns (76)
- : : : +-
CometProject (80)
- : : : +-
CometFilter (79)
- : : : +-
CometScan parquet spark_catalog.default.web_sales (78)
- : : +- ReusedExchange (85)
- : +- ReusedExchange (88)
- +- * HashAggregate (132)
- +- Exchange (131)
- +- * HashAggregate (130)
- +- * HashAggregate (129)
- +- Exchange (128)
- +- * HashAggregate (127)
- +- Union (126)
- :- * HashAggregate (102)
- : +- ReusedExchange (101)
- :- * HashAggregate (104)
- : +- ReusedExchange (103)
- +- * HashAggregate (125)
- +- Exchange (124)
- +- * HashAggregate (123)
- +- * Project (122)
- +- * BroadcastHashJoin Inner
BuildRight (121)
- :- * Project (119)
- : +- * BroadcastHashJoin
Inner BuildRight (118)
- : :- * ColumnarToRow (116)
- : : +- CometUnion (115)
- : : :- CometProject
(107)
- : : : +- CometFilter
(106)
- : : : +- CometScan
parquet spark_catalog.default.web_sales (105)
- : : +- CometProject
(114)
- : : +-
CometBroadcastHashJoin (113)
- : : :-
CometBroadcastExchange (109)
- : : : +-
CometScan parquet spark_catalog.default.web_returns (108)
- : : +-
CometProject (112)
- : : +-
CometFilter (111)
- : : +-
CometScan parquet spark_catalog.default.web_sales (110)
- : +- ReusedExchange (117)
- +- ReusedExchange (120)
+ :- * HashAggregate (73)
+ : +- Exchange (72)
+ : +- * HashAggregate (71)
+ : +- * HashAggregate (70)
+ : +- ReusedExchange (69)
+ +- * HashAggregate (78)
+ +- Exchange (77)
+ +- * HashAggregate (76)
+ +- * HashAggregate (75)
+ +- ReusedExchange (74)
(1) Scan parquet spark_catalog.default.store_sales
@@ -177,7 +123,7 @@ Child 1 Input [6]: [store_sk#16, date_sk#17,
sales_price#18, profit#19, return_a
(8) ColumnarToRow [codegen id : 3]
Input [6]: [store_sk#6, date_sk#7, sales_price#8, profit#9, return_amt#10,
net_loss#11]
-(9) ReusedExchange [Reuses operator id: 142]
+(9) ReusedExchange [Reuses operator id: 88]
Output [1]: [d_date_sk#22]
(10) BroadcastHashJoin [codegen id : 3]
@@ -275,7 +221,7 @@ Child 1 Input [6]: [page_sk#57, date_sk#58, sales_price#59,
profit#60, return_am
(28) ColumnarToRow [codegen id : 7]
Input [6]: [page_sk#47, date_sk#48, sales_price#49, profit#50, return_amt#51,
net_loss#52]
-(29) ReusedExchange [Reuses operator id: 142]
+(29) ReusedExchange [Reuses operator id: 88]
Output [1]: [d_date_sk#63]
(30) BroadcastHashJoin [codegen id : 7]
@@ -392,7 +338,7 @@ Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104,
sales_price#105, profit#10
(52) ColumnarToRow [codegen id : 11]
Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93]
-(53) ReusedExchange [Reuses operator id: 142]
+(53) ReusedExchange [Reuses operator id: 88]
Output [1]: [d_date_sk#109]
(54) BroadcastHashJoin [codegen id : 11]
@@ -471,381 +417,117 @@ Functions [3]: [sum(sales#39), sum(returns#40),
sum(profit#41)]
Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142,
sum(profit#41)#143]
Results [5]: [channel#37, id#38, cast(sum(sales#39)#141 as decimal(37,2)) AS
sales#144, cast(sum(returns#40)#142 as decimal(37,2)) AS returns#145,
cast(sum(profit#41)#143 as decimal(38,2)) AS profit#146]
-(69) ReusedExchange [Reuses operator id: 19]
-Output [5]: [s_store_id#24, sum#147, sum#148, sum#149, sum#150]
+(69) ReusedExchange [Reuses operator id: 67]
+Output [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
-(70) HashAggregate [codegen id : 18]
-Input [5]: [s_store_id#24, sum#147, sum#148, sum#149, sum#150]
-Keys [1]: [s_store_id#24]
-Functions [4]: [sum(UnscaledValue(sales_price#8)),
sum(UnscaledValue(return_amt#10)), sum(UnscaledValue(profit#9)),
sum(UnscaledValue(net_loss#11))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#8))#33,
sum(UnscaledValue(return_amt#10))#34, sum(UnscaledValue(profit#9))#35,
sum(UnscaledValue(net_loss#11))#36]
-Results [5]: [store channel AS channel#37, concat(store, s_store_id#24) AS
id#38, MakeDecimal(sum(UnscaledValue(sales_price#8))#33,17,2) AS sales#39,
MakeDecimal(sum(UnscaledValue(return_amt#10))#34,17,2) AS returns#40,
(MakeDecimal(sum(UnscaledValue(profit#9))#35,17,2) -
MakeDecimal(sum(UnscaledValue(net_loss#11))#36,17,2)) AS profit#41]
-
-(71) ReusedExchange [Reuses operator id: 39]
-Output [5]: [cp_catalog_page_id#65, sum#151, sum#152, sum#153, sum#154]
-
-(72) HashAggregate [codegen id : 22]
-Input [5]: [cp_catalog_page_id#65, sum#151, sum#152, sum#153, sum#154]
-Keys [1]: [cp_catalog_page_id#65]
-Functions [4]: [sum(UnscaledValue(sales_price#49)),
sum(UnscaledValue(return_amt#51)), sum(UnscaledValue(profit#50)),
sum(UnscaledValue(net_loss#52))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#49))#74,
sum(UnscaledValue(return_amt#51))#75, sum(UnscaledValue(profit#50))#76,
sum(UnscaledValue(net_loss#52))#77]
-Results [5]: [catalog channel AS channel#78, concat(catalog_page,
cp_catalog_page_id#65) AS id#79,
MakeDecimal(sum(UnscaledValue(sales_price#49))#74,17,2) AS sales#80,
MakeDecimal(sum(UnscaledValue(return_amt#51))#75,17,2) AS returns#81,
(MakeDecimal(sum(UnscaledValue(profit#50))#76,17,2) -
MakeDecimal(sum(UnscaledValue(net_loss#52))#77,17,2)) AS profit#82]
-
-(73) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85,
ws_sold_date_sk#86]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(ws_sold_date_sk#86),
dynamicpruningexpression(ws_sold_date_sk#86 IN dynamicpruning#155)]
-PushedFilters: [IsNotNull(ws_web_site_sk)]
-ReadSchema:
struct<ws_web_site_sk:int,ws_ext_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
-
-(74) CometFilter
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85,
ws_sold_date_sk#86]
-Condition : isnotnull(ws_web_site_sk#83)
-
-(75) CometProject
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85,
ws_sold_date_sk#86]
-Arguments: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93], [ws_web_site_sk#83 AS wsr_web_site_sk#88,
ws_sold_date_sk#86 AS date_sk#89, ws_ext_sales_price#84 AS sales_price#90,
ws_net_profit#85 AS profit#91, 0.00 AS return_amt#92, 0.00 AS net_loss#93]
-
-(76) Scan parquet spark_catalog.default.web_returns
-Output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(wr_returned_date_sk#98),
dynamicpruningexpression(wr_returned_date_sk#98 IN dynamicpruning#155)]
-ReadSchema:
struct<wr_item_sk:int,wr_order_number:int,wr_return_amt:decimal(7,2),wr_net_loss:decimal(7,2)>
-
-(77) CometBroadcastExchange
-Input [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-Arguments: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-
-(78) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101,
ws_sold_date_sk#102]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/web_sales]
-PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number),
IsNotNull(ws_web_site_sk)]
-ReadSchema: struct<ws_item_sk:int,ws_web_site_sk:int,ws_order_number:int>
-
-(79) CometFilter
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101,
ws_sold_date_sk#102]
-Condition : ((isnotnull(ws_item_sk#99) AND isnotnull(ws_order_number#101)) AND
isnotnull(ws_web_site_sk#100))
-
-(80) CometProject
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101,
ws_sold_date_sk#102]
-Arguments: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101],
[ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-
-(81) CometBroadcastHashJoin
-Left output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-Right output [3]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-Arguments: [wr_item_sk#94, wr_order_number#95], [ws_item_sk#99,
ws_order_number#101], Inner
-
-(82) CometProject
-Input [8]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98, ws_item_sk#99, ws_web_site_sk#100,
ws_order_number#101]
-Arguments: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106,
return_amt#107, net_loss#108], [ws_web_site_sk#100 AS wsr_web_site_sk#103,
wr_returned_date_sk#98 AS date_sk#104, 0.00 AS sales_price#105, 0.00 AS
profit#106, wr_return_amt#96 AS return_amt#107, wr_net_loss#97 AS net_loss#108]
-
-(83) CometUnion
-Child 0 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93]
-Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105,
profit#106, return_amt#107, net_loss#108]
-
-(84) ColumnarToRow [codegen id : 25]
-Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93]
-
-(85) ReusedExchange [Reuses operator id: 142]
-Output [1]: [d_date_sk#109]
-
-(86) BroadcastHashJoin [codegen id : 25]
-Left keys [1]: [date_sk#89]
-Right keys [1]: [d_date_sk#109]
-Join type: Inner
-Join condition: None
-
-(87) Project [codegen id : 25]
-Output [5]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92,
net_loss#93]
-Input [7]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93, d_date_sk#109]
-
-(88) ReusedExchange [Reuses operator id: 59]
-Output [2]: [web_site_sk#110, web_site_id#111]
-
-(89) BroadcastHashJoin [codegen id : 25]
-Left keys [1]: [wsr_web_site_sk#88]
-Right keys [1]: [web_site_sk#110]
-Join type: Inner
-Join condition: None
-
-(90) Project [codegen id : 25]
-Output [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93,
web_site_id#111]
-Input [7]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92,
net_loss#93, web_site_sk#110, web_site_id#111]
-
-(91) HashAggregate [codegen id : 25]
-Input [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93,
web_site_id#111]
-Keys [1]: [web_site_id#111]
-Functions [4]: [partial_sum(UnscaledValue(sales_price#90)),
partial_sum(UnscaledValue(return_amt#92)),
partial_sum(UnscaledValue(profit#91)), partial_sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum#156, sum#157, sum#158, sum#159]
-Results [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163]
-
-(92) Exchange
-Input [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163]
-Arguments: hashpartitioning(web_site_id#111, 5), ENSURE_REQUIREMENTS,
[plan_id=8]
-
-(93) HashAggregate [codegen id : 26]
-Input [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163]
-Keys [1]: [web_site_id#111]
-Functions [4]: [sum(UnscaledValue(sales_price#90)),
sum(UnscaledValue(return_amt#92)), sum(UnscaledValue(profit#91)),
sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#90))#120,
sum(UnscaledValue(return_amt#92))#121, sum(UnscaledValue(profit#91))#122,
sum(UnscaledValue(net_loss#93))#123]
-Results [5]: [web channel AS channel#124, concat(web_site, web_site_id#111) AS
id#125, MakeDecimal(sum(UnscaledValue(sales_price#90))#120,17,2) AS sales#126,
MakeDecimal(sum(UnscaledValue(return_amt#92))#121,17,2) AS returns#127,
(MakeDecimal(sum(UnscaledValue(profit#91))#122,17,2) -
MakeDecimal(sum(UnscaledValue(net_loss#93))#123,17,2)) AS profit#128]
-
-(94) Union
-
-(95) HashAggregate [codegen id : 27]
-Input [5]: [channel#37, id#38, sales#39, returns#40, profit#41]
-Keys [2]: [channel#37, id#38]
-Functions [3]: [partial_sum(sales#39), partial_sum(returns#40),
partial_sum(profit#41)]
-Aggregate Attributes [6]: [sum#129, isEmpty#130, sum#131, isEmpty#132,
sum#133, isEmpty#134]
-Results [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
-
-(96) Exchange
-Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
-Arguments: hashpartitioning(channel#37, id#38, 5), ENSURE_REQUIREMENTS,
[plan_id=9]
-
-(97) HashAggregate [codegen id : 28]
+(70) HashAggregate [codegen id : 28]
Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
Keys [2]: [channel#37, id#38]
Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)]
Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142,
sum(profit#41)#143]
-Results [4]: [channel#37, sum(sales#39)#141 AS sales#164, sum(returns#40)#142
AS returns#165, sum(profit#41)#143 AS profit#166]
+Results [4]: [channel#37, sum(sales#39)#141 AS sales#147, sum(returns#40)#142
AS returns#148, sum(profit#41)#143 AS profit#149]
-(98) HashAggregate [codegen id : 28]
-Input [4]: [channel#37, sales#164, returns#165, profit#166]
+(71) HashAggregate [codegen id : 28]
+Input [4]: [channel#37, sales#147, returns#148, profit#149]
Keys [1]: [channel#37]
-Functions [3]: [partial_sum(sales#164), partial_sum(returns#165),
partial_sum(profit#166)]
-Aggregate Attributes [6]: [sum#167, isEmpty#168, sum#169, isEmpty#170,
sum#171, isEmpty#172]
-Results [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177,
isEmpty#178]
+Functions [3]: [partial_sum(sales#147), partial_sum(returns#148),
partial_sum(profit#149)]
+Aggregate Attributes [6]: [sum#150, isEmpty#151, sum#152, isEmpty#153,
sum#154, isEmpty#155]
+Results [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160,
isEmpty#161]
-(99) Exchange
-Input [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177,
isEmpty#178]
-Arguments: hashpartitioning(channel#37, 5), ENSURE_REQUIREMENTS, [plan_id=10]
+(72) Exchange
+Input [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160,
isEmpty#161]
+Arguments: hashpartitioning(channel#37, 5), ENSURE_REQUIREMENTS, [plan_id=8]
-(100) HashAggregate [codegen id : 29]
-Input [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177,
isEmpty#178]
+(73) HashAggregate [codegen id : 29]
+Input [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160,
isEmpty#161]
Keys [1]: [channel#37]
-Functions [3]: [sum(sales#164), sum(returns#165), sum(profit#166)]
-Aggregate Attributes [3]: [sum(sales#164)#179, sum(returns#165)#180,
sum(profit#166)#181]
-Results [5]: [channel#37, null AS id#182, sum(sales#164)#179 AS
sum(sales)#183, sum(returns#165)#180 AS sum(returns)#184, sum(profit#166)#181
AS sum(profit)#185]
-
-(101) ReusedExchange [Reuses operator id: 19]
-Output [5]: [s_store_id#24, sum#186, sum#187, sum#188, sum#189]
-
-(102) HashAggregate [codegen id : 33]
-Input [5]: [s_store_id#24, sum#186, sum#187, sum#188, sum#189]
-Keys [1]: [s_store_id#24]
-Functions [4]: [sum(UnscaledValue(sales_price#8)),
sum(UnscaledValue(return_amt#10)), sum(UnscaledValue(profit#9)),
sum(UnscaledValue(net_loss#11))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#8))#33,
sum(UnscaledValue(return_amt#10))#34, sum(UnscaledValue(profit#9))#35,
sum(UnscaledValue(net_loss#11))#36]
-Results [5]: [store channel AS channel#37, concat(store, s_store_id#24) AS
id#38, MakeDecimal(sum(UnscaledValue(sales_price#8))#33,17,2) AS sales#39,
MakeDecimal(sum(UnscaledValue(return_amt#10))#34,17,2) AS returns#40,
(MakeDecimal(sum(UnscaledValue(profit#9))#35,17,2) -
MakeDecimal(sum(UnscaledValue(net_loss#11))#36,17,2)) AS profit#41]
-
-(103) ReusedExchange [Reuses operator id: 39]
-Output [5]: [cp_catalog_page_id#65, sum#190, sum#191, sum#192, sum#193]
-
-(104) HashAggregate [codegen id : 37]
-Input [5]: [cp_catalog_page_id#65, sum#190, sum#191, sum#192, sum#193]
-Keys [1]: [cp_catalog_page_id#65]
-Functions [4]: [sum(UnscaledValue(sales_price#49)),
sum(UnscaledValue(return_amt#51)), sum(UnscaledValue(profit#50)),
sum(UnscaledValue(net_loss#52))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#49))#74,
sum(UnscaledValue(return_amt#51))#75, sum(UnscaledValue(profit#50))#76,
sum(UnscaledValue(net_loss#52))#77]
-Results [5]: [catalog channel AS channel#78, concat(catalog_page,
cp_catalog_page_id#65) AS id#79,
MakeDecimal(sum(UnscaledValue(sales_price#49))#74,17,2) AS sales#80,
MakeDecimal(sum(UnscaledValue(return_amt#51))#75,17,2) AS returns#81,
(MakeDecimal(sum(UnscaledValue(profit#50))#76,17,2) -
MakeDecimal(sum(UnscaledValue(net_loss#52))#77,17,2)) AS profit#82]
-
-(105) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85,
ws_sold_date_sk#86]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(ws_sold_date_sk#86),
dynamicpruningexpression(ws_sold_date_sk#86 IN dynamicpruning#194)]
-PushedFilters: [IsNotNull(ws_web_site_sk)]
-ReadSchema:
struct<ws_web_site_sk:int,ws_ext_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
-
-(106) CometFilter
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85,
ws_sold_date_sk#86]
-Condition : isnotnull(ws_web_site_sk#83)
-
-(107) CometProject
-Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85,
ws_sold_date_sk#86]
-Arguments: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93], [ws_web_site_sk#83 AS wsr_web_site_sk#88,
ws_sold_date_sk#86 AS date_sk#89, ws_ext_sales_price#84 AS sales_price#90,
ws_net_profit#85 AS profit#91, 0.00 AS return_amt#92, 0.00 AS net_loss#93]
-
-(108) Scan parquet spark_catalog.default.web_returns
-Output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-Batched: true
-Location: InMemoryFileIndex []
-PartitionFilters: [isnotnull(wr_returned_date_sk#98),
dynamicpruningexpression(wr_returned_date_sk#98 IN dynamicpruning#194)]
-ReadSchema:
struct<wr_item_sk:int,wr_order_number:int,wr_return_amt:decimal(7,2),wr_net_loss:decimal(7,2)>
-
-(109) CometBroadcastExchange
-Input [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-Arguments: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-
-(110) Scan parquet spark_catalog.default.web_sales
-Output [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101,
ws_sold_date_sk#102]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/web_sales]
-PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number),
IsNotNull(ws_web_site_sk)]
-ReadSchema: struct<ws_item_sk:int,ws_web_site_sk:int,ws_order_number:int>
-
-(111) CometFilter
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101,
ws_sold_date_sk#102]
-Condition : ((isnotnull(ws_item_sk#99) AND isnotnull(ws_order_number#101)) AND
isnotnull(ws_web_site_sk#100))
-
-(112) CometProject
-Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101,
ws_sold_date_sk#102]
-Arguments: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101],
[ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-
-(113) CometBroadcastHashJoin
-Left output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98]
-Right output [3]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101]
-Arguments: [wr_item_sk#94, wr_order_number#95], [ws_item_sk#99,
ws_order_number#101], Inner
-
-(114) CometProject
-Input [8]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96,
wr_net_loss#97, wr_returned_date_sk#98, ws_item_sk#99, ws_web_site_sk#100,
ws_order_number#101]
-Arguments: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106,
return_amt#107, net_loss#108], [ws_web_site_sk#100 AS wsr_web_site_sk#103,
wr_returned_date_sk#98 AS date_sk#104, 0.00 AS sales_price#105, 0.00 AS
profit#106, wr_return_amt#96 AS return_amt#107, wr_net_loss#97 AS net_loss#108]
-
-(115) CometUnion
-Child 0 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93]
-Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105,
profit#106, return_amt#107, net_loss#108]
-
-(116) ColumnarToRow [codegen id : 40]
-Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93]
-
-(117) ReusedExchange [Reuses operator id: 142]
-Output [1]: [d_date_sk#109]
-
-(118) BroadcastHashJoin [codegen id : 40]
-Left keys [1]: [date_sk#89]
-Right keys [1]: [d_date_sk#109]
-Join type: Inner
-Join condition: None
-
-(119) Project [codegen id : 40]
-Output [5]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92,
net_loss#93]
-Input [7]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91,
return_amt#92, net_loss#93, d_date_sk#109]
-
-(120) ReusedExchange [Reuses operator id: 59]
-Output [2]: [web_site_sk#110, web_site_id#111]
-
-(121) BroadcastHashJoin [codegen id : 40]
-Left keys [1]: [wsr_web_site_sk#88]
-Right keys [1]: [web_site_sk#110]
-Join type: Inner
-Join condition: None
-
-(122) Project [codegen id : 40]
-Output [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93,
web_site_id#111]
-Input [7]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92,
net_loss#93, web_site_sk#110, web_site_id#111]
-
-(123) HashAggregate [codegen id : 40]
-Input [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93,
web_site_id#111]
-Keys [1]: [web_site_id#111]
-Functions [4]: [partial_sum(UnscaledValue(sales_price#90)),
partial_sum(UnscaledValue(return_amt#92)),
partial_sum(UnscaledValue(profit#91)), partial_sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum#195, sum#196, sum#197, sum#198]
-Results [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202]
-
-(124) Exchange
-Input [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202]
-Arguments: hashpartitioning(web_site_id#111, 5), ENSURE_REQUIREMENTS,
[plan_id=11]
-
-(125) HashAggregate [codegen id : 41]
-Input [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202]
-Keys [1]: [web_site_id#111]
-Functions [4]: [sum(UnscaledValue(sales_price#90)),
sum(UnscaledValue(return_amt#92)), sum(UnscaledValue(profit#91)),
sum(UnscaledValue(net_loss#93))]
-Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#90))#120,
sum(UnscaledValue(return_amt#92))#121, sum(UnscaledValue(profit#91))#122,
sum(UnscaledValue(net_loss#93))#123]
-Results [5]: [web channel AS channel#124, concat(web_site, web_site_id#111) AS
id#125, MakeDecimal(sum(UnscaledValue(sales_price#90))#120,17,2) AS sales#126,
MakeDecimal(sum(UnscaledValue(return_amt#92))#121,17,2) AS returns#127,
(MakeDecimal(sum(UnscaledValue(profit#91))#122,17,2) -
MakeDecimal(sum(UnscaledValue(net_loss#93))#123,17,2)) AS profit#128]
+Functions [3]: [sum(sales#147), sum(returns#148), sum(profit#149)]
+Aggregate Attributes [3]: [sum(sales#147)#162, sum(returns#148)#163,
sum(profit#149)#164]
+Results [5]: [channel#37, null AS id#165, sum(sales#147)#162 AS
sum(sales)#166, sum(returns#148)#163 AS sum(returns)#167, sum(profit#149)#164
AS sum(profit)#168]
-(126) Union
+(74) ReusedExchange [Reuses operator id: 67]
+Output [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
-(127) HashAggregate [codegen id : 42]
-Input [5]: [channel#37, id#38, sales#39, returns#40, profit#41]
-Keys [2]: [channel#37, id#38]
-Functions [3]: [partial_sum(sales#39), partial_sum(returns#40),
partial_sum(profit#41)]
-Aggregate Attributes [6]: [sum#129, isEmpty#130, sum#131, isEmpty#132,
sum#133, isEmpty#134]
-Results [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
-
-(128) Exchange
-Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
-Arguments: hashpartitioning(channel#37, id#38, 5), ENSURE_REQUIREMENTS,
[plan_id=12]
-
-(129) HashAggregate [codegen id : 43]
+(75) HashAggregate [codegen id : 43]
Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138,
sum#139, isEmpty#140]
Keys [2]: [channel#37, id#38]
Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)]
Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142,
sum(profit#41)#143]
-Results [3]: [sum(sales#39)#141 AS sales#164, sum(returns#40)#142 AS
returns#165, sum(profit#41)#143 AS profit#166]
+Results [3]: [sum(sales#39)#141 AS sales#147, sum(returns#40)#142 AS
returns#148, sum(profit#41)#143 AS profit#149]
-(130) HashAggregate [codegen id : 43]
-Input [3]: [sales#164, returns#165, profit#166]
+(76) HashAggregate [codegen id : 43]
+Input [3]: [sales#147, returns#148, profit#149]
Keys: []
-Functions [3]: [partial_sum(sales#164), partial_sum(returns#165),
partial_sum(profit#166)]
-Aggregate Attributes [6]: [sum#203, isEmpty#204, sum#205, isEmpty#206,
sum#207, isEmpty#208]
-Results [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214]
+Functions [3]: [partial_sum(sales#147), partial_sum(returns#148),
partial_sum(profit#149)]
+Aggregate Attributes [6]: [sum#169, isEmpty#170, sum#171, isEmpty#172,
sum#173, isEmpty#174]
+Results [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180]
-(131) Exchange
-Input [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214]
-Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=13]
+(77) Exchange
+Input [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9]
-(132) HashAggregate [codegen id : 44]
-Input [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214]
+(78) HashAggregate [codegen id : 44]
+Input [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180]
Keys: []
-Functions [3]: [sum(sales#164), sum(returns#165), sum(profit#166)]
-Aggregate Attributes [3]: [sum(sales#164)#215, sum(returns#165)#216,
sum(profit#166)#217]
-Results [5]: [null AS channel#218, null AS id#219, sum(sales#164)#215 AS
sum(sales)#220, sum(returns#165)#216 AS sum(returns)#221, sum(profit#166)#217
AS sum(profit)#222]
+Functions [3]: [sum(sales#147), sum(returns#148), sum(profit#149)]
+Aggregate Attributes [3]: [sum(sales#147)#181, sum(returns#148)#182,
sum(profit#149)#183]
+Results [5]: [null AS channel#184, null AS id#185, sum(sales#147)#181 AS
sum(sales)#186, sum(returns#148)#182 AS sum(returns)#187, sum(profit#149)#183
AS sum(profit)#188]
-(133) Union
+(79) Union
-(134) HashAggregate [codegen id : 45]
+(80) HashAggregate [codegen id : 45]
Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
Keys [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
Functions: []
Aggregate Attributes: []
Results [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
-(135) Exchange
+(81) Exchange
Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
-Arguments: hashpartitioning(channel#37, id#38, sales#144, returns#145,
profit#146, 5), ENSURE_REQUIREMENTS, [plan_id=14]
+Arguments: hashpartitioning(channel#37, id#38, sales#144, returns#145,
profit#146, 5), ENSURE_REQUIREMENTS, [plan_id=10]
-(136) HashAggregate [codegen id : 46]
+(82) HashAggregate [codegen id : 46]
Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
Keys [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
Functions: []
Aggregate Attributes: []
Results [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
-(137) TakeOrderedAndProject
+(83) TakeOrderedAndProject
Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146]
Arguments: 100, [channel#37 ASC NULLS FIRST, id#38 ASC NULLS FIRST],
[channel#37, id#38, sales#144, returns#145, profit#146]
===== Subqueries =====
Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#4 IN
dynamicpruning#5
-BroadcastExchange (142)
-+- * ColumnarToRow (141)
- +- CometProject (140)
- +- CometFilter (139)
- +- CometScan parquet spark_catalog.default.date_dim (138)
+BroadcastExchange (88)
++- * ColumnarToRow (87)
+ +- CometProject (86)
+ +- CometFilter (85)
+ +- CometScan parquet spark_catalog.default.date_dim (84)
-(138) Scan parquet spark_catalog.default.date_dim
-Output [2]: [d_date_sk#22, d_date#223]
+(84) Scan parquet spark_catalog.default.date_dim
+Output [2]: [d_date_sk#22, d_date#189]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_date), GreaterThanOrEqual(d_date,1998-08-04),
LessThanOrEqual(d_date,1998-08-18), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_date:date>
-(139) CometFilter
-Input [2]: [d_date_sk#22, d_date#223]
-Condition : (((isnotnull(d_date#223) AND (d_date#223 >= 1998-08-04)) AND
(d_date#223 <= 1998-08-18)) AND isnotnull(d_date_sk#22))
+(85) CometFilter
+Input [2]: [d_date_sk#22, d_date#189]
+Condition : (((isnotnull(d_date#189) AND (d_date#189 >= 1998-08-04)) AND
(d_date#189 <= 1998-08-18)) AND isnotnull(d_date_sk#22))
-(140) CometProject
-Input [2]: [d_date_sk#22, d_date#223]
+(86) CometProject
+Input [2]: [d_date_sk#22, d_date#189]
Arguments: [d_date_sk#22], [d_date_sk#22]
-(141) ColumnarToRow [codegen id : 1]
+(87) ColumnarToRow [codegen id : 1]
Input [1]: [d_date_sk#22]
-(142) BroadcastExchange
+(88) BroadcastExchange
Input [1]: [d_date_sk#22]
-Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as
bigint)),false), [plan_id=15]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as
bigint)),false), [plan_id=11]
Subquery:2 Hosting operator id = 4 Hosting Expression = sr_returned_date_sk#15
IN dynamicpruning#5
@@ -857,12 +539,4 @@ Subquery:5 Hosting operator id = 41 Hosting Expression =
ws_sold_date_sk#86 IN d
Subquery:6 Hosting operator id = 44 Hosting Expression =
wr_returned_date_sk#98 IN dynamicpruning#5
-Subquery:7 Hosting operator id = 73 Hosting Expression = ws_sold_date_sk#86 IN
dynamicpruning#5
-
-Subquery:8 Hosting operator id = 76 Hosting Expression =
wr_returned_date_sk#98 IN dynamicpruning#5
-
-Subquery:9 Hosting operator id = 105 Hosting Expression = ws_sold_date_sk#86
IN dynamicpruning#5
-
-Subquery:10 Hosting operator id = 108 Hosting Expression =
wr_returned_date_sk#98 IN dynamicpruning#5
-
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
index ff62cb5c..aaec304f 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt
@@ -124,95 +124,13 @@ TakeOrderedAndProject [channel,id,sales,returns,profit]
HashAggregate [channel,sales,returns,profit]
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
HashAggregate
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty]
[sum(sales),sum(returns),sum(profit),sales,returns,profit,sum,isEmpty,sum,isEmpty,sum,isEmpty]
InputAdapter
- Exchange [channel,id] #12
- WholeStageCodegen (27)
- HashAggregate
[channel,id,sales,returns,profit]
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
- InputAdapter
- Union
- WholeStageCodegen (18)
- HashAggregate
[s_store_id,sum,sum,sum,sum]
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
- InputAdapter
- ReusedExchange
[s_store_id,sum,sum,sum,sum] #3
- WholeStageCodegen (22)
- HashAggregate
[cp_catalog_page_id,sum,sum,sum,sum]
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
- InputAdapter
- ReusedExchange
[cp_catalog_page_id,sum,sum,sum,sum] #6
- WholeStageCodegen (26)
- HashAggregate
[web_site_id,sum,sum,sum,sum]
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
- InputAdapter
- Exchange [web_site_id] #13
- WholeStageCodegen (25)
- HashAggregate
[web_site_id,sales_price,return_amt,profit,net_loss]
[sum,sum,sum,sum,sum,sum,sum,sum]
- Project
[sales_price,profit,return_amt,net_loss,web_site_id]
- BroadcastHashJoin
[wsr_web_site_sk,web_site_sk]
- Project
[wsr_web_site_sk,sales_price,profit,return_amt,net_loss]
-
BroadcastHashJoin [date_sk,d_date_sk]
- ColumnarToRow
- InputAdapter
- CometUnion
-
CometProject [ws_web_site_sk,ws_sold_date_sk,ws_ext_sales_price,ws_net_profit]
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-
CometFilter [ws_web_site_sk]
-
CometScan parquet spark_catalog.default.web_sales
[ws_web_site_sk,ws_ext_sales_price,ws_net_profit,ws_sold_date_sk]
-
ReusedSubquery [d_date_sk] #1
-
CometProject [ws_web_site_sk,wr_returned_date_sk,wr_return_amt,wr_net_loss]
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-
CometBroadcastHashJoin [wr_item_sk,wr_order_number,ws_item_sk,ws_order_number]
-
CometBroadcastExchange #14
-
CometScan parquet spark_catalog.default.web_returns
[wr_item_sk,wr_order_number,wr_return_amt,wr_net_loss,wr_returned_date_sk]
-
ReusedSubquery [d_date_sk] #1
-
CometProject [ws_item_sk,ws_web_site_sk,ws_order_number]
-
CometFilter [ws_item_sk,ws_order_number,ws_web_site_sk]
-
CometScan parquet spark_catalog.default.web_sales
[ws_item_sk,ws_web_site_sk,ws_order_number,ws_sold_date_sk]
- InputAdapter
-
ReusedExchange [d_date_sk] #4
- InputAdapter
- ReusedExchange
[web_site_sk,web_site_id] #10
+ ReusedExchange
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] #2
WholeStageCodegen (44)
HashAggregate [sum,isEmpty,sum,isEmpty,sum,isEmpty]
[sum(sales),sum(returns),sum(profit),channel,id,sum(sales),sum(returns),sum(profit),sum,isEmpty,sum,isEmpty,sum,isEmpty]
InputAdapter
- Exchange #15
+ Exchange #12
WholeStageCodegen (43)
HashAggregate [sales,returns,profit]
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
HashAggregate
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty]
[sum(sales),sum(returns),sum(profit),sales,returns,profit,sum,isEmpty,sum,isEmpty,sum,isEmpty]
InputAdapter
- Exchange [channel,id] #16
- WholeStageCodegen (42)
- HashAggregate
[channel,id,sales,returns,profit]
[sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty]
- InputAdapter
- Union
- WholeStageCodegen (33)
- HashAggregate
[s_store_id,sum,sum,sum,sum]
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
- InputAdapter
- ReusedExchange
[s_store_id,sum,sum,sum,sum] #3
- WholeStageCodegen (37)
- HashAggregate
[cp_catalog_page_id,sum,sum,sum,sum]
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
- InputAdapter
- ReusedExchange
[cp_catalog_page_id,sum,sum,sum,sum] #6
- WholeStageCodegen (41)
- HashAggregate
[web_site_id,sum,sum,sum,sum]
[sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum]
- InputAdapter
- Exchange [web_site_id] #17
- WholeStageCodegen (40)
- HashAggregate
[web_site_id,sales_price,return_amt,profit,net_loss]
[sum,sum,sum,sum,sum,sum,sum,sum]
- Project
[sales_price,profit,return_amt,net_loss,web_site_id]
- BroadcastHashJoin
[wsr_web_site_sk,web_site_sk]
- Project
[wsr_web_site_sk,sales_price,profit,return_amt,net_loss]
-
BroadcastHashJoin [date_sk,d_date_sk]
- ColumnarToRow
- InputAdapter
- CometUnion
-
CometProject [ws_web_site_sk,ws_sold_date_sk,ws_ext_sales_price,ws_net_profit]
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-
CometFilter [ws_web_site_sk]
-
CometScan parquet spark_catalog.default.web_sales
[ws_web_site_sk,ws_ext_sales_price,ws_net_profit,ws_sold_date_sk]
-
ReusedSubquery [d_date_sk] #1
-
CometProject [ws_web_site_sk,wr_returned_date_sk,wr_return_amt,wr_net_loss]
[wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss]
-
CometBroadcastHashJoin [wr_item_sk,wr_order_number,ws_item_sk,ws_order_number]
-
CometBroadcastExchange #18
-
CometScan parquet spark_catalog.default.web_returns
[wr_item_sk,wr_order_number,wr_return_amt,wr_net_loss,wr_returned_date_sk]
-
ReusedSubquery [d_date_sk] #1
-
CometProject [ws_item_sk,ws_web_site_sk,ws_order_number]
-
CometFilter [ws_item_sk,ws_order_number,ws_web_site_sk]
-
CometScan parquet spark_catalog.default.web_sales
[ws_item_sk,ws_web_site_sk,ws_order_number,ws_sold_date_sk]
- InputAdapter
-
ReusedExchange [d_date_sk] #4
- InputAdapter
- ReusedExchange
[web_site_sk,web_site_id] #10
+ ReusedExchange
[channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] #2
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 2e144428..c5fef022 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -63,6 +63,36 @@ class CometExecSuite extends CometTestBase {
}
}
+ test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
+ assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark
3.4+")
+ withSQLConf(
+ CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ withTable("td") {
+ testData
+ .withColumn("bucket", $"key" % 3)
+ .write
+ .mode(SaveMode.Overwrite)
+ .bucketBy(2, "bucket")
+ .format("parquet")
+ .saveAsTable("td")
+ val df = sql("""
+ |SELECT t1.key, t2.key, t3.key
+ |FROM td AS t1
+ |JOIN td AS t2 ON t2.key = t1.key
+ |JOIN td AS t3 ON t3.key = t2.key
+ |WHERE t1.bucket = 1 AND t2.bucket = 1 AND t3.bucket = 1
+ |""".stripMargin)
+ val reusedPlan =
ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan)
+ val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec
=>
+ r
+ }
+ assert(reusedExchanges.size == 1)
+
assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec])
+ }
+ }
+ }
+
test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark
3.4+")
withSQLConf(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]