ahshahid opened a new issue, #16563:
URL: https://github.com/apache/iceberg/issues/16563

   While doing integration testing of Iceberg with Spark, I found an issue with exchange reuse in Spark.
   
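   Because the `equals` and `hashCode` implementations in `SparkBatchQueryScan` do not impose an order when generating the filter expression strings, two scans with the same filters in a different order do not compare as equal. As a result, two plans that are structurally equivalent do not match.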
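   To illustrate the ordering problem, here is a minimal, hypothetical Java sketch. `ScanKeyDemo` and `ScanKey` are illustrative names, not Iceberg's actual `SparkBatchQueryScan` code, and not the contents of the attached patch; the filter strings are made up. The idea is to sort the filter expression strings before they are used in `equals` and `hashCode`, so the same set of filters compares equal regardless of the order in which they were pushed down. The map at the end is only a toy stand-in for exchange reuse, which (in simplified terms) finds an existing exchange by plan equality.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ScanKeyDemo {

  /** Hypothetical stand-in for a scan's identity: table name plus pushed-down filter strings. */
  static final class ScanKey {
    private final String tableName;
    private final List<String> filterStrings;

    ScanKey(String tableName, List<String> filterStrings) {
      this.tableName = tableName;
      // Sort a copy so equals/hashCode ignore the order in which filters were pushed down.
      List<String> sorted = new ArrayList<>(filterStrings);
      Collections.sort(sorted);
      this.filterStrings = Collections.unmodifiableList(sorted);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof ScanKey)) {
        return false;
      }
      ScanKey that = (ScanKey) o;
      return tableName.equals(that.tableName) && filterStrings.equals(that.filterStrings);
    }

    @Override
    public int hashCode() {
      // Hash the sorted list, so keys that are equal always produce equal hash codes.
      return Objects.hash(tableName, filterStrings);
    }
  }

  public static void main(String[] args) {
    // Same conjuncts, pushed down in a different order (illustrative strings only).
    List<String> first = List.of("i_item_sk IS NOT NULL", "i_brand IS NOT NULL", "i_category IS NOT NULL");
    List<String> second = List.of("i_item_sk IS NOT NULL", "i_category IS NOT NULL", "i_brand IS NOT NULL");

    // Order-sensitive comparison of the generated strings: the two scans look different.
    System.out.println(first.toString().equals(second.toString())); // false

    ScanKey k1 = new ScanKey("spark_catalog.default.item", first);
    ScanKey k2 = new ScanKey("spark_catalog.default.item", second);
    System.out.println(k1.equals(k2)); // true
    System.out.println(k1.hashCode() == k2.hashCode()); // true

    // Toy model of exchange reuse: a lookup keyed by scan identity now hits.
    Map<ScanKey, String> exchanges = new HashMap<>();
    exchanges.put(k1, "existing exchange");
    System.out.println(exchanges.getOrDefault(k2, "no reuse")); // existing exchange
  }
}
```

   Sorting a copy in the constructor keeps `equals` and `hashCode` consistent with each other, which matters because a hash-based lookup only succeeds when both agree.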
   
   The failing test output is pasted below.
   
   Exchange reuse does not happen because of the `SparkBatchQueryScan` equality mismatch. The issue shows up in the `BatchScan spark_catalog.default.item` nodes, which are highlighted with `**` in the actual plan.
   
   I am attaching a patch for Spark 4.1 that solves this issue. The same patch needs to be applied to all relevant Spark versions.
   
   [patch.txt](https://github.com/user-attachments/files/28231547/patch.txt)

   ```
   simplified plans did not match
    actual simplified =

   TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
     WholeStageCodegen (22)
       Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
         BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
           Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
             BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
               Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
                 Filter [avg_monthly_sales,sum_sales]
                   InputAdapter
                     Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
                       WholeStageCodegen (7)
                         Filter [d_year]
                           InputAdapter
                             Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
                               WholeStageCodegen (6)
                                 Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
                                   InputAdapter
                                     Exchange [i_category,i_brand,s_store_name,s_company_name] #1
                                       WholeStageCodegen (5)
                                         HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
                                           InputAdapter
                                             Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
                                               WholeStageCodegen (4)
                                                 HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
                                                   Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
                                                     BroadcastHashJoin [ss_store_sk,s_store_sk]
                                                       Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
                                                         BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
                                                           Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
                                                             BroadcastHashJoin [i_item_sk,ss_item_sk]
                                                               Project [i_item_sk,i_brand,i_category]
                                                                 Filter [i_item_sk,i_brand,i_category]
                                                                   **InputAdapter
                                                                     BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]**
                                                               InputAdapter
                                                                 BroadcastExchange #3
                                                                   WholeStageCodegen (1)
                                                                     Project [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
                                                                       Filter [ss_item_sk,ss_store_sk]
                                                                         InputAdapter
                                                                           BatchScan spark_catalog.default.store_sales [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
                                                           InputAdapter
                                                             BroadcastExchange #4
                                                               WholeStageCodegen (2)
                                                                 Project [d_date_sk,d_year,d_moy]
                                                                   Filter [d_year,d_moy,d_date_sk]
                                                                     InputAdapter
                                                                       BatchScan spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
                                                       InputAdapter
                                                         BroadcastExchange #5
                                                           WholeStageCodegen (3)
                                                             Project [s_store_sk,s_store_name,s_company_name]
                                                               Filter [s_store_sk,s_company_name,s_store_name]
                                                                 InputAdapter
                                                                   BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
               InputAdapter
                 BroadcastExchange #6
                   WholeStageCodegen (14)
                     Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
                       InputAdapter
                         Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
                           WholeStageCodegen (13)
                             Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
                               InputAdapter
                                 Exchange [i_category,i_brand,s_store_name,s_company_name] #7
                                   WholeStageCodegen (12)
                                     HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
                                       InputAdapter
                                         ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #2
           InputAdapter
             BroadcastExchange #8
               WholeStageCodegen (21)
                 Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
                   InputAdapter
                     Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
                       WholeStageCodegen (20)
                         Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
                           InputAdapter
                             Exchange [i_category,i_brand,s_store_name,s_company_name] #9
                               WholeStageCodegen (19)
                                 HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
                                   InputAdapter
                                     Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #10
                                       WholeStageCodegen (18)
                                         HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
                                           Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
                                             BroadcastHashJoin [ss_store_sk,s_store_sk]
                                               Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
                                                 BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
                                                   Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
                                                     BroadcastHashJoin [i_item_sk,ss_item_sk]
                                                       Project [i_item_sk,i_brand,i_category]
                                                         Filter [i_item_sk,i_category,i_brand]
                                                           **InputAdapter
                                                             BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]**
                                                       InputAdapter
                                                         ReusedExchange [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price] #3
                                                   InputAdapter
                                                     ReusedExchange [d_date_sk,d_year,d_moy] #4
                                               InputAdapter
                                                 BroadcastExchange #11
                                                   WholeStageCodegen (17)
                                                     Project [s_store_sk,s_store_name,s_company_name]
                                                       Filter [s_store_sk,s_store_name,s_company_name]
                                                         InputAdapter
                                                           BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]

    approved simplified=
   TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
     WholeStageCodegen (22)
       Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
         BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
           Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
             BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
               Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
                 Filter [avg_monthly_sales,sum_sales]
                   InputAdapter
                     Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
                       WholeStageCodegen (7)
                         Filter [d_year]
                           InputAdapter
                             Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
                               WholeStageCodegen (6)
                                 Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
                                   InputAdapter
                                     Exchange [i_category,i_brand,s_store_name,s_company_name] #1
                                       WholeStageCodegen (5)
                                         HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
                                           InputAdapter
                                             Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
                                               WholeStageCodegen (4)
                                                 HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
                                                   Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
                                                     BroadcastHashJoin [ss_store_sk,s_store_sk]
                                                       Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
                                                         BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
                                                           Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
                                                             BroadcastHashJoin [i_item_sk,ss_item_sk]
                                                               Project [i_item_sk,i_brand,i_category]
                                                                 Filter [i_item_sk,i_category,i_brand]
                                                                   InputAdapter
                                                                     BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]
                                                               InputAdapter
                                                                 BroadcastExchange #3
                                                                   WholeStageCodegen (1)
                                                                     Project [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
                                                                       Filter [ss_item_sk,ss_store_sk]
                                                                         InputAdapter
                                                                           BatchScan spark_catalog.default.store_sales [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
                                                           InputAdapter
                                                             BroadcastExchange #4
                                                               WholeStageCodegen (2)
                                                                 Project [d_date_sk,d_year,d_moy]
                                                                   Filter [d_year,d_moy,d_date_sk]
                                                                     InputAdapter
                                                                       BatchScan spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
                                                       InputAdapter
                                                         BroadcastExchange #5
                                                           WholeStageCodegen (3)
                                                             Project [s_store_sk,s_store_name,s_company_name]
                                                               Filter [s_store_sk,s_store_name,s_company_name]
                                                                 InputAdapter
                                                                   BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
               InputAdapter
                 BroadcastExchange #6
                   WholeStageCodegen (14)
                     Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
                       InputAdapter
                         Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
                           WholeStageCodegen (13)
                             Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
                               InputAdapter
                                 Exchange [i_category,i_brand,s_store_name,s_company_name] #7
                                   WholeStageCodegen (12)
                                     HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
                                       InputAdapter
                                         ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #2
           InputAdapter
             BroadcastExchange #8
               WholeStageCodegen (21)
                 Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
                   InputAdapter
                     Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
                       WholeStageCodegen (20)
                         Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
                           InputAdapter
                             ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales] #7
   ```
   
   
   
   **Please note:**
   There is another, more involved issue that also prevents exchange reuse. It shows up when Spark's DPP (dynamic partition pruning) and AQE (adaptive query execution) are both enabled. The fix requires an enhancement of the `SupportsRuntimeFiltering` interface, so it spans both Iceberg and Spark. If you are interested, please take a look at the Spark JIRA [SPARK-45866](https://issues.apache.org/jira/browse/SPARK-45866). That ticket was opened nearly two years ago, but no action has been taken on it.


-- 
This is an automated message from the Apache Git Service.