[ https://issues.apache.org/jira/browse/FLINK-36962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17993049#comment-17993049 ]
    Qilong Wang edited comment on FLINK-36962 at 7/14/25 2:29 AM:
--------------------------------------------------------------

Try to solve it in FLINK-38080

was (Author: JIRAUSER306284):
Try to solve it in 7e85026a6f4070ce4f09d688243052dcd5d7e521.

> push down non-deterministic filter after stream join to source by mistake
> -------------------------------------------------------------------------
>
>                 Key: FLINK-36962
>                 URL: https://issues.apache.org/jira/browse/FLINK-36962
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0-preview
>            Reporter: Shuai Xu
>            Priority: Major
>              Labels: pull-request-available
>
> A non-deterministic filter after a stream join is pushed down to the source by mistake.
> To reproduce, modify org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following snippet:
>
> {code:scala}
> @BeforeEach
> def setup(): Unit = {
>   util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
>   util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
>   util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
> }
>
> @Test
> def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
>   val sqlQuery =
>     "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 join SourceTable t2 on t1.b = t2.b) t " +
>       "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
>   util.verifyRelPlan(sqlQuery)
> }
> {code}
>
> We expected the following plan, in which the non-deterministic filter on MyTable stays above the join:
> {code:java}
> Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
> +- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[b]])
>    :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[hash[b]])
>       +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
>
> but the actual plan pushes that filter below the join into the MyTable input:
> {code:java}
> Calc(select=[a])
> +- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[b]])
>    :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
>    :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[hash[b]])
>       +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
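The expected plan implies a simple pushdown rule: a conjunct of the filter above the join may move into one join input only if it is deterministic and references only that input's columns; everything else stays above the join. Below is a minimal sketch of that rule over a hypothetical toy expression model. `Expr`, `Col`, `Now`, `sides` and `splitConjuncts` are illustrative names, not Flink or Calcite planner API, and the actual change tracked in FLINK-38080 may work differently.

{code:scala}
// Hypothetical toy model for illustration only; not Flink or Calcite planner API.
object PushdownSketch {
  sealed trait Expr { def deterministic: Boolean }
  // A column reference tagged with the join input ("left" or "right") it comes from.
  final case class Col(side: String, name: String) extends Expr { def deterministic: Boolean = true }
  final case class Lit(value: String) extends Expr { def deterministic: Boolean = true }
  // Stands in for NOW(): its value depends on when it is evaluated.
  case object Now extends Expr { def deterministic: Boolean = false }
  // A call is deterministic only if all of its arguments are.
  final case class Call(op: String, args: List[Expr]) extends Expr {
    def deterministic: Boolean = args.forall(_.deterministic)
  }

  // Join inputs whose columns an expression references.
  def sides(e: Expr): Set[String] = e match {
    case Col(side, _)  => Set(side)
    case Lit(_) | Now  => Set.empty
    case Call(_, args) => args.flatMap(sides).toSet
  }

  final case class Split(left: List[Expr], right: List[Expr], remaining: List[Expr])

  // A conjunct may move below the join into one input only if it is deterministic
  // and references columns of that input alone; all others stay above the join.
  def splitConjuncts(conjuncts: List[Expr]): Split = {
    def pushableTo(side: String): Expr => Boolean =
      c => c.deterministic && sides(c) == Set(side)
    val left = conjuncts.filter(pushableTo("left"))
    val right = conjuncts.filter(pushableTo("right"))
    val remaining = conjuncts.filterNot(c => left.contains(c) || right.contains(c))
    Split(left, right, remaining)
  }

  // The two conjuncts of the WHERE clause in testCalcWithNonDeterministicFilterAfterJoin.
  val timeFilter: Expr = Call(">", List(
    Call("TO_TIMESTAMP", List(Col("left", "c"), Lit("yyyy-MM-dd HH:mm:ss"))),
    Call("TIMESTAMPADD", List(Lit("HOUR"), Lit("-2"), Now))))
  val strFilter: Expr = Call(">", List(Col("right", "c"), Lit("2022-01-01 00:00:00")))

  val split: Split = splitConjuncts(List(timeFilter, strFilter))

  def main(args: Array[String]): Unit = println(split)
}
{code}

In this model the time filter stays in `remaining` because it contains `Now`, matching the `Calc` above the `Join` in the expected plan, while the deterministic string filter goes to the right (SourceTable) input.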