lsyldliu commented on code in PR #23216:
URL: https://github.com/apache/flink/pull/23216#discussion_r1295421775
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##########
@@ -202,6 +202,32 @@ public void testBuildSideIsJoinWithoutExchange() throws Exception {
         util.verifyPlan(query);
     }
+    @Test
+    public void testBuildSideIsLeftJoinWithoutExchange() throws Exception {
Review Comment:
I have one question. After reverting your change, the plan is as follows:
```
HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, amount, price, fact_date_sk, id0, name, amount0, price0, fact_date_sk0, id00, male, amount00, price00, dim_date_sk], build=[left])
:- Exchange(distribution=[hash[amount]])
:  +- RuntimeFilter(select=[amount], estimatedFilterRatio=[0.99993896484375])
:     :- Exchange(distribution=[broadcast])
:     :  +- GlobalRuntimeFilterBuilder(select=[amount], estimatedRowCount=[65536], maxRowCount=[218454])
:     :     +- Exchange(distribution=[single])
:     :        +- LocalRuntimeFilterBuilder(select=[amount], estimatedRowCount=[65536], maxRowCount=[218454])
:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)])
:     :              +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
:     +- TableSourceScan(table=[[testCatalog, test_database, fact2]], fields=[id, amount, price, fact_date_sk])
+- HashJoin(joinType=[LeftOuterJoin], where=[=(amount0, amount)], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
   :- Exchange(distribution=[hash[amount]])
   :  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, amount, price, fact_date_sk])
   +- Exchange(distribution=[hash[amount]])
      +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)])
         +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
```
For this pattern, I think injecting a runtime filter only for fact2 is still
meaningful, but after your change no runtime filter is injected at all. So I
think this test case doesn't cover your change.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java:
##########
@@ -325,6 +322,9 @@ private static Optional<BuildSideInfo> findSuitableBuildSide(
         } else if (rel instanceof Join) {
             // try to push the builder to one input of join
             Join join = (Join) rel;
+            if (!isSuitableJoinType(join.getJoinType())) {
+                return Optional.empty();
+            }
Review Comment:
I have some doubts about this logic:
```
if (!(join.getLeft() instanceof Exchange) && !(join.getRight() instanceof Exchange)) {
    return Optional.empty();
}
```
I think the left and right inputs of the Join may also be a HashAgg whose
input is an Exchange. In that case the RuntimeFilter can still be pushed down
into the HashAgg, so we should not stop here.
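
To make the concern concrete, here is a minimal, self-contained sketch. It uses hypothetical stand-in classes (`Scan`, `Exchange`, `HashAgg`, `Join`) rather than the real Flink or Calcite planner nodes, and the helper names are made up for illustration. It only contrasts the strict "direct input must be an Exchange" check with a relaxed check that looks through a HashAgg; it is not a proposal for the actual implementation.

```java
public class BuildSideSearchSketch {
    // Hypothetical stand-ins for plan nodes; these are NOT Flink/Calcite classes.
    interface Node {}

    static final class Scan implements Node {
        final String table;
        Scan(String table) { this.table = table; }
    }

    static final class Exchange implements Node {
        final Node input;
        Exchange(Node input) { this.input = input; }
    }

    static final class HashAgg implements Node {
        final Node input;
        HashAgg(Node input) { this.input = input; }
    }

    static final class Join implements Node {
        final Node left;
        final Node right;
        Join(Node left, Node right) { this.left = left; this.right = right; }
    }

    // Strict check, mirroring the quoted condition: only a direct Exchange input counts.
    static boolean hasDirectExchangeInput(Join join) {
        return join.left instanceof Exchange || join.right instanceof Exchange;
    }

    // Relaxed check (an assumption for illustration): look through HashAgg nodes.
    static boolean reachesExchange(Node node) {
        if (node instanceof Exchange) {
            return true;
        }
        if (node instanceof HashAgg) {
            return reachesExchange(((HashAgg) node).input);
        }
        return false;
    }

    static boolean hasReachableExchangeInput(Join join) {
        return reachesExchange(join.left) || reachesExchange(join.right);
    }

    public static void main(String[] args) {
        // Join whose left input is HashAgg -> Exchange -> Scan, right input is a plain Scan.
        Join join = new Join(new HashAgg(new Exchange(new Scan("fact"))), new Scan("dim"));
        System.out.println(hasDirectExchangeInput(join));    // false
        System.out.println(hasReachableExchangeInput(join)); // true
    }
}
```

With the strict check, the search would give up on this join even though an Exchange sits right below the HashAgg.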