zzwqqq created FLINK-38751:
------------------------------

             Summary: RowTimeRangeBoundedPrecedingFunction incorrectly cleans 
lastTriggeringTsState, may not discard expired data
                 Key: FLINK-38751
                 URL: https://issues.apache.org/jira/browse/FLINK-38751
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 2.1.1, 1.20.1, 1.18.1, 1.19.0, 2.0.0
            Reporter: zzwqqq


When the watermark advances and triggers cleanup, lastTriggeringTsState is 
cleared. After that, every record is processed until lastTriggeringTsState is 
updated again, so expired (late) data is not dropped as it should be.
{code:java}
public void processElement(
        RowData input,
        KeyedProcessFunction<K, RowData, RowData>.Context ctx,
        Collector<RowData> out)
        throws Exception {
    // triggering timestamp for trigger calculation
    long triggeringTs = input.getLong(rowTimeIdx);

    Long lastTriggeringTs = lastTriggeringTsState.value();
    if (lastTriggeringTs == null) {
        lastTriggeringTs = 0L;
    }

    // check if the data is expired, if not, save the data and register event 
time timer
    if (triggeringTs > lastTriggeringTs) {
      // process data
       ...
    } else {
        numLateRecordsDropped.inc();
    }
} {code}
 Steps to reproduce: in the example below, the two late records should be dropped, but currently they are not.
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;

import java.io.File;
import java.io.IOException;

public class OverAggExample {

    public static void main(String[] args) throws Exception {
        // set up execution environment
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.getConfig().set("parallelism.default", "1");

        // write source data into temporary file and get the absolute path
        String contents =
                "1,beer,3,2019-12-12 00:00:01\n"
                        + "1,diaper,4,2019-12-12 00:00:02\n"
                        + "2,pen,3,2019-12-12 00:00:04\n"
                        + "3,dirty,2,2020-12-12 00:00:05\n"
                        + "4,dirty,1,2020-12-12 00:00:08\n"
                        + "5,diaper,5,1998-12-12 00:00:09\n" // late record
                        + "5,diaper,5,1999-12-12 00:00:09\n" // late record
                        + "5,diaper,5,2020-12-12 00:00:09\n";
        String path = createTempFile(contents);

        // register table via DDL with watermark,
        // the events are out of order, hence, we use 3 seconds to wait the 
late events
        String ddl =
                "CREATE TABLE orders (\n"
                        + "  user_id INT,\n"
                        + "  product STRING,\n"
                        + "  amount INT,\n"
                        + "  ts TIMESTAMP(3),\n"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
                        + ") WITH (\n"
                        + "  'connector.type' = 'filesystem',\n"
                        + "  'connector.path' = '"
                        + path
                        + "',\n"
                        + "  'format.type' = 'csv'\n"
                        + ")";
        tEnv.executeSql(ddl);

        // Watermark is advanced, but no elements are processed by 
RowTimeRangeBoundedPrecedingFunction
        String filter = "CREATE VIEW filter_view as select * from orders where 
product <> 'dirty'";
        tEnv.executeSql(filter);

        String overagg = "SELECT *, sum(amount) OVER (ORDER BY ts RANGE BETWEEN 
INTERVAL '1' HOURS PRECEDING AND CURRENT ROW) AS sum_amount FROM filter_view;";

        tEnv.executeSql(overagg).print();

        // output
        
//+----+-------------+--------------------------------+-------------+-------------------------+-------------+
        //| op |     user_id |                        product |      amount |   
                   ts |  sum_amount |
        
//+----+-------------+--------------------------------+-------------+-------------------------+-------------+
        //| +I |           1 |                           beer |           3 | 
2019-12-12 00:00:01.000 |           3 |
        //| +I |           1 |                         diaper |           4 | 
2019-12-12 00:00:02.000 |           7 |
        //| +I |           2 |                            pen |           3 | 
2019-12-12 00:00:04.000 |          10 |
        //| +I |           5 |                         diaper |           5 | 
1998-12-12 00:00:09.000 |           5 |
        //| +I |           5 |                         diaper |           5 | 
1999-12-12 00:00:09.000 |           5 |
        //| +I |           5 |                         diaper |           5 | 
2020-12-12 00:00:09.000 |           5 |
        
//+----+-------------+--------------------------------+-------------+-------------------------+-------------+
    }

    /** Creates a temporary file with the contents and returns the absolute 
path. */
    private static String createTempFile(String contents) throws IOException {
        File tempFile = File.createTempFile("orders", ".csv");
        tempFile.deleteOnExit();
        FileUtils.writeFileUtf8(tempFile, contents);
        return tempFile.toURI().toString();
    }
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to