[ https://issues.apache.org/jira/browse/FLINK-36962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17993049#comment-17993049 ]

Qilong Wang edited comment on FLINK-36962 at 7/14/25 2:29 AM:
--------------------------------------------------------------

Trying to solve it in FLINK-38080.


was (Author: JIRAUSER306284):
Trying to solve it in 7e85026a6f4070ce4f09d688243052dcd5d7e521.

> push down non-deterministic filter after stream join to source by mistake
> -------------------------------------------------------------------------
>
>                 Key: FLINK-36962
>                 URL: https://issues.apache.org/jira/browse/FLINK-36962
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0-preview
>            Reporter: Shuai Xu
>            Priority: Major
>              Labels: pull-request-available
>
> A non-deterministic filter after a stream join is pushed down to the source by mistake.
> To reproduce, modify org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following snippet:
>  
> {code:scala}
> @BeforeEach
> def setup(): Unit = {
>   util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
>   util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
>   util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
> }
>
> @Test
> def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
>   val sqlQuery =
>     "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 " +
>       "join SourceTable t2 on t1.b = t2.b) t " +
>       "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) " +
>       "and t.t2c > '2022-01-01 00:00:00'"
>   util.verifyRelPlan(sqlQuery)
> }
> {code}
> We expect the following plan:
> {code:java}
> Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
> +- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[b]])
>    :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[hash[b]])
>       +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> but the actual plan is:
> {code:java}
> Calc(select=[a])
> +- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[b]])
>    :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
>    :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[hash[b]])
>       +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
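
The following toy model illustrates why the two plans above are not equivalent in streaming mode. It is a sketch, not Flink planner or runtime code. It assumes that `NOW()` can be modeled as an explicit `now` parameter, since in streaming mode the predicate reads the clock as records are processed. It also assumes that the stream join simply holds a left row in state until a matching right row arrives. The names `LeftRow`, `RightRow`, `isRecent`, `filterAfterJoin` and `filterPushedDown` are hypothetical. A left row that passes `TO_TIMESTAMP(c, ...) > NOW() - 2 hours` when it is scanned can fail the same predicate by the time the join emits it. Pushing the filter below the join therefore lets through rows that the expected plan would drop.

```scala
// Hypothetical toy model, not Flink code: all times are epoch millis and
// the `now` parameter stands in for NOW().
final case class LeftRow(a: Long, b: Int, tsMillis: Long) // tsMillis models TO_TIMESTAMP(t1.c, ...)
final case class RightRow(b: Int)

val TwoHoursMillis: Long = 2L * 60 * 60 * 1000

// The non-deterministic predicate: TO_TIMESTAMP(c, ...) > NOW() - 2 hours.
def isRecent(left: LeftRow, now: Long): Boolean =
  left.tsMillis > now - TwoHoursMillis

// Expected plan: join on b first, then evaluate the predicate when the
// joined row is emitted.
def filterAfterJoin(left: LeftRow, right: RightRow, emitTime: Long): Option[Long] =
  if (left.b == right.b && isRecent(left, emitTime)) Some(left.a) else None

// Buggy plan: the predicate is evaluated once, when the left row is scanned.
// The row then waits in join state and is never re-checked at emit time.
def filterPushedDown(left: LeftRow, right: RightRow, scanTime: Long): Option[Long] =
  if (isRecent(left, scanTime) && left.b == right.b) Some(left.a) else None

// Scenario: the left row is scanned 1 hour after its timestamp, but the
// matching right row only arrives 3 hours after it.
val oneHour: Long = TwoHoursMillis / 2
val left = LeftRow(a = 1L, b = 7, tsMillis = 0L)
val right = RightRow(b = 7)
println(filterPushedDown(left, right, scanTime = oneHour))    // Some(1)
println(filterAfterJoin(left, right, emitTime = 3 * oneHour)) // None
```

The two functions disagree only because `isRecent` depends on the evaluation time. For a deterministic predicate such as `t.t2c > '2022-01-01 00:00:00'`, both placements give the same result. This is why the right-side `Calc` in both plans is a valid pushdown.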



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
