roncenzhao created FLINK-35804:
----------------------------------

             Summary: Incorrect calc merge generate wrong plan about udtf+join+udf
                 Key: FLINK-35804
                 URL: https://issues.apache.org/jira/browse/FLINK-35804
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.1, 1.18.1, 1.17.2
            Reporter: roncenzhao
This is similar to the issue in [FLINK-30841|https://issues.apache.org/jira/browse/FLINK-30841]. Take the following test as an example:

```scala
@Test
def testCalcMergeWithNonDeterministicExpr3(): Unit = {
  // UDTF over MyTable, then joined with MyTable_Join on a = d
  val sqlUdtfQuery = "SELECT a, b, len FROM MyTable, LATERAL TABLE (length_udtf(c)) AS T(len)"
  val sqlView1Query = "SELECT a, b, len " +
    s"FROM ($sqlUdtfQuery) t JOIN MyTable_Join t2 " +
    "ON t.a = t2.d"
  val view1 = util.tableEnv.sqlQuery(sqlView1Query)
  util.tableEnv.createTemporaryView("View1", view1)

  // random_udf is non-deterministic: r must be computed once and then filtered
  val sqlView2Query = "SELECT random_udf(b) AS r FROM View1"
  val view2 = util.tableEnv.sqlQuery(sqlView2Query)
  util.tableEnv.createTemporaryView("View2", view2)

  val sqlQuery = "SELECT r FROM View2 WHERE r > 10"
  util.verifyRelPlan(sqlQuery)
}
```

The optimized plan is wrong:

```
Calc(select=[random_udf(b) AS r])
+- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[a]])
   :  +- Calc(select=[a, b], where=[>(random_udf(b), 10)])
   :     +- Correlate(invocation=[length_udtf($cor0.c)], correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER EXPR$0)], joinType=[INNER])
   :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[d]])
      +- Calc(select=[d])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
```

Here the filter `r > 10` has been rewritten as `random_udf(b) > 10` and pushed below the Join, while the Calc above the Join still computes `random_udf(b) AS r`. Because `random_udf` is non-deterministic, it is now evaluated twice per row, so a row can pass the filter with one value and be emitted with another value that is not greater than 10.

The expected plan is:

```
Calc(select=[r], where=[>(r, 10)])
+- Calc(select=[random_udf(b) AS r])
   +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[hash[a]])
      :  +- Calc(select=[a, b])
      :     +- Correlate(invocation=[length_udtf($cor0.c)], correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER EXPR$0)], joinType=[INNER])
      :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      +- Exchange(distribution=[hash[d]])
         +- Calc(select=[d])
            +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
```
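A minimal, self-contained sketch of why the wrong plan changes the query result. It assumes a toy stand-in for `random_udf` and models only the per-row evaluation order of the two plans, ignoring the join and the UDTF. `RandomUdf`, `expectedPlan`, `wrongPlan` and `canMerge` are hypothetical names for illustration only; they are not part of Flink's planner API, and `canMerge` is not claimed to be how Flink decides whether two Calcs may be merged.

```scala
import scala.util.Random

// Hypothetical toy model, not Flink code.
object NonDeterministicCalcMergeDemo {

  // Stand-in for random_udf: ignores its argument and returns a random Int in [0, 20).
  // A fixed seed keeps each run reproducible.
  final class RandomUdf(seed: Long) {
    private val rnd = new Random(seed)
    def eval(b: Int): Int = rnd.nextInt(20)
  }

  // Expected plan: compute r = random_udf(b) once per row, then filter on that same value.
  def expectedPlan(bs: Seq[Int], udf: RandomUdf): Seq[Int] =
    bs.map(b => udf.eval(b)).filter(r => r > 10)

  // Wrong plan: the filter is evaluated as random_udf(b) > 10 below the join,
  // and the projection random_udf(b) AS r calls the UDF a second time.
  def wrongPlan(bs: Seq[Int], udf: RandomUdf): Seq[Int] =
    bs.filter(b => udf.eval(b) > 10).map(b => udf.eval(b))

  // Hypothetical guard: inlining a bottom-Calc output into the top Calc is only
  // safe if that output is deterministic or is referenced at most once
  // (in View2, r is referenced by both the filter and the projection).
  def canMerge(outputIsDeterministic: Boolean, referencesInTop: Int): Boolean =
    outputIsDeterministic || referencesInTop <= 1
}
```

Running both plans over the same input with the same seed, `expectedPlan` only ever returns values above 10, whereas `wrongPlan` can emit values of `r` that are 10 or less, which are exactly the rows the `WHERE r > 10` clause should have removed.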