[ https://issues.apache.org/jira/browse/FLINK-38751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042204#comment-18042204 ]
Dongsheng He commented on FLINK-38751:
--------------------------------------
We've seen this happen before: expired data was incorrectly output in the
final calculation results.
> RowTimeRangeBoundedPrecedingFunction incorrectly cleans lastTriggeringTsState, may not discard expired data
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38751
> URL: https://issues.apache.org/jira/browse/FLINK-38751
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.0.0, 1.19.0, 1.18.1, 1.20.1, 2.1.1
> Reporter: zzwqqq
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> When the watermark advances and triggers state cleanup, lastTriggeringTsState is
> cleared. From then on, every incoming record is processed until lastTriggeringTsState
> is updated again, so expired (late) data is not correctly dropped.
> {code:java}
> public void processElement(
>         RowData input,
>         KeyedProcessFunction<K, RowData, RowData>.Context ctx,
>         Collector<RowData> out)
>         throws Exception {
>     // triggering timestamp for trigger calculation
>     long triggeringTs = input.getLong(rowTimeIdx);
>     Long lastTriggeringTs = lastTriggeringTsState.value();
>     if (lastTriggeringTs == null) {
>         lastTriggeringTs = 0L;
>     }
>     // check if the data is expired; if not, save the data and register an event-time timer
>     if (triggeringTs > lastTriggeringTs) {
>         // process data
>         ...
>     } else {
>         numLateRecordsDropped.inc();
>     }
> }
> {code}
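> To make the failure sequence explicit: the state-TTL cleanup timer clears lastTriggeringTsState, the next value() call returns null, lastTriggeringTs falls back to 0, and every record, however late, passes the triggeringTs > lastTriggeringTs check. Below is a minimal sketch of one possible guard, for illustration only and not a proposed patch; it is stricter than the existing check because it additionally compares the record's rowtime against the current watermark. Only ctx.timerService().currentWatermark() is new here (a standard KeyedProcessFunction API); the other names come from the snippet above.
> {code:java}
> // Sketch only, not the project's fix: drop records that are late with
> // respect to the current watermark, even after TTL cleanup has cleared
> // lastTriggeringTsState (in which case lastTriggeringTs falls back to 0).
> long currentWatermark = ctx.timerService().currentWatermark();
> if (triggeringTs > lastTriggeringTs && triggeringTs > currentWatermark) {
>     // process data
>     ...
> } else {
>     numLateRecordsDropped.inc();
> }
> {code}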
> Steps to reproduce (the two late records are currently not dropped):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.FileUtils;
>
> import java.io.File;
> import java.io.IOException;
>
> public class OverAggExample {
>
>     public static void main(String[] args) throws Exception {
>         // set up the execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         tEnv.getConfig().set("parallelism.default", "1");
>
>         // write the source data into a temporary file and get the absolute path
>         String contents =
>                 "1,beer,3,2019-12-12 00:00:01\n"
>                         + "1,diaper,4,2019-12-12 00:00:02\n"
>                         + "2,pen,3,2019-12-12 00:00:04\n"
>                         + "3,dirty,2,2020-12-12 00:00:05\n"
>                         + "4,dirty,1,2020-12-12 00:00:08\n"
>                         + "5,diaper,5,1998-12-12 00:00:09\n" // late record
>                         + "5,diaper,5,1999-12-12 00:00:09\n" // late record
>                         + "5,diaper,5,2020-12-12 00:00:09\n";
>         String path = createTempFile(contents);
>
>         // register the table via DDL with a watermark;
>         // the events are out of order, hence we wait 3 seconds for late events
>         String ddl =
>                 "CREATE TABLE orders (\n"
>                         + "  user_id INT,\n"
>                         + "  product STRING,\n"
>                         + "  amount INT,\n"
>                         + "  ts TIMESTAMP(3),\n"
>                         + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
>                         + ") WITH (\n"
>                         + "  'connector.type' = 'filesystem',\n"
>                         + "  'connector.path' = '" + path + "',\n"
>                         + "  'format.type' = 'csv'\n"
>                         + ")";
>         tEnv.executeSql(ddl);
>
>         // the 'dirty' rows advance the watermark, but they are filtered out here,
>         // so no elements are processed by RowTimeRangeBoundedPrecedingFunction
>         // while the watermark advances
>         String filter =
>                 "CREATE VIEW filter_view AS SELECT * FROM orders WHERE product <> 'dirty'";
>         tEnv.executeSql(filter);
>
>         String overAgg =
>                 "SELECT *, sum(amount) OVER (ORDER BY ts RANGE BETWEEN"
>                         + " INTERVAL '1' HOURS PRECEDING AND CURRENT ROW) AS sum_amount"
>                         + " FROM filter_view";
>         tEnv.executeSql(overAgg).print();
>
>         // actual output:
>         //+----+-------------+--------------------------------+-------------+-------------------------+-------------+
>         //| op |     user_id |                        product |      amount |                      ts |  sum_amount |
>         //+----+-------------+--------------------------------+-------------+-------------------------+-------------+
>         //| +I |           1 |                           beer |           3 | 2019-12-12 00:00:01.000 |           3 |
>         //| +I |           1 |                         diaper |           4 | 2019-12-12 00:00:02.000 |           7 |
>         //| +I |           2 |                            pen |           3 | 2019-12-12 00:00:04.000 |          10 |
>         //| +I |           5 |                         diaper |           5 | 1998-12-12 00:00:09.000 |           5 |
>         //| +I |           5 |                         diaper |           5 | 1999-12-12 00:00:09.000 |           5 |
>         //| +I |           5 |                         diaper |           5 | 2020-12-12 00:00:09.000 |           5 |
>         //+----+-------------+--------------------------------+-------------+-------------------------+-------------+
>     }
>
>     /** Creates a temporary file with the contents and returns the absolute path. */
>     private static String createTempFile(String contents) throws IOException {
>         File tempFile = File.createTempFile("orders", ".csv");
>         tempFile.deleteOnExit();
>         FileUtils.writeFileUtf8(tempFile, contents);
>         return tempFile.toURI().toString();
>     }
> }
> {code}
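> For comparison, a hand-derived expected output, assuming the two 199x rows are correctly dropped as late (derived from the query semantics, not from an actual run):
> {code}
> | +I | 1 | beer   | 3 | 2019-12-12 00:00:01.000 |  3 |
> | +I | 1 | diaper | 4 | 2019-12-12 00:00:02.000 |  7 |
> | +I | 2 | pen    | 3 | 2019-12-12 00:00:04.000 | 10 |
> | +I | 5 | diaper | 5 | 2020-12-12 00:00:09.000 |  5 |
> {code}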