[
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600135#comment-17600135
]
Shuiqiang Chen commented on FLINK-28988:
----------------------------------------
TL;DR: The filters in aboveFilter should not be pushed down into the right table
when the join is a temporal join.

When there is no filter after the temporal join, the query is explained as below:
{code}
== Abstract Syntax Tree ==
LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
   :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')])
   :  +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)])
   :     +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]])
   +- LogicalFilter(condition=[=($cor0.id, $0)])
      +- LogicalSnapshot(period=[$cor0.ts])
         +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)])
                  +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)])
                     +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]])

== Optimized Physical Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])

== Optimized Execution Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])
{code}
After the FlinkChangelogModeInferenceProgram runs, the UpdateKindTrait of
Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) becomes
[ONLY_UPDATE_AFTER]. Therefore, at runtime, the rightSortedState in
TemporalRowTimeJoinOperator contains the following rows:
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, offline, 1970-01-01 08:00:00.010]
[+I, 0, online, 1970-01-01 08:00:00.020]
So we get the expected temporal join result:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.010,0,offline,1970-01-01 08:00:00.010,1]
[+I,0,1970-01-01 08:00:00.015,0,offline,1970-01-01 08:00:00.010,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]
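The per-key lookup that TemporalRowTimeJoinOperator performs over rightSortedState can be sketched with a plain sorted map (a minimal sketch with no Flink dependencies; the class and method names here are illustrative, not Flink's): for each left-side row, the join picks the latest right-side version whose row time is not after the probe time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TemporalLookupSketch {

    // Models FOR SYSTEM_TIME AS OF ts: the latest version whose row time is <= ts.
    static String lookup(TreeMap<Long, String> rightSortedState, long ts) {
        Map.Entry<Long, String> version = rightSortedState.floorEntry(ts);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        // Right-side versioned state for id = 0, keyed by row time in ms.
        TreeMap<Long, String> rightSortedState = new TreeMap<>();
        rightSortedState.put(0L, "online");
        rightSortedState.put(10L, "offline");
        rightSortedState.put(20L, "online");

        // Probe times of the left (event) table.
        List<String> joined = new ArrayList<>();
        for (long ts : new long[] {0, 5, 10, 15, 20, 25}) {
            joined.add(ts + " -> " + lookup(rightSortedState, ts));
        }
        System.out.println(joined);
        // [0 -> online, 5 -> online, 10 -> offline, 15 -> offline, 20 -> online, 25 -> online]
    }
}
```

With the full three-version state in place, the probes at 10 ms and 15 ms correctly resolve to the offline version.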
However, if the filter is pushed down into the right table, the right sorted
state becomes:
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, online, 1970-01-01 08:00:00.020]
and the temporal join result becomes:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.010,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.015,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]
while the expected result is:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]
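The difference between the two filter placements can be reproduced without Flink. In this sketch (all names illustrative, not Flink APIs), filtering after the join only drops the rows whose joined state is offline, while pushing the filter into the versioned side removes the offline version from state, so the probes at 10 ms and 15 ms incorrectly fall back to the online version from 0 ms:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FilterPlacementSketch {

    // Event-time temporal join of probe timestamps against versioned state.
    // Each output row is "probeTs,state,versionTs".
    static List<String> join(TreeMap<Long, String> state, long[] probes) {
        List<String> out = new ArrayList<>();
        for (long ts : probes) {
            Map.Entry<Long, String> e = state.floorEntry(ts);
            if (e != null) {
                out.add(ts + "," + e.getValue() + "," + e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] probes = {0, 5, 10, 15, 20, 25};
        TreeMap<Long, String> fullState = new TreeMap<>();
        fullState.put(0L, "online");
        fullState.put(10L, "offline");
        fullState.put(20L, "online");

        // Correct: join against the full state, then apply state = 'online'.
        List<String> filterAfterJoin = new ArrayList<>();
        for (String row : join(fullState, probes)) {
            if ("online".equals(row.split(",")[1])) {
                filterAfterJoin.add(row);
            }
        }

        // Buggy: push the filter into the right table before the join.
        TreeMap<Long, String> filteredState = new TreeMap<>(fullState);
        filteredState.values().removeIf(v -> !"online".equals(v));
        List<String> filterBeforeJoin = join(filteredState, probes);

        System.out.println(filterAfterJoin.size());   // 4 rows, as expected
        System.out.println(filterBeforeJoin.size());  // 6 rows: 10 ms and 15 ms wrongly join the 0 ms version
    }
}
```

This is why the pushed-down plan returns six online rows instead of four: the filter changes which version is visible at each point in time, not just which joined rows survive.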
> Incorrect result for filter after temporal join
> -----------------------------------------------
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.1
> Reporter: Xuannan Su
> Assignee: Shuiqiang Chen
> Priority: Major
>
> The following code can reproduce the case
>
> {code:java}
> public class TemporalJoinSQLExample1 {
>
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         result.execute().print();
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> }
> {code}
>
> The result of the temporal join is the following:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
>
> After filtering with the predicate state = 'online', I expect only the two rows
> with state 'offline' to be filtered out, but I got the following result:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
--
This message was sent by Atlassian Jira
(v8.20.10#820010)