[
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xuannan Su updated FLINK-28988:
-------------------------------
Description:
The following code reproduces the issue:
{code:java}
public class TemporalJoinSQLExample1 {
public static void main(String[] args) throws Exception {
// set up the Java DataStream API
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// set up the Java Table API
final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
env.fromElements(
new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
final Table table =
tableEnv.fromDataStream(
ds,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING())
.column("f2",
DataTypes.TIMESTAMP_LTZ(3))
.watermark("f2", "f2 - INTERVAL '2'
SECONDS")
.build())
.as("id", "state", "ts");
tableEnv.createTemporaryView("source_table", table);
final Table dedupeTable =
tableEnv.sqlQuery(
"SELECT * FROM ("
+ " SELECT *, ROW_NUMBER() OVER (PARTITION BY
id ORDER BY ts DESC) AS row_num FROM source_table"
+ ") WHERE row_num = 1");
tableEnv.createTemporaryView("versioned_table", dedupeTable);
DataStreamSource<Tuple2<Integer, Instant>> event =
env.fromElements(
new Tuple2<>(0, Instant.ofEpochMilli(0)),
new Tuple2<>(0, Instant.ofEpochMilli(5)),
new Tuple2<>(0, Instant.ofEpochMilli(10)),
new Tuple2<>(0, Instant.ofEpochMilli(15)),
new Tuple2<>(0, Instant.ofEpochMilli(20)),
new Tuple2<>(0, Instant.ofEpochMilli(25)));
final Table eventTable =
tableEnv.fromDataStream(
event,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1",
DataTypes.TIMESTAMP_LTZ(3))
.watermark("f1", "f1 - INTERVAL '2'
SECONDS")
.build())
.as("id", "ts");
tableEnv.createTemporaryView("event_table", eventTable);
final Table result =
tableEnv.sqlQuery(
"SELECT * FROM event_table"
+ " LEFT JOIN versioned_table FOR SYSTEM_TIME
AS OF event_table.ts"
+ " ON event_table.id = versioned_table.id");
result.execute().print();
result.filter($("state").isEqual("online")).execute().print();
}
} {code}
The result of the temporal join is as follows:
||op||id||ts||id0||state||ts0||row_num||
|+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
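After filtering with the predicate state = 'online', I expected only the two rows with state 'offline' (ts 08:00:00.010 and 08:00:00.015) to be filtered out. However, I got the following result, in which the two 'online' rows at ts 08:00:00.000 and 08:00:00.005 are also missing: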
||op||id||ts||id0||state||ts0||row_num||
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
was:
The following code reproduces the issue:
{code:java}
public class TemporalJoinSQLExample1 {
public static void main(String[] args) throws Exception {
// set up the Java DataStream API
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// set up the Java Table API
final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
env.fromElements(
new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
final Table table =
tableEnv.fromDataStream(
ds,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING())
.column("f2",
DataTypes.TIMESTAMP_LTZ(3))
.watermark("f2", "f2 - INTERVAL '2'
SECONDS")
.build())
.as("id", "state", "ts");
tableEnv.createTemporaryView("source_table", table);
final Table dedupeTable =
tableEnv.sqlQuery(
"SELECT * FROM ("
+ " SELECT *, ROW_NUMBER() OVER (PARTITION BY
id ORDER BY ts DESC) AS row_num FROM source_table"
+ ") WHERE row_num = 1");
tableEnv.createTemporaryView("versioned_table", dedupeTable);
DataStreamSource<Tuple2<Integer, Instant>> event =
env.fromElements(
new Tuple2<>(0, Instant.ofEpochMilli(0)),
new Tuple2<>(0, Instant.ofEpochMilli(5)),
new Tuple2<>(0, Instant.ofEpochMilli(10)),
new Tuple2<>(0, Instant.ofEpochMilli(15)),
new Tuple2<>(0, Instant.ofEpochMilli(20)),
new Tuple2<>(0, Instant.ofEpochMilli(25)));
final Table eventTable =
tableEnv.fromDataStream(
event,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1",
DataTypes.TIMESTAMP_LTZ(3))
.watermark("f1", "f1 - INTERVAL '2'
SECONDS")
.build())
.as("id", "ts");
tableEnv.createTemporaryView("event_table", eventTable);
final Table result =
tableEnv.sqlQuery(
"SELECT * FROM event_table"
+ " LEFT JOIN versioned_table FOR SYSTEM_TIME
AS OF event_table.ts"
+ " ON event_table.id = versioned_table.id");
result.execute().print();
result.filter($("state").isEqual("online")).execute().print();
}
} {code}
The result of the temporal join is as follows:
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op |          id |                      ts |         id0 |                          state |                     ts0 |              row_num |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I |           0 | 1970-01-01 08:00:00.000 |           0 |                         online | 1970-01-01 08:00:00.000 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.005 |           0 |                         online | 1970-01-01 08:00:00.000 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.010 |           0 |                        offline | 1970-01-01 08:00:00.010 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.015 |           0 |                        offline | 1970-01-01 08:00:00.010 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.020 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.025 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
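After filtering with the predicate state = 'online', I expected only the two rows with state 'offline' (ts 08:00:00.010 and 08:00:00.015) to be filtered out. However, I got the following result, in which the two 'online' rows at ts 08:00:00.000 and 08:00:00.005 are also missing: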
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op |          id |                      ts |         id0 |                          state |                     ts0 |              row_num |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I |           0 | 1970-01-01 08:00:00.020 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.025 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
> Incorrect result for filter after temporal join
> -----------------------------------------------
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.1
> Reporter: Xuannan Su
> Priority: Major
>
> The following code reproduces the issue:
>
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2",
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2'
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1",
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2'
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>
> The result of the temporal join is as follows:
> ||op||id||ts||id0||state||ts0||row_num||
> |+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
>
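> After filtering with the predicate state = 'online', I expected only the two rows with state 'offline' (ts 08:00:00.010 and 08:00:00.015) to be filtered out. However, I got the following result, in which the two 'online' rows at ts 08:00:00.000 and 08:00:00.005 are also missing: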
> ||op||id||ts||id0||state||ts0||row_num||
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)