[ https://issues.apache.org/jira/browse/FLINK-35804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
roncenzhao updated FLINK-35804:
-------------------------------
Description:
This is similar to the issue reported in FLINK-30841.
Take the following test as an example:
{code:java}
@Test
def testCalcMergeWithNonDeterministicExpr3(): Unit =
{ val sqlUdtfQuery = "SELECT a, b, len FROM MyTable, LATERAL TABLE
(length_udtf(c)) AS T(len)" val sqlView1Query = "SELECT a, b, len " + s"FROM
($sqlUdtfQuery) t JOIN MyTable_Join t2 " + "ON t.a = t2.d" val view1 =
util.tableEnv.sqlQuery(sqlView1Query)
util.tableEnv.createTemporaryView("View1", view1) val sqlView2Query = "SELECT
random_udf(b) AS r FROM View1" val view2 =
util.tableEnv.sqlQuery(sqlView2Query)
util.tableEnv.createTemporaryView("View2", view2) val sqlQuery = "SELECT r FROM
View2 WHERE r > 10" util.verifyRelPlan(sqlQuery) }
{code}
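The optimized plan is wrong: the non-deterministic filter {{random_udf(b) > 10}}
is pushed below the join and merged into the Calc on the join's left input, while
{{random_udf(b) AS r}} is evaluated again in the top Calc. The UDF is therefore
called twice per row, and the returned {{r}} may no longer satisfy {{r > 10}}: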
{code:java}
Calc(select=[random_udf(b) AS r])
+- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b], where=[>(random_udf(b), 10)])
: +- Correlate(invocation=[length_udtf($cor0.c)],
correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER
EXPR$0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code}
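The expected plan keeps the filter {{r > 10}} above the Calc that computes
{{random_udf(b) AS r}}, so the UDF is evaluated only once per row: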
{code:java}
Calc(select=[r], where=[>(r, 10)])
+- Calc(select=[random_udf(b) AS r])
+-Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:+-Calc(select=[a, b])
:+-Correlate(invocation=[length_udtf($cor0.c)],
correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER
EXPR$0)], joinType=[INNER])
:+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+-Exchange(distribution=[hash[d]])
+-Calc(select=[d])
+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code}
was:
This is similar to the issue reported in FLINK-30841.
Take the following test as an example:
{code:java}
@Test
def testCalcMergeWithNonDeterministicExpr3(): Unit =
{ val sqlUdtfQuery = "SELECT a, b, len FROM MyTable, LATERAL TABLE
(length_udtf(c)) AS T(len)" val sqlView1Query = "SELECT a, b, len " + s"FROM
($sqlUdtfQuery) t JOIN MyTable_Join t2 " + "ON t.a = t2.d" val view1 =
util.tableEnv.sqlQuery(sqlView1Query)
util.tableEnv.createTemporaryView("View1", view1) val sqlView2Query = "SELECT
random_udf(b) AS r FROM View1" val view2 =
util.tableEnv.sqlQuery(sqlView2Query)
util.tableEnv.createTemporaryView("View2", view2) val sqlQuery = "SELECT r FROM
View2 WHERE r > 10" util.verifyRelPlan(sqlQuery) }
{code}
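The optimized plan is wrong: the non-deterministic filter {{random_udf(b) > 10}}
is pushed below the join and merged into the Calc on the join's left input, while
{{random_udf(b) AS r}} is evaluated again in the top Calc. The UDF is therefore
called twice per row, and the returned {{r}} may no longer satisfy {{r > 10}}: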
{code:java}
Calc(select=[random_udf(b) AS r])
+- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b], where=[>(random_udf(b), 10)])
: +- Correlate(invocation=[length_udtf($cor0.c)],
correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER
EXPR$0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code}
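The expected plan keeps the filter {{r > 10}} above the Calc that computes
{{random_udf(b) AS r}}, so the UDF is evaluated only once per row: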
{code:java}
Calc(select=[r], where=[>(r, 10)])
+- Calc(select=[random_udf(b) AS r])
+-Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:+-Calc(select=[a, b])
:+-Correlate(invocation=[length_udtf($cor0.c)],
correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER
EXPR$0)], joinType=[INNER])
:+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+-Exchange(distribution=[hash[d]])
+-Calc(select=[d])
+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code}
> Incorrect calc merge generates wrong plan for udtf+join+udf
> -----------------------------------------------------------
>
> Key: FLINK-35804
> URL: https://issues.apache.org/jira/browse/FLINK-35804
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.17.2, 1.18.1, 1.19.1
> Reporter: roncenzhao
> Priority: Major
> Labels: pull-request-available
>
> This is similar to the issue reported in FLINK-30841.
> Take the following test as an example:
> {code:java}
> @Test
> def testCalcMergeWithNonDeterministicExpr3(): Unit =
> { val sqlUdtfQuery = "SELECT a, b, len FROM MyTable, LATERAL TABLE
> (length_udtf(c)) AS T(len)" val sqlView1Query = "SELECT a, b, len " + s"FROM
> ($sqlUdtfQuery) t JOIN MyTable_Join t2 " + "ON t.a = t2.d" val view1 =
> util.tableEnv.sqlQuery(sqlView1Query)
> util.tableEnv.createTemporaryView("View1", view1) val sqlView2Query = "SELECT
> random_udf(b) AS r FROM View1" val view2 =
> util.tableEnv.sqlQuery(sqlView2Query)
> util.tableEnv.createTemporaryView("View2", view2) val sqlQuery = "SELECT r
> FROM View2 WHERE r > 10" util.verifyRelPlan(sqlQuery) }
> {code}
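> The optimized plan is wrong: the non-deterministic filter {{random_udf(b) > 10}}
> is pushed below the join and merged into the Calc on the join's left input, while
> {{random_udf(b) AS r}} is evaluated again in the top Calc. The UDF is therefore
> called twice per row, and the returned {{r}} may no longer satisfy {{r > 10}}: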
> {code:java}
> Calc(select=[random_udf(b) AS r])
> +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d],
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> : +- Calc(select=[a, b], where=[>(random_udf(b), 10)])
> : +- Correlate(invocation=[length_udtf($cor0.c)],
> correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0],
> rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER
> EXPR$0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
> MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
> +- Calc(select=[d])
> +- LegacyTableSourceScan(table=[[default_catalog, default_database,
> MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code}
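> The expected plan keeps the filter {{r > 10}} above the Calc that computes
> {{random_udf(b) AS r}}, so the UDF is evaluated only once per row: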
> {code:java}
> Calc(select=[r], where=[>(r, 10)])
> +- Calc(select=[random_udf(b) AS r])
> +-Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d],
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :+-Calc(select=[a, b])
> :+-Correlate(invocation=[length_udtf($cor0.c)],
> correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0],
> rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER
> EXPR$0)], joinType=[INNER])
> :+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +-Exchange(distribution=[hash[d]])
> +-Calc(select=[d])
> +-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2,
> source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)