ahshahid opened a new issue, #16563:
URL: https://github.com/apache/iceberg/issues/16563
While doing integration testing of Iceberg with Spark, I found an issue with
the reuse of exchange operators in Spark.
Because `equals` and `hashCode` in `SparkBatchQueryScan` do not impose an
order when generating the filter expression strings, two plans that are
structurally identical do not match.
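To make the failure mode concrete, here is a minimal Java sketch. It assumes, as described above, that scan equality is decided by comparing filter expressions rendered as strings. The class `ScanIdentity`, the helpers `naiveFilterString` and `canonicalFilterString`, and the filter strings are hypothetical illustrations; this is not Iceberg's `SparkBatchQueryScan` code and not the attached patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class FilterOrderDemo {

  // Order-sensitive: joins the filter strings in whatever order they arrive.
  static String naiveFilterString(List<String> filters) {
    return String.join(", ", filters);
  }

  // Order-insensitive: sorts a copy first, so the same set of filters
  // always yields the same string regardless of input order.
  static String canonicalFilterString(List<String> filters) {
    List<String> sorted = new ArrayList<>(filters);
    Collections.sort(sorted);
    return String.join(", ", sorted);
  }

  // Hypothetical stand-in for a scan whose identity is its table name plus
  // a string rendering of its filters (NOT Iceberg's SparkBatchQueryScan).
  static final class ScanIdentity {
    private final String table;
    private final String filterDescription;

    ScanIdentity(String table, List<String> filters, boolean canonicalize) {
      this.table = table;
      this.filterDescription =
          canonicalize ? canonicalFilterString(filters) : naiveFilterString(filters);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof ScanIdentity)) {
        return false;
      }
      ScanIdentity other = (ScanIdentity) o;
      return table.equals(other.table) && filterDescription.equals(other.filterDescription);
    }

    @Override
    public int hashCode() {
      // Built from the same fields as equals(), so equal objects hash equally.
      return Objects.hash(table, filterDescription);
    }
  }

  public static void main(String[] args) {
    // Illustrative filter strings: the same set of predicates in a different order.
    List<String> first =
        List.of("i_item_sk IS NOT NULL", "i_brand IS NOT NULL", "i_category IS NOT NULL");
    List<String> second =
        List.of("i_item_sk IS NOT NULL", "i_category IS NOT NULL", "i_brand IS NOT NULL");

    boolean naiveEqual =
        new ScanIdentity("item", first, false).equals(new ScanIdentity("item", second, false));
    boolean canonicalEqual =
        new ScanIdentity("item", first, true).equals(new ScanIdentity("item", second, true));

    System.out.println("naive equal: " + naiveEqual); // prints "naive equal: false"
    System.out.println("canonical equal: " + canonicalEqual); // prints "canonical equal: true"
  }
}
```

Sorting the rendered strings is only one way to canonicalize; the attached patch may impose the order differently. Whatever the approach, `equals` and `hashCode` should be derived from the same canonical form so that equal scans also produce equal hash codes.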
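The test output (actual vs. approved simplified plans) is pasted below.
Exchange reuse does not happen because of the equality mismatch in
`SparkBatchQueryScan`. The mismatch shows up at
`BatchScan spark_catalog.default.item`, highlighted between `**` markers in the
actual plan: the `Filter` directly above the two item scans lists its columns
in a different order (`[i_item_sk,i_brand,i_category]` vs.
`[i_item_sk,i_category,i_brand]`), whereas in the approved plan both read
`[i_item_sk,i_category,i_brand]`. As a result, in the actual plan the branch
under `BroadcastExchange #8` recomputes `Exchange #9` and `Exchange #10`
instead of showing a `ReusedExchange` of `#7`.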
I am attaching a patch for Spark 4.1 that fixes this issue:
[patch.txt](https://github.com/user-attachments/files/28231547/patch.txt).
The same patch needs to be applied to all relevant Spark versions.
`
simplified plans did not match
actual simplified =
TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
WholeStageCodegen (22)
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
Filter [avg_monthly_sales,sum_sales]
InputAdapter
Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
WholeStageCodegen (7)
Filter [d_year]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (6)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #1
WholeStageCodegen (5)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
WholeStageCodegen (4)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Project [i_item_sk,i_brand,i_category]
Filter [i_item_sk,i_brand,i_category]
**InputAdapter
BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]**
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Project [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
Filter [ss_item_sk,ss_store_sk]
InputAdapter
BatchScan spark_catalog.default.store_sales [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (2)
Project [d_date_sk,d_year,d_moy]
Filter [d_year,d_moy,d_date_sk]
InputAdapter
BatchScan spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (3)
Project [s_store_sk,s_store_name,s_company_name]
Filter [s_store_sk,s_company_name,s_store_name]
InputAdapter
BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (14)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (13)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #7
WholeStageCodegen (12)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #2
InputAdapter
BroadcastExchange #8
WholeStageCodegen (21)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (20)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #9
WholeStageCodegen (19)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #10
WholeStageCodegen (18)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Project [i_item_sk,i_brand,i_category]
Filter [i_item_sk,i_category,i_brand]
**InputAdapter
BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]**
InputAdapter
ReusedExchange [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price] #3
InputAdapter
ReusedExchange [d_date_sk,d_year,d_moy] #4
InputAdapter
BroadcastExchange #11
WholeStageCodegen (17)
Project [s_store_sk,s_store_name,s_company_name]
Filter [s_store_sk,s_store_name,s_company_name]
InputAdapter
BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
approved simplified=
TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
WholeStageCodegen (22)
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
Filter [avg_monthly_sales,sum_sales]
InputAdapter
Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
WholeStageCodegen (7)
Filter [d_year]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (6)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #1
WholeStageCodegen (5)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
WholeStageCodegen (4)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Project [i_item_sk,i_brand,i_category]
Filter [i_item_sk,i_category,i_brand]
InputAdapter
BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Project [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
Filter [ss_item_sk,ss_store_sk]
InputAdapter
BatchScan spark_catalog.default.store_sales [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (2)
Project [d_date_sk,d_year,d_moy]
Filter [d_year,d_moy,d_date_sk]
InputAdapter
BatchScan spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (3)
Project [s_store_sk,s_store_name,s_company_name]
Filter [s_store_sk,s_store_name,s_company_name]
InputAdapter
BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (14)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (13)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #7
WholeStageCodegen (12)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #2
InputAdapter
BroadcastExchange #8
WholeStageCodegen (21)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (20)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales] #7
`
**Please note:**
There is another issue that also prevents exchange reuse, but it is more
involved: it shows up when Spark's dynamic partition pruning (DPP) and
adaptive query execution (AQE) are both enabled. Fixing it requires an
enhancement to the `SupportsRuntimeFiltering` interface, so the change spans
both Iceberg and Spark. If you are interested, please take a look at the Spark
JIRA [SPARK-45866](https://issues.apache.org/jira/browse/SPARK-45866).
That ticket was opened nearly two years ago, but no action has been taken on it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]