[
https://issues.apache.org/jira/browse/FLINK-38751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041613#comment-18041613
]
zzwqqq commented on FLINK-38751:
--------------------------------
I think we should not clear lastTriggeringTsState during the state cleanup;
it should be retained for the entire lifecycle of the function.
If this approach is acceptable, I'd be happy to implement the fix.
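A minimal sketch of the intended change (illustrative only -- the field names follow the issue, but the surrounding cleanup structure is assumed, not the exact Flink source):
{code:java}
// Illustrative sketch, not the actual RowTimeRangeBoundedPrecedingFunction code:
// when the state-retention cleanup timer fires, clear the buffered data and
// accumulator state but keep lastTriggeringTsState.
if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
    inputState.clear();   // buffered rows per timestamp (illustrative name)
    accState.clear();     // aggregate accumulators (illustrative name)
    cleanupTsState.clear();
    // lastTriggeringTsState.clear();  // removed: keeping the last triggering
    // timestamp lets processElement keep dropping records with
    // triggeringTs <= lastTriggeringTs after cleanup has fired
    return;
}
{code}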
> RowTimeRangeBoundedPrecedingFunction incorrectly cleans
> lastTriggeringTsState, may not discard expired data
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38751
> URL: https://issues.apache.org/jira/browse/FLINK-38751
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.0.0, 1.19.0, 1.18.1, 1.20.1, 2.1.1
> Reporter: zzwqqq
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> When the watermark advances and triggers state cleanup, lastTriggeringTsState is
> cleared. After that, every incoming record is processed until lastTriggeringTsState
> is updated again, so expired (late) records are not dropped as they should be.
> {code:java}
> public void processElement(
>         RowData input,
>         KeyedProcessFunction<K, RowData, RowData>.Context ctx,
>         Collector<RowData> out)
>         throws Exception {
>     // triggering timestamp for trigger calculation
>     long triggeringTs = input.getLong(rowTimeIdx);
>     Long lastTriggeringTs = lastTriggeringTsState.value();
>     if (lastTriggeringTs == null) {
>         lastTriggeringTs = 0L;
>     }
>     // check if the data is expired; if not, save the data and register an event-time timer
>     if (triggeringTs > lastTriggeringTs) {
>         // process data
>         ...
>     } else {
>         numLateRecordsDropped.inc();
>     }
> }
> {code}
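> To make the failure mode concrete, here is a simplified, hand-written trace (illustrative timestamps, not actual operator output):
> {code:java}
> // 1. record ts=100 arrives     -> lastTriggeringTs = 0, 100 > 0 -> processed,
> //                                 lastTriggeringTsState updated to 100
> // 2. watermark advances past the retention time -> cleanup fires,
> //                                 lastTriggeringTsState is cleared
> // 3. late record ts=50 arrives -> value() returns null, defaults to 0L,
> //                                 50 > 0 -> processed instead of dropped (the bug)
> // Had lastTriggeringTsState been retained: 50 <= 100 -> numLateRecordsDropped.inc()
> {code}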
> Steps to reproduce; currently the two late records are not dropped.
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.FileUtils;
>
> import java.io.File;
> import java.io.IOException;
>
> public class OverAggExample {
>
>     public static void main(String[] args) throws Exception {
>         // set up the execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         tEnv.getConfig().set("parallelism.default", "1");
>
>         // write the source data into a temporary file and get its absolute path
>         String contents =
>                 "1,beer,3,2019-12-12 00:00:01\n"
>                         + "1,diaper,4,2019-12-12 00:00:02\n"
>                         + "2,pen,3,2019-12-12 00:00:04\n"
>                         + "3,dirty,2,2020-12-12 00:00:05\n"
>                         + "4,dirty,1,2020-12-12 00:00:08\n"
>                         + "5,diaper,5,1998-12-12 00:00:09\n" // late record
>                         + "5,diaper,5,1999-12-12 00:00:09\n" // late record
>                         + "5,diaper,5,2020-12-12 00:00:09\n";
>         String path = createTempFile(contents);
>
>         // register the table via DDL with a watermark;
>         // the events are out of order, hence we wait 3 seconds for late events
>         String ddl =
>                 "CREATE TABLE orders (\n"
>                         + "  user_id INT,\n"
>                         + "  product STRING,\n"
>                         + "  amount INT,\n"
>                         + "  ts TIMESTAMP(3),\n"
>                         + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
>                         + ") WITH (\n"
>                         + "  'connector' = 'filesystem',\n"
>                         + "  'path' = '" + path + "',\n"
>                         + "  'format' = 'csv'\n"
>                         + ")";
>         tEnv.executeSql(ddl);
>
>         // the watermark is advanced, but no elements are processed by
>         // RowTimeRangeBoundedPrecedingFunction
>         String filter =
>                 "CREATE VIEW filter_view AS SELECT * FROM orders WHERE product <> 'dirty'";
>         tEnv.executeSql(filter);
>
>         String overagg =
>                 "SELECT *, SUM(amount) OVER ("
>                         + "ORDER BY ts RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW"
>                         + ") AS sum_amount FROM filter_view";
>         tEnv.executeSql(overagg).print();
>
>         // output:
>         //+----+-------------+--------------------------------+-------------+-------------------------+-------------+
>         //| op |     user_id |                        product |      amount |                      ts |  sum_amount |
>         //+----+-------------+--------------------------------+-------------+-------------------------+-------------+
>         //| +I |           1 |                           beer |           3 | 2019-12-12 00:00:01.000 |           3 |
>         //| +I |           1 |                         diaper |           4 | 2019-12-12 00:00:02.000 |           7 |
>         //| +I |           2 |                            pen |           3 | 2019-12-12 00:00:04.000 |          10 |
>         //| +I |           5 |                         diaper |           5 | 1998-12-12 00:00:09.000 |           5 |
>         //| +I |           5 |                         diaper |           5 | 1999-12-12 00:00:09.000 |           5 |
>         //| +I |           5 |                         diaper |           5 | 2020-12-12 00:00:09.000 |           5 |
>         //+----+-------------+--------------------------------+-------------+-------------------------+-------------+
>     }
>
>     /** Creates a temporary file with the given contents and returns its absolute path. */
>     private static String createTempFile(String contents) throws IOException {
>         File tempFile = File.createTempFile("orders", ".csv");
>         tempFile.deleteOnExit();
>         FileUtils.writeFileUtf8(tempFile, contents);
>         return tempFile.toURI().toString();
>     }
> }
> {code}
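> With the expected behavior, the two late records (timestamps 1998-12-12 and 1999-12-12) would be counted by the numLateRecordsDropped metric and only the remaining four rows would be emitted.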