[ 
https://issues.apache.org/jira/browse/FLINK-23379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-23379:
-------------------------------
    Fix Version/s: 1.15.0

>  interval left join null value result out of order
> --------------------------------------------------
>
>                 Key: FLINK-23379
>                 URL: https://issues.apache.org/jira/browse/FLINK-23379
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>             Fix For: 1.15.0
>
>         Attachments: image-2021-07-15-16-53-59-228.png
>
>
> * Scenario:
>  The person main table is left interval joined with the message information 
> table. An earlier record that has no match in the message information table 
> is emitted later than a later record that does have a match.
>  When a matched output and a null-padded output share the same primary key, 
> they are emitted out of order: the null-padded output arrives after the 
> matched output, resulting in incorrect results.
> Input:
> {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 
> 18:56:43"}
> {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"}
> {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 
> 19:06:43"}
> {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}
> Output:
>  +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
>  +I(chencc2,28,2021-03-26T19:02:47,null,null)
>  The second record here has timestamp 19:02, which is earlier than the first 
> record's 19:06, but it is emitted later, causing data update errors.
>  
>  * Code:
> {code:java}
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
>          String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>                  "  `id` BIGINT,\n" +
>                  "  `name` STRING,\n" +
>                  "  `age` INT,\n" +
>                  "  proctime as PROCTIME(),\n" +
>                  "  `ts` TIMESTAMP(3),\n" +
>                  " WATERMARK FOR ts AS ts\n" +
>                  ") WITH (\n" +
>                  "  'connector' = 'kafka',\n" +
>                  "  'topic' = 'persons_test2',\n" +
>                  "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                  "  'properties.group.id' = 'testGroa115',\n" +
>                  "  'scan.startup.mode' = 'group-offsets',\n" +
>                  "  'format' = 'json'\n" +
>                  ")";
>          tableEnv.executeSql(kafka_source_sql);
>         tableEnv.executeSql("drop table if exists 
> persons_message_table_kafka2");
>          String kafka_source_sql2 = "CREATE TABLE 
> persons_message_table_kafka2 (\n" +
>                  "  `id` BIGINT,\n" +
>                  "  `name` STRING,\n" +
>                  "  `message` STRING,\n" +
>                  "  `ts` TIMESTAMP(3) ," +
>  //                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>                  " WATERMARK FOR ts AS ts\n" +
>                  ") WITH (\n" +
>                  "  'connector' = 'kafka',\n" +
>                  "  'topic' = 'persons_extra_message2',\n" +
>                  "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                  "  'properties.group.id' = 'testGroud2e313',\n" +
>                  "  'scan.startup.mode' = 'group-offsets',\n" +
>                  "  'format' = 'json'\n" +
>                  ")";
>          tableEnv.executeSql(kafka_source_sql2);
>         tableEnv.executeSql("" +
>                  "CREATE TEMPORARY VIEW result_data_view " +
>                  " as " +
>                  " select " +
>                  " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as 
> string) as ts2 " +
>                  " from persons_table_kafka2 t1 " +
>                  " left  join persons_message_table_kafka2 t2 on t1.name = 
> t2.name and t1.ts between " +
>                          " t2.ts and t2.ts +  INTERVAL '3' MINUTE"
>                  );
>         Table resultTable = tableEnv.from("result_data_view");
>          DataStream<RowData> rowDataDataStream = 
> tableEnv.toAppendStream(resultTable, RowData.class);
>          rowDataDataStream.print();
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to