roncenzhao created FLINK-35804:
----------------------------------

             Summary: Incorrect calc merge generate wrong plan about udtf+join+udf
                 Key: FLINK-35804
                 URL: https://issues.apache.org/jira/browse/FLINK-35804
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.1, 1.18.1, 1.17.2
            Reporter: roncenzhao


This is similar to the issue reported in [FLINK-30841|https://issues.apache.org/jira/browse/FLINK-30841].

Take the following test as an example:

```scala
@Test
def testCalcMergeWithNonDeterministicExpr3(): Unit = {
  // UDTF applied through a lateral join
  val sqlUdtfQuery =
    "SELECT a, b, len FROM MyTable, LATERAL TABLE (length_udtf(c)) AS T(len)"
  // Join the UDTF result with a second table
  val sqlView1Query = "SELECT a, b, len " +
    s"FROM ($sqlUdtfQuery) t JOIN MyTable_Join t2 " +
    "ON t.a = t2.d"
  val view1 = util.tableEnv.sqlQuery(sqlView1Query)
  util.tableEnv.createTemporaryView("View1", view1)
  // Non-deterministic UDF in the projection
  val sqlView2Query = "SELECT random_udf(b) AS r FROM View1"
  val view2 = util.tableEnv.sqlQuery(sqlView2Query)
  util.tableEnv.createTemporaryView("View2", view2)
  // Filter on the projected value r
  val sqlQuery = "SELECT r FROM View2 WHERE r > 10"
  util.verifyRelPlan(sqlQuery)
}
```

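The optimized plan is wrong. The filter on r is pushed below the join as >(random_udf(b), 10), while the top Calc evaluates random_udf(b) again, so the filtered value and the returned value come from two separate calls of a non-deterministic function: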

```
Calc(select=[random_udf(b) AS r])
+- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[a]])
   :  +- Calc(select=[a, b], where=[>(random_udf(b), 10)])
   :     +- Correlate(invocation=[length_udtf($cor0.c)], correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER EXPR$0)], joinType=[INNER])
   :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[d]])
      +- Calc(select=[d])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
```

 

The expected plan keeps the filter above the projection, so random_udf is evaluated only once per row:

```
Calc(select=[r], where=[>(r, 10)])
+- Calc(select=[random_udf(b) AS r])
   +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[hash[a]])
      :  +- Calc(select=[a, b])
      :     +- Correlate(invocation=[length_udtf($cor0.c)], correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER EXPR$0)], joinType=[INNER])
      :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      +- Exchange(distribution=[hash[d]])
         +- Calc(select=[d])
            +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
```
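
The semantic difference between the two plans can be illustrated without Flink. The following is only a minimal sketch in plain Scala: `randomUdf`, `expectedPlan`, and `reportedPlan` are hypothetical stand-ins introduced here for illustration, not planner or test-harness code. It assumes `random_udf` returns a fresh value on every call, and models the expected plan as "project once, then filter" and the reported plan as "filter with one call, then project with a second call".

```scala
import scala.util.Random

object CalcMergeSketch {
  // Hypothetical stand-in for random_udf: a new value in 0..20 on every call.
  def randomUdf(rng: Random, b: Int): Int = rng.nextInt(21)

  // Expected plan: compute r once per row, then filter on that same r.
  def expectedPlan(rows: Seq[Int], rng: Random): Seq[Int] =
    rows.map(b => randomUdf(rng, b)).filter(r => r > 10)

  // Reported plan: the filter calls the UDF, then the projection calls it again,
  // so the returned r is unrelated to the value that passed the filter.
  def reportedPlan(rows: Seq[Int], rng: Random): Seq[Int] =
    rows.filter(b => randomUdf(rng, b) > 10).map(b => randomUdf(rng, b))

  def main(args: Array[String]): Unit = {
    val rows = 1 to 1000
    val ok = expectedPlan(rows, new Random(42))
    val bad = reportedPlan(rows, new Random(42))
    println(s"expected plan: rows violating r > 10 = ${ok.count(_ <= 10)}")
    println(s"reported plan: rows violating r > 10 = ${bad.count(_ <= 10)}")
  }
}
```

In this sketch the expected plan can never return a row with r <= 10, whereas the reported plan can, which is why the filter must not be merged into a Calc that re-evaluates the non-deterministic call.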



