[ https://issues.apache.org/jira/browse/FLINK-23379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young updated FLINK-23379:
-------------------------------
Fix Version/s: 1.15.0
> interval left join null value result out of order
> --------------------------------------------------
>
> Key: FLINK-23379
> URL: https://issues.apache.org/jira/browse/FLINK-23379
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.2
> Reporter: waywtdcc
> Priority: Major
> Fix For: 1.15.0
>
> Attachments: image-2021-07-15-16-53-59-228.png
>
>
> * Scenario:
> The person (main) table is left interval joined with the message information
> table. A person record that finds no match in the message table is emitted
> later than a subsequent person record that does find a match.
> When a matched output and a null-padded output share the same primary key,
> they are emitted out of order: the null-padded output arrives after the
> matched output, which produces incorrect results.
> Input (records with `message` go to persons_message_table_kafka2, records
> with `age` go to persons_table_kafka2):
> {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 18:56:43"}
> {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"}
> {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 19:06:43"}
> {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}
> Output:
> +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
> +I(chencc2,28,2021-03-26T19:02:47,null,null)
> The second output record (ts 19:02:47) is earlier than the first one
> (ts 19:06:47), but it is emitted later, which causes incorrect data updates
> downstream.
>
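> * Code:
> {code:java}
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.data.RowData;
>
> // The wrapper class and environment setup are assumed; the original snippet
> // ran inside existing code that already provided tableEnv.
> public class IntervalLeftJoinRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         // Person (main) table, event time ts with watermark = ts
>         tableEnv.executeSql("drop table if exists persons_table_kafka2");
>         String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>                 "  `id` BIGINT,\n" +
>                 "  `name` STRING,\n" +
>                 "  `age` INT,\n" +
>                 "  proctime as PROCTIME(),\n" +
>                 "  `ts` TIMESTAMP(3),\n" +
>                 "  WATERMARK FOR ts AS ts\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'topic' = 'persons_test2',\n" +
>                 "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                 "  'properties.group.id' = 'testGroa115',\n" +
>                 "  'scan.startup.mode' = 'group-offsets',\n" +
>                 "  'format' = 'json'\n" +
>                 ")";
>         tableEnv.executeSql(kafka_source_sql);
>
>         // Message information table, event time ts with watermark = ts
>         tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
>         String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>                 "  `id` BIGINT,\n" +
>                 "  `name` STRING,\n" +
>                 "  `message` STRING,\n" +
>                 "  `ts` TIMESTAMP(3),\n" +
>                 // "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>                 "  WATERMARK FOR ts AS ts\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'topic' = 'persons_extra_message2',\n" +
>                 "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                 "  'properties.group.id' = 'testGroud2e313',\n" +
>                 "  'scan.startup.mode' = 'group-offsets',\n" +
>                 "  'format' = 'json'\n" +
>                 ")";
>         tableEnv.executeSql(kafka_source_sql2);
>
>         // Left interval join: t1.ts must lie in [t2.ts, t2.ts + 3 minutes]
>         tableEnv.executeSql(
>                 "CREATE TEMPORARY VIEW result_data_view as " +
>                 "select t1.name, t1.age, t1.ts as ts1, t2.message, cast(t2.ts as string) as ts2 " +
>                 "from persons_table_kafka2 t1 " +
>                 "left join persons_message_table_kafka2 t2 on t1.name = t2.name " +
>                 "and t1.ts between t2.ts and t2.ts + INTERVAL '3' MINUTE");
>         Table resultTable = tableEnv.from("result_data_view");
>         DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, RowData.class);
>         rowDataDataStream.print();
>         // Assumed addition: the DataStream job has to be started explicitly.
>         env.execute();
>     }
> }
> {code}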
>
>
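The ordering in the report can be illustrated with a small, self-contained Java model. This is a hypothetical simplification, not Flink's interval join operator: it assumes (1) a matched pair is emitted as soon as its second row arrives, and (2) a buffered person row is finalized only once the operator's combined watermark, the minimum of the two input watermarks, has strictly passed its ts, at which point an unmatched row is emitted null-padded. Watermarks follow `WATERMARK FOR ts AS ts`, records arrive in the order listed under Input, and the names `IntervalLeftJoinModel`, `Row`, `simulate`, and `reportInput` are illustrative only. Timestamps are printed with `LocalDateTime.toString()` rather than the `cast(... as string)` format of the real query.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of an event-time LEFT interval join on
//   t1.name = t2.name AND t1.ts BETWEEN t2.ts AND t2.ts + INTERVAL '3' MINUTE
// This is NOT Flink's operator code; it only encodes two assumed rules.
class IntervalLeftJoinModel {

    static final Duration BOUND = Duration.ofMinutes(3);

    static final class Row {
        final char side;      // 'L' = persons_table_kafka2, 'R' = persons_message_table_kafka2
        final String name;
        final String value;   // age for left rows, message for right rows
        final LocalDateTime ts;
        boolean matched;      // set on a left row once it joins at least one right row

        Row(char side, String name, String value, String ts) {
            this.side = side;
            this.name = name;
            this.value = value;
            this.ts = LocalDateTime.parse(ts);
        }
    }

    // t1.ts BETWEEN t2.ts AND t2.ts + 3 minutes, inclusive on both ends
    static boolean inInterval(LocalDateTime leftTs, LocalDateTime rightTs) {
        return !leftTs.isBefore(rightTs) && !leftTs.isAfter(rightTs.plus(BOUND));
    }

    static String joined(Row l, Row r) {
        return "+I(" + l.name + "," + l.value + "," + l.ts + "," + r.value + "," + r.ts + ")";
    }

    static List<String> simulate(List<Row> input) {
        List<String> out = new ArrayList<>();
        List<Row> leftBuf = new ArrayList<>();
        List<Row> rightBuf = new ArrayList<>();
        LocalDateTime leftWm = LocalDateTime.MIN;
        LocalDateTime rightWm = LocalDateTime.MIN;

        for (Row row : input) {
            // Rule 1 (assumed): a matching pair is emitted as soon as its second row arrives.
            if (row.side == 'L') {
                for (Row r : rightBuf) {
                    if (r.name.equals(row.name) && inInterval(row.ts, r.ts)) {
                        out.add(joined(row, r));
                        row.matched = true;
                    }
                }
                leftBuf.add(row);
                if (row.ts.isAfter(leftWm)) leftWm = row.ts;   // WATERMARK FOR ts AS ts
            } else {
                for (Row l : leftBuf) {
                    if (l.name.equals(row.name) && inInterval(l.ts, row.ts)) {
                        out.add(joined(l, row));
                        l.matched = true;
                    }
                }
                rightBuf.add(row);
                if (row.ts.isAfter(rightWm)) rightWm = row.ts;
            }

            // Rule 2 (assumed): a buffered left row is finalized only after the combined
            // watermark min(leftWm, rightWm) has strictly passed its ts; if it never
            // matched, it is emitted null-padded at that point.
            LocalDateTime combinedWm = leftWm.isBefore(rightWm) ? leftWm : rightWm;
            Iterator<Row> it = leftBuf.iterator();
            while (it.hasNext()) {
                Row l = it.next();
                if (combinedWm.isAfter(l.ts)) {
                    if (!l.matched) {
                        out.add("+I(" + l.name + "," + l.value + "," + l.ts + ",null,null)");
                    }
                    it.remove();
                }
            }
        }
        return out;
    }

    // The four input records from the report, in arrival order.
    static List<Row> reportInput() {
        return List.of(
                new Row('R', "chencc2", "good boy2", "2021-03-26T18:56:43"),
                new Row('L', "chencc2", "28", "2021-03-26T19:02:47"),
                new Row('R', "chencc2", "good boy3", "2021-03-26T19:06:43"),
                new Row('L', "chencc2", "27", "2021-03-26T19:06:47"));
    }

    public static void main(String[] args) {
        simulate(reportInput()).forEach(System.out::println);
    }
}
```

Running `main` prints `+I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26T19:06:43)` followed by `+I(chencc2,28,2021-03-26T19:02:47,null,null)`, the same order as the reported output. In this model the person-side watermark only moves past 19:02:47 when the 19:06:47 person record itself arrives, and that record's match is emitted before the watermark check releases the null-padded 19:02:47 row.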
--
This message was sent by Atlassian Jira
(v8.3.4#803005)