[ 
https://issues.apache.org/jira/browse/FLINK-38751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18047209#comment-18047209
 ] 

zzwqqq commented on FLINK-38751:
--------------------------------

Hi [Xuyang Zhong|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=xuyangzhong], could you please take a look at the issue? If the proposed solution is acceptable, I’d appreciate it if you could assign it to me so I can proceed.

> RowTimeRangeBoundedPrecedingFunction incorrectly cleans 
> lastTriggeringTsState, may not discard expired data
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38751
>                 URL: https://issues.apache.org/jira/browse/FLINK-38751
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.0.0, 1.19.0, 1.18.1, 1.20.1, 2.1.1
>            Reporter: zzwqqq
>            Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> When the watermark advances and triggers state cleanup, lastTriggeringTsState
> is cleared. From then on, every incoming record is processed until
> lastTriggeringTsState is updated again, so expired (late) data is not
> correctly dropped.
> {code:java}
> public void processElement(
>         RowData input,
>         KeyedProcessFunction<K, RowData, RowData>.Context ctx,
>         Collector<RowData> out)
>         throws Exception {
>     // triggering timestamp for trigger calculation
>     long triggeringTs = input.getLong(rowTimeIdx);
>     Long lastTriggeringTs = lastTriggeringTsState.value();
>     if (lastTriggeringTs == null) {
>         lastTriggeringTs = 0L;
>     }
>     // check if the data is expired; if not, save the data and register an
>     // event-time timer
>     if (triggeringTs > lastTriggeringTs) {
>         // process data
>         ...
>     } else {
>         numLateRecordsDropped.inc();
>     }
> }
> {code}
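> For context, the cleanup the description refers to happens in onTimer. The following is a paraphrased sketch (state names follow the snippet above; the exact code differs across Flink versions): once the registered cleanup timer fires, lastTriggeringTsState is cleared together with the data and accumulator states, so the next processElement call falls back to lastTriggeringTs = 0L and accepts arbitrarily old records.
> {code:java}
> // paraphrased sketch of the event-time cleanup branch in onTimer
> public void onTimer(
>         long timestamp,
>         KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
>         Collector<RowData> out)
>         throws Exception {
>     Long cleanupTimestamp = cleanupTsState.value();
>     // when the cleanup timestamp is reached, all per-key states are dropped,
>     // including lastTriggeringTsState, which the late-record check relies on
>     if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
>         inputState.clear();
>         accState.clear();
>         lastTriggeringTsState.clear(); // late records become undetectable
>         cleanupTsState.clear();
>         return;
>     }
>     // ... regular per-timestamp processing, which ends with
>     // lastTriggeringTsState.update(timestamp);
> }
> {code}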
> Steps to reproduce; with the current code, the two late records below are not dropped.
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.FileUtils;
>
> import java.io.File;
> import java.io.IOException;
>
> public class OverAggExample {
>     public static void main(String[] args) throws Exception {
>         // set up the execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         tEnv.getConfig().set("parallelism.default", "1");
>
>         // write the source data into a temporary file and get the absolute path
>         String contents =
>                 "1,beer,3,2019-12-12 00:00:01\n"
>                         + "1,diaper,4,2019-12-12 00:00:02\n"
>                         + "2,pen,3,2019-12-12 00:00:04\n"
>                         + "3,dirty,2,2020-12-12 00:00:05\n"
>                         + "4,dirty,1,2020-12-12 00:00:08\n"
>                         + "5,diaper,5,1998-12-12 00:00:09\n" // late record
>                         + "5,diaper,5,1999-12-12 00:00:09\n" // late record
>                         + "5,diaper,5,2020-12-12 00:00:09\n";
>         String path = createTempFile(contents);
>
>         // register the table via DDL with a watermark;
>         // the events are out of order, hence we wait 3 seconds for late events
>         String ddl =
>                 "CREATE TABLE orders (\n"
>                         + "  user_id INT,\n"
>                         + "  product STRING,\n"
>                         + "  amount INT,\n"
>                         + "  ts TIMESTAMP(3),\n"
>                         + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
>                         + ") WITH (\n"
>                         + "  'connector.type' = 'filesystem',\n"
>                         + "  'connector.path' = '"
>                         + path
>                         + "',\n"
>                         + "  'format.type' = 'csv'\n"
>                         + ")";
>         tEnv.executeSql(ddl);
>
>         // the watermark is advanced by the filtered-out rows, but no elements
>         // for them are processed by RowTimeRangeBoundedPrecedingFunction
>         String filter =
>                 "CREATE VIEW filter_view AS SELECT * FROM orders WHERE product <> 'dirty'";
>         tEnv.executeSql(filter);
>
>         String overagg =
>                 "SELECT *, SUM(amount) OVER (ORDER BY ts RANGE BETWEEN"
>                         + " INTERVAL '1' HOURS PRECEDING AND CURRENT ROW) AS sum_amount"
>                         + " FROM filter_view";
>         tEnv.executeSql(overagg).print();
>
>         // output:
>         // +----+-------------+--------------------------------+-------------+-------------------------+-------------+
>         // | op |     user_id |                        product |      amount |                      ts |  sum_amount |
>         // +----+-------------+--------------------------------+-------------+-------------------------+-------------+
>         // | +I |           1 |                           beer |           3 | 2019-12-12 00:00:01.000 |           3 |
>         // | +I |           1 |                         diaper |           4 | 2019-12-12 00:00:02.000 |           7 |
>         // | +I |           2 |                            pen |           3 | 2019-12-12 00:00:04.000 |          10 |
>         // | +I |           5 |                         diaper |           5 | 1998-12-12 00:00:09.000 |           5 |
>         // | +I |           5 |                         diaper |           5 | 1999-12-12 00:00:09.000 |           5 |
>         // | +I |           5 |                         diaper |           5 | 2020-12-12 00:00:09.000 |           5 |
>         // +----+-------------+--------------------------------+-------------+-------------------------+-------------+
>     }
>
>     /** Creates a temporary file with the contents and returns the absolute path. */
>     private static String createTempFile(String contents) throws IOException {
>         File tempFile = File.createTempFile("orders", ".csv");
>         tempFile.deleteOnExit();
>         FileUtils.writeFileUtf8(tempFile, contents);
>         return tempFile.toURI().toString();
>     }
> }
> {code}
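> Why the watermark races ahead here: the two 'dirty' rows (2020-12-12) advance the source watermark but are removed by filter_view before reaching the over aggregate, so the operator's event-time cleanup timer fires and clears lastTriggeringTsState. The 1998/1999 records that arrive afterwards are behind the watermark, yet they pass the triggeringTs > lastTriggeringTs check against the 0L fallback and are emitted with sum_amount = 5 instead of being dropped.
> One possible direction, sketched below as an assumption (this is not necessarily the proposed solution referenced above), is to fall back to the current watermark when lastTriggeringTsState is empty, so that records behind the watermark are still treated as late after a cleanup:
> {code:java}
> // hypothetical fallback in processElement (sketch only)
> long triggeringTs = input.getLong(rowTimeIdx);
> Long lastTriggeringTs = lastTriggeringTsState.value();
> // after a cleanup the state is empty; use the current watermark as the
> // late-record boundary instead of silently falling back to 0L
> long lateBoundary =
>         lastTriggeringTs != null
>                 ? lastTriggeringTs
>                 : ctx.timerService().currentWatermark();
> if (triggeringTs > lateBoundary) {
>     // save the data and register the event-time timer as before
> } else {
>     numLateRecordsDropped.inc();
> }
> {code}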



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
