[ https://issues.apache.org/jira/browse/FLINK-36962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17908228#comment-17908228 ]
xuyang commented on FLINK-36962:
--------------------------------
It seems that the issue is not limited to Join. A Filter with non-deterministic
conditions should also not be transposed with other nodes...
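
As a rough illustration of the idea (a toy model only, not the planner's actual code): before a filter is moved past another node, its conjuncts could be split into deterministic ones, which are safe to move, and non-deterministic ones, which stay where they are. All class, field, and method names below are hypothetical, and the hard-coded function set is an assumption standing in for the planner's real determinism check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model only: none of these names exist in Flink or Calcite.
class NonDeterministicFilterSketch {

    /** One AND-ed condition of a filter: its SQL text plus the functions it calls. */
    static final class Conjunct {
        final String text;
        final Set<String> functions;

        Conjunct(String text, Set<String> functions) {
            this.text = text;
            this.functions = functions;
        }
    }

    /** Conjuncts that may move below another node vs. those that must stay in place. */
    static final class Split {
        final List<Conjunct> pushable = new ArrayList<>();
        final List<Conjunct> retained = new ArrayList<>();
    }

    // Assumption: a hard-coded set standing in for the planner's own determinism check.
    static final Set<String> NON_DETERMINISTIC = Set.of("NOW", "RAND", "UUID");

    /** A conjunct is deterministic if it calls no function from the set above. */
    static boolean isDeterministic(Conjunct c) {
        for (String f : c.functions) {
            if (NON_DETERMINISTIC.contains(f)) {
                return false;
            }
        }
        return true;
    }

    /** Routes each conjunct to the pushable or the retained list. */
    static Split split(List<Conjunct> conjuncts) {
        Split result = new Split();
        for (Conjunct c : conjuncts) {
            (isDeterministic(c) ? result.pushable : result.retained).add(c);
        }
        return result;
    }

    public static void main(String[] args) {
        // The two conditions from the WHERE clause in the issue description.
        List<Conjunct> where = List.of(
            new Conjunct(
                "TO_TIMESTAMP(t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW())",
                Set.of("TO_TIMESTAMP", "TIMESTAMPADD", "NOW")),
            new Conjunct("t2c > '2022-01-01 00:00:00'", Set.of()));

        Split s = split(where);
        s.pushable.forEach(c -> System.out.println("push down:  " + c.text));
        s.retained.forEach(c -> System.out.println("keep above: " + c.text));
    }
}
```

With the two conditions from the reported query, only the `t2c` comparison ends up in the pushable list, which matches the expected plan quoted below.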
> push down non-deterministic filter after stream join to source by mistake
> -------------------------------------------------------------------------
>
> Key: FLINK-36962
> URL: https://issues.apache.org/jira/browse/FLINK-36962
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0-preview
> Reporter: Shuai Xu
> Priority: Major
>
> A non-deterministic filter after a stream join is pushed down to the source by mistake.
> To reproduce, modify
> org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following
> snippet of code.
>
> {code:java}
> @BeforeEach
> def setup(): Unit = {
>   util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
>   util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
>   util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
> }
>
> @Test
> def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
>   val sqlQuery =
>     "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 " +
>       "join SourceTable t2 on t1.b = t2.b) t " +
>       "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > " +
>       "TIMESTAMPADD(HOUR, -2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
>   util.verifyRelPlan(sqlQuery)
> }
> {code}
> We expected the following plan:
> {code:java}
> Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
> +- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[b]])
>    :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[hash[b]])
>       +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> But the actual plan is:
> {code:java}
> Calc(select=[a])
> +- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[b]])
>    :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
>    :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[hash[b]])
>       +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)