lincoln lee created FLINK-30841:
-----------------------------------
Summary: Incorrect calc merge in streaming
Key: FLINK-30841
URL: https://issues.apache.org/jira/browse/FLINK-30841
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 1.17.0
Currently we have a `FlinkCalcMergeRuleTest`. Take one of its tests as an example:
{code}
@Test
def testCalcMergeWithNonDeterministicExpr1(): Unit = {
  // Inner query computes a1 with a non-deterministic UDF; outer query filters on a1.
  val sqlQuery =
    "SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10"
  util.verifyRelPlan(sqlQuery)
}
{code}
The final optimized plan is currently wrong in streaming:
{code}
Calc(select=[a, random_udf(b) AS a1], where=[(random_udf(b) > 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
The merged Calc contains two `random_udf` calls, and each call is evaluated
independently. A row can therefore satisfy the WHERE predicate (> 10) while the
selected column holds a value <= 10, which is counter-intuitive for users.
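The mismatch described above can be reproduced outside the planner. The following is a minimal sketch, not Flink code: `RandomUdf`, `mergedCalc` and `splitCalc` are hypothetical names introduced only for illustration, and a seeded `scala.util.Random` stands in for `random_udf`.

```scala
import scala.util.Random

object CalcMergeSketch {
  // Hypothetical stand-in for random_udf: every call draws a fresh value in 0..20.
  final class RandomUdf(seed: Long) {
    private val rng = new Random(seed)
    def eval(x: Int): Int = rng.nextInt(21)
  }

  // Shape of the merged Calc: the UDF is called once for the filter
  // and a second time for the projection.
  def mergedCalc(rows: Seq[Int], udf: RandomUdf): Seq[(Int, Int)] =
    rows.filter(a => udf.eval(a) > 10).map(a => (a, udf.eval(a)))

  // Shape of the two stacked Calcs: the UDF is called once per row,
  // and the filter reuses the already computed a1.
  def splitCalc(rows: Seq[Int], udf: RandomUdf): Seq[(Int, Int)] =
    rows.map(a => (a, udf.eval(a))).filter { case (_, a1) => a1 > 10 }

  def main(args: Array[String]): Unit = {
    val rows = 1 to 1000
    val merged = mergedCalc(rows, new RandomUdf(42L))
    val split = splitCalc(rows, new RandomUdf(42L))
    // Count emitted rows whose projected a1 contradicts the predicate a1 > 10.
    println(s"merged: ${merged.count(_._2 <= 10)} of ${merged.size} rows have a1 <= 10")
    println(s"split:  ${split.count(_._2 <= 10)} of ${split.size} rows have a1 <= 10")
  }
}
```

In the split variant every emitted row satisfies `a1 > 10` by construction, whereas the merged variant can emit rows that violate it because the filter and the projection see different draws.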
The expected plan for streaming keeps the two Calcs separate:
{code}
Calc(select=[a, a1], where=[(a1 > 10)])
+- Calc(select=[a, random_udf(b) AS a1])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
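One way to express the condition under which the two Calcs should stay separate is sketched below. This is an assumption about what a merge guard could look like, not the actual change to the planner: the `Expr` model and the names `isDeterministic`, `refCount` and `canMerge` are hypothetical and do not use Calcite or Flink classes. The idea is to refuse the merge when a non-deterministic expression of the bottom Calc is referenced more than once by the top Calc, since inlining it would duplicate the call.

```scala
object MergeGuardSketch {
  // Toy expression model (hypothetical, for illustration only).
  sealed trait Expr
  final case class InputRef(name: String) extends Expr
  final case class Literal(value: Int) extends Expr
  final case class Call(fn: String, args: List[Expr], deterministic: Boolean) extends Expr

  // An expression is deterministic if every call in its tree is deterministic.
  def isDeterministic(e: Expr): Boolean = e match {
    case Call(_, args, det) => det && args.forall(isDeterministic)
    case _                  => true
  }

  // Number of references to the field `name` inside `e`.
  def refCount(e: Expr, name: String): Int = e match {
    case InputRef(n)      => if (n == name) 1 else 0
    case Literal(_)       => 0
    case Call(_, args, _) => args.map(refCount(_, name)).sum
  }

  // bottom:   output field name -> expression of the lower Calc
  // topExprs: projections plus filter condition of the upper Calc
  // Merging is allowed only if each non-deterministic bottom expression
  // would be inlined at most once.
  def canMerge(bottom: Map[String, Expr], topExprs: List[Expr]): Boolean =
    bottom.forall { case (name, expr) =>
      isDeterministic(expr) || topExprs.map(refCount(_, name)).sum <= 1
    }

  def main(args: Array[String]): Unit = {
    val bottom = Map[String, Expr](
      "a"  -> InputRef("a"),
      "a1" -> Call("random_udf", List(InputRef("b")), deterministic = false)
    )
    // Upper Calc: select a, a1 where a1 > 10 (a1 is referenced twice).
    val top = List[Expr](
      InputRef("a"),
      InputRef("a1"),
      Call(">", List(InputRef("a1"), Literal(10)), deterministic = true)
    )
    println(s"canMerge = ${canMerge(bottom, top)}")
  }
}
```

With the plans from this issue, `a1` is referenced both in the projection and in the predicate, so the guard rejects the merge and the two-level plan is kept.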
--
This message was sent by Atlassian Jira
(v8.20.10#820010)