lsyldliu commented on code in PR #23216:
URL: https://github.com/apache/flink/pull/23216#discussion_r1299354403
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml:
##########
@@ -430,6 +430,104 @@ MultipleInput(readOrder=[2,0,1],
members=[\nHashJoin(joinType=[InnerJoin], where
</Resource>
</TestCase>
+ <TestCase name="testBuildSideIsJoinWithTwoAggInputs">
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4],
dim_date_sk=[$5], EXPR$1=[$6], dim_date_sk0=[$7], EXPR$10=[$8])
++- LogicalJoin(condition=[=($4, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+ +- LogicalProject(dim_date_sk=[$0], EXPR$1=[$1], dim_date_sk0=[$2],
EXPR$10=[$3])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ :- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+ : +- LogicalProject(dim_date_sk=[$4], price=[$3])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+ +- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+ +- LogicalProject(dim_date_sk=[$4], amount=[$2])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)],
select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1,
dim_date_sk0, EXPR$10], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- RuntimeFilter(select=[fact_date_sk],
estimatedFilterRatio=[0.9998779296875])
+: :- Exchange(distribution=[broadcast])
+: : +- GlobalRuntimeFilterBuilder(select=[dim_date_sk],
estimatedRowCount=[131072], maxRowCount=[436907])
+: : +- Exchange(distribution=[single])
+: : +- LocalRuntimeFilterBuilder(select=[dim_date_sk],
estimatedRowCount=[131072], maxRowCount=[436907])
+: : +- TableSourceScan(table=[[testCatalog, test_database, dim,
project=[dim_date_sk, amount], metadata=[]]], fields=[dim_date_sk, amount])
+: +- TableSourceScan(table=[[testCatalog, test_database, fact]],
fields=[id, name, amount, price, fact_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(dim_date_sk, dim_date_sk0)],
select=[dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])
+ :- HashAggregate(isMerge=[false], groupBy=[dim_date_sk],
select=[dim_date_sk, SUM(price) AS EXPR$1])
+ : +- Exchange(distribution=[hash[dim_date_sk]])
+ : +- Calc(select=[dim_date_sk, price])
+ : +- TableSourceScan(table=[[testCatalog, test_database, dim,
project=[amount, price, dim_date_sk], metadata=[]]], fields=[amount, price,
dim_date_sk])
+ +- HashAggregate(isMerge=[false], groupBy=[dim_date_sk],
select=[dim_date_sk, SUM(amount) AS EXPR$1])
+ +- Exchange(distribution=[hash[dim_date_sk]])
+ +- Calc(select=[dim_date_sk, amount])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim,
project=[amount, price, dim_date_sk], metadata=[]]], fields=[amount, price,
dim_date_sk])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[InnerJoin],
where=[(fact_date_sk = dim_date_sk)], select=[id, name, amount, price,
fact_date_sk, dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])\n:-
[#1] Exchange(distribution=[hash[fact_date_sk]])\n+-
HashJoin(joinType=[InnerJoin], where=[(dim_date_sk = dim_date_sk0)],
select=[dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])\n :-
HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk,
SUM(price) AS EXPR$1])\n : +- [#2]
Exchange(distribution=[hash[dim_date_sk]])\n +-
HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk,
SUM(amount) AS EXPR$1])\n +- [#3]
Exchange(distribution=[hash[dim_date_sk]])\n])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- MultipleInput(readOrder=[0,1],
members=[\nRuntimeFilter(select=[fact_date_sk],
estimatedFilterRatio=[0.9998779296875])\n:- [#1]
Exchange(distribution=[broadcast])\n+- [#2]
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name,
amount, price, fact_date_sk])\n])
+: :- Exchange(distribution=[broadcast])
+: : +- GlobalRuntimeFilterBuilder(select=[dim_date_sk],
estimatedRowCount=[131072], maxRowCount=[436907])
+: : +- Exchange(distribution=[single])
+: : +- LocalRuntimeFilterBuilder(select=[dim_date_sk],
estimatedRowCount=[131072], maxRowCount=[436907])
+: : +- TableSourceScan(table=[[testCatalog, test_database, dim,
project=[dim_date_sk, amount], metadata=[]]], fields=[dim_date_sk, amount])
Review Comment:
From this plan, I found some problems with the
`BatchPhysicalLocalRuntimeFilterBuilder#copy`,
`BatchPhysicalGlobalRuntimeFilterBuilder#copy` and
`BatchPhysicalRuntimeFilter#copy` method implementations. Each `copy` override
should use the supplied `traitSet` and `inputs` arguments instead of discarding
them; please refer to [1] for an example.
[1]
https://github.com/apache/flink/blob/9546f8243a24e7b45582b6de6702f819f1d73f97/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java#L53
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]