lsyldliu commented on PR #23192:
URL: https://github.com/apache/flink/pull/23192#issuecomment-1676828553

   @lincoln-lil Thanks for your review. I've added the test 
`testDimSideReuseAfterProjectionPushdown` to verify the overall change of this 
PR. The plan before this change is:
   ```
   Calc(select=[id, price, amount])
   +- MultipleInput(readOrder=[0,0,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(amount = id0)], select=[id, 
price, amount, id0], isBroadcast=[true], build=[right])\n:- Calc(select=[id, 
price, amount])\n:  +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, price, amount, fact_date_sk, dim_date_sk], 
isBroadcast=[true], build=[right])\n:     :- Union(all=[true], union=[id, 
price, amount, fact_date_sk])\n:     :  :- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[id, price, amount, fact_date_sk], metadata=[]]], fields=[id, price, 
amount, fact_date_sk])\n:     :  :  +- 
DynamicFilteringDataCollector(fields=[id])(reuse_id=[1])\n:     :  :     +- 
Calc(select=[id], where=[(amount < 10)])\n:     :  :        +- [#3] 
TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], 
project=[id, amount], metadata=[]]], fields=[id, amount])\n:     :  +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part2, 
project=[id, price, amount, fact_date_sk], metadata=[]]], 
fields=[id, price, amount, fact_date_sk])\n:     :     +- 
Reused(reference_id=[1])\n:     +- [#2] Exchange(distribution=[broadcast])\n+- 
[#1] Exchange(distribution=[broadcast])\n])
      :- Exchange(distribution=[broadcast])
      :  +- Calc(select=[id], where=[(amount < 10)])
      :     +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[], project=[id, amount, price, dim_date_sk], metadata=[]]], fields=[id, 
amount, price, dim_date_sk])(reuse_id=[1])
      :- Exchange(distribution=[broadcast])
      :  +- Calc(select=[dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
      :     +- Reused(reference_id=[1])
      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], 
project=[id, amount], metadata=[]]], fields=[id, amount])
   ``` 
   The plan after this change is:
   ```
   Calc(select=[id, price, amount])
   +- MultipleInput(readOrder=[0,0,1,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(amount = id0)], select=[id, 
price, amount, id0], isBroadcast=[true], build=[right])\n:- Calc(select=[id, 
price, amount])\n:  +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, price, amount, fact_date_sk, dim_date_sk], 
isBroadcast=[true], build=[right])\n:     :- Union(all=[true], union=[id, 
price, amount, fact_date_sk])\n:     :  :- [#3] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[id, price, amount, fact_date_sk], metadata=[]]], fields=[id, price, 
amount, fact_date_sk])\n:     :  +- [#4] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part2, 
project=[id, price, amount, fact_date_sk], metadata=[]]], fields=[id, price, 
amount, fact_date_sk])\n:     +- [#2] Exchange(distribution=[broadcast])\n+- 
[#1] Exchange(distribution=[broadcast])\n])
      :- Exchange(distribution=[broadcast])
      :  +- Calc(select=[id], where=[(amount < 10)])(reuse_id=[2])
      :     +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[], project=[id, amount, price, dim_date_sk], metadata=[]]], fields=[id, 
amount, price, dim_date_sk])(reuse_id=[1])
      :- Exchange(distribution=[broadcast])
      :  +- Calc(select=[dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
      :     +- Reused(reference_id=[1])
      :- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part, project=[id, price, amount, fact_date_sk], metadata=[]]], 
fields=[id, price, amount, fact_date_sk])
      :  +- DynamicFilteringDataCollector(fields=[id])(reuse_id=[3])
      :     +- Reused(reference_id=[2])
      +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part2, project=[id, price, amount, fact_date_sk], metadata=[]]], 
fields=[id, price, amount, fact_date_sk])
         +- Reused(reference_id=[3])
   ```
   
   Regarding the change in `ReplaceScanWithCalcShuttle`, I've optimized it. 
Moreover, I think the change is meaningful for some DPP patterns: the 
`DynamicFilteringTableSourceScan` takes the dim side as an input, and the dim 
side may also have a different projection pushed down, in which case it can 
then be reused. At the moment I can't construct a test pattern that verifies 
this specific change, but I've verified the overall changes with TPC-DS, and 
the results are as expected.
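
To illustrate the reuse idea visible in the plans above, here is a minimal toy sketch, not Flink's actual implementation. It assumes that scans of the same table with different pushed-down projections can be replaced by one shared, wider scan with a per-consumer `Calc` on top. The names `Scan`, `Calc`, and `merge_scans` are hypothetical and exist only for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scan:
    """Toy stand-in for a table source scan with projection pushdown."""
    table: str
    project: tuple  # columns pushed down into the source


@dataclass(frozen=True)
class Calc:
    """Toy stand-in for a Calc node that re-selects the originally needed columns."""
    select: tuple
    input: Scan


def merge_scans(scans):
    """Replace scans of the same table with one shared, wider scan plus a Calc each.

    Hypothetical helper: collects the union of projected columns per table
    (keeping first-seen order), builds a single Scan per table, and wraps it
    in a Calc that restores each consumer's original projection.
    """
    widened = {}
    for scan in scans:
        columns = widened.setdefault(scan.table, [])
        for column in scan.project:
            if column not in columns:
                columns.append(column)
    shared = {table: Scan(table, tuple(cols)) for table, cols in widened.items()}
    return [Calc(scan.project, shared[scan.table]) for scan in scans]


# Two consumers of the dim table with different projections, loosely modelled
# on the plans above: the DPP side needs [id, amount], the join side needs
# [price, dim_date_sk].
plans = merge_scans([
    Scan("dim", ("id", "amount")),
    Scan("dim", ("price", "dim_date_sk")),
])
```

In this sketch both `Calc` nodes end up pointing at the same `Scan` object, which is the toy analogue of `Reused(reference_id=[...])` in the optimized plan.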
   
    

