[
https://issues.apache.org/jira/browse/FLINK-23219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376288#comment-17376288
]
waywtdcc edited comment on FLINK-23219 at 7/7/21, 6:00 AM:
-----------------------------------------------------------
Hi [~TsReaper]
Okay, I see. A lot of the state on my side will never be accessed again. Our
scenario is an event-time left join against the latest data of a dimension
table, and the dimension table data is required to expire after some time.
Is there any good solution for this scenario?
was (Author: waywtdcc):
Hi [~TsReaper]
Okay, I see. A lot of the state on my side will never be accessed again. Our
scenario is an event-time left join against the latest data of a dimension
table, and the dimension table data is required to expire after some time.
> temporal join ttl configuration does not take effect
> ----------------------------------------------------
>
> Key: FLINK-23219
> URL: https://issues.apache.org/jira/browse/FLINK-23219
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Runtime
> Affects Versions: 1.12.2
> Reporter: waywtdcc
> Priority: Major
> Labels: flink, pull-request-available, sql
> Attachments: image-2021-07-02-16-29-40-310.png
>
>
> * version: flink 1.12.2
> * problem: I run a job in which table A temporal left joins table B, and set
> the table.exec.state.ttl configuration
> to 3 hours, or to 2 seconds for testing. But the task state keeps growing
> for more than 7 days.
> * code
> {code:java}
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
> String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
> " `id` BIGINT,\n" +
> " `name` STRING,\n" +
> " `age` INT,\n" +
> " proctime as PROCTIME(),\n" +
> " `ts` TIMESTAMP(3),\n" +
> " WATERMARK FOR ts AS ts\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'persons_test_auto',\n" +
> " 'properties.bootstrap.servers' = 'node2:6667',\n" +
> " 'properties.group.id' = 'testGrodsu1765',\n" +
> " 'scan.startup.mode' = 'group-offsets',\n" +
> " 'format' = 'json'\n" +
> ")";
> tableEnv.executeSql(kafka_source_sql);
> tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
> String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
> " `id` BIGINT,\n" +
> " `name` STRING,\n" +
> " `message` STRING,\n" +
> " `ts` TIMESTAMP(3),\n" +
> // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
> " WATERMARK FOR ts AS ts\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'persons_extra_message_auto',\n" +
> " 'properties.bootstrap.servers' = 'node2:6667',\n" +
> " 'properties.group.id' = 'testGroud125313',\n" +
> " 'scan.startup.mode' = 'group-offsets',\n" +
> " 'format' = 'json'\n" +
> ")";
> tableEnv.executeSql(kafka_source_sql2);
> tableEnv.executeSql(
> "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
> "SELECT id, name, message,ts \n" +
> " FROM (\n" +
> " SELECT *,\n" +
> " ROW_NUMBER() OVER (PARTITION BY name \n" +
> " ORDER BY ts DESC) AS rowNum \n" +
> " FROM persons_message_table_kafka2 " +
> " )\n" +
> "WHERE rowNum = 1");
> tableEnv.executeSql("" +
> "CREATE TEMPORARY VIEW result_data_view " +
> " as " +
> " select " +
> " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
> " from persons_table_kafka2 t1 " +
> " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 " +
> " on t1.name = t2.name "
> );
> Table resultTable = tableEnv.from("result_data_view");
> DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable,
> RowData.class);
> rowDataDataStream.print();
> env.execute("test_it");
> {code}
> * the result looks like !image-2021-07-02-16-29-40-310.png!
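For readers unfamiliar with what the reporter expects from table.exec.state.ttl,
the following is a minimal conceptual sketch in plain Java of keyed state that
expires once it has not been written for longer than a TTL. It is not Flink's
implementation and does not explain why the setting had no visible effect in
this report. The class name IdleStateSketch, its methods, and the choice to
measure idleness from the last write are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Conceptual sketch only (hypothetical class, NOT Flink code):
 * keeps the latest value per key and drops it after it has been idle,
 * i.e. not written, for at least ttlMillis.
 */
public class IdleStateSketch<K, V> {

    /** A stored value together with the time of its last write. */
    private static final class Entry<V> {
        final V value;
        final long lastWriteMillis;

        Entry(V value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> state = new HashMap<>();

    public IdleStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores the latest value for a key and refreshes its timestamp. */
    public void put(K key, V value, long nowMillis) {
        state.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns the value, or null if the key is absent or has expired. */
    public V get(K key, long nowMillis) {
        Entry<V> e = state.get(key);
        if (e == null) {
            return null;
        }
        if (isExpired(e, nowMillis)) {
            state.remove(key); // expired entries are never returned
            return null;
        }
        return e.value;
    }

    /** Removes every expired entry and returns how many were dropped. */
    public int cleanup(long nowMillis) {
        int before = state.size();
        state.values().removeIf(e -> isExpired(e, nowMillis));
        return before - state.size();
    }

    public int size() {
        return state.size();
    }

    private boolean isExpired(Entry<V> e, long nowMillis) {
        return nowMillis - e.lastWriteMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        // TTL of 2 seconds, matching the test value used in the report.
        IdleStateSketch<String, String> dim = new IdleStateSketch<>(2_000L);
        dim.put("alice", "hello", 0L);
        dim.put("bob", "hi", 1_500L);
        System.out.println(dim.get("alice", 1_000L)); // still live at t=1000
        System.out.println(dim.cleanup(2_500L));      // drops "alice" only
        System.out.println(dim.size());               // "bob" remains
    }
}
```

With a working TTL, the number of retained keys stays bounded by the keys
written within the last TTL interval. The report describes the opposite
behaviour: state that keeps growing for more than 7 days.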
--
This message was sent by Atlassian Jira
(v8.3.4#803005)