Shuai Xu created FLINK-36962:
--------------------------------
Summary: push down non-deterministic filter after stream join to source by mistake
Key: FLINK-36962
URL: https://issues.apache.org/jira/browse/FLINK-36962
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.0-preview
Reporter: Shuai Xu
A non-deterministic filter placed after a stream join is pushed down to the source by mistake.
To reproduce, modify org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following snippet of code:
{code:java}
@BeforeEach
def setup(): Unit = {
  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
  util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
  util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
}

@Test
def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
  // The NOW()-based predicate on t1c is non-deterministic and should stay above the join;
  // the constant comparison on t2c may be pushed into the right input.
  val sqlQuery =
    "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c " +
      "FROM MyTable t1 join SourceTable t2 on t1.b = t2.b) t " +
      "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) " +
      "and t.t2c > '2022-01-01 00:00:00'"
  util.verifyRelPlan(sqlQuery)
}
{code}
We expected the following plan:
{code:java}
Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
but the actual plan is:
{code:java}
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
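The difference between the two plans can be sketched as a conjunct split. For an inner join like the one above, each AND-ed term of the WHERE clause would be pushed into a join input only if it is deterministic and references a single input; every other term stays in the Calc above the Join. Presumably this matters here because the stream join keeps left rows in state, so a NOW()-based predicate evaluated at the source can disagree with the same predicate evaluated when the joined row is emitted. The model below is a hypothetical, self-contained sketch: `Side`, `Conjunct`, `PushdownPlan`, and `FilterPushdownSketch` are illustrative names, not Flink or Calcite APIs. The real planner works on `RexNode`s and could rely on a check such as Calcite's `RexUtil.isDeterministic`.

```scala
// Hypothetical, simplified model of a filter-pushdown decision for an INNER join.
// None of these types exist in Flink or Calcite; they only illustrate the rule.
sealed trait Side
case object LeftSide extends Side
case object RightSide extends Side
case object BothSides extends Side

// One AND-ed term of the WHERE clause that sits above the join.
final case class Conjunct(text: String, side: Side, deterministic: Boolean)

// Terms pushed into each join input, plus terms that must stay above the join.
final case class PushdownPlan(
    toLeft: List[Conjunct],
    toRight: List[Conjunct],
    keptAbove: List[Conjunct])

object FilterPushdownSketch {
  // A conjunct is pushable only if it is deterministic AND references a single input.
  // Non-deterministic terms (e.g. anything calling NOW()) are always kept above the join.
  def split(conjuncts: List[Conjunct]): PushdownPlan = {
    val (pushable, kept) =
      conjuncts.partition(c => c.deterministic && c.side != BothSides)
    PushdownPlan(
      toLeft = pushable.filter(_.side == LeftSide),
      toRight = pushable.filter(_.side == RightSide),
      keptAbove = kept)
  }

  def main(args: Array[String]): Unit = {
    // The two conjuncts from the query in this report.
    val where = List(
      Conjunct(
        "TO_TIMESTAMP(t1.c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW())",
        LeftSide,
        deterministic = false),
      Conjunct("t2.c > '2022-01-01 00:00:00'", RightSide, deterministic = true))
    val plan = split(where)
    println(s"pushed to left:  ${plan.toLeft.map(_.text)}")
    println(s"pushed to right: ${plan.toRight.map(_.text)}")
    println(s"kept above join: ${plan.keptAbove.map(_.text)}")
  }
}
```

Under this model, the NOW()-based conjunct stays above the join and only the `t2.c` predicate reaches the right input, which matches the expected plan rather than the actual one.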