zzwqqq created FLINK-38751:
------------------------------
Summary: RowTimeRangeBoundedPrecedingFunction incorrectly cleans
lastTriggeringTsState, may not discard expired data
Key: FLINK-38751
URL: https://issues.apache.org/jira/browse/FLINK-38751
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 2.1.1, 1.20.1, 1.18.1, 1.19.0, 2.0.0
Reporter: zzwqqq
When the watermark advances and triggers state cleanup, lastTriggeringTsState is
cleared. Until lastTriggeringTsState is updated again, processElement falls back
to 0 as the last triggering timestamp, so every incoming record is processed and
expired (late) records are not dropped.
{code:java}
public void processElement(
RowData input,
KeyedProcessFunction<K, RowData, RowData>.Context ctx,
Collector<RowData> out)
throws Exception {
// triggering timestamp for trigger calculation
long triggeringTs = input.getLong(rowTimeIdx);
Long lastTriggeringTs = lastTriggeringTsState.value();
if (lastTriggeringTs == null) {
lastTriggeringTs = 0L;
}
// check if the data is expired, if not, save the data and register event
// time timer
if (triggeringTs > lastTriggeringTs) {
// process data
...
} else {
numLateRecordsDropped.inc();
}
} {code}
Steps to reproduce: run the program below. The two late records are emitted instead of being dropped.
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;
import java.io.File;
import java.io.IOException;
public class OverAggExample {
public static void main(String[] args) throws Exception {
// set up execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().set("parallelism.default", "1");
// write source data into temporary file and get the absolute path
String contents =
"1,beer,3,2019-12-12 00:00:01\n"
+ "1,diaper,4,2019-12-12 00:00:02\n"
+ "2,pen,3,2019-12-12 00:00:04\n"
+ "3,dirty,2,2020-12-12 00:00:05\n"
+ "4,dirty,1,2020-12-12 00:00:08\n"
+ "5,diaper,5,1998-12-12 00:00:09\n" // late record
+ "5,diaper,5,1999-12-12 00:00:09\n" // late record
+ "5,diaper,5,2020-12-12 00:00:09\n";
String path = createTempFile(contents);
// register table via DDL with watermark,
// the events are out of order, hence, we use 3 seconds to wait the
late events
String ddl =
"CREATE TABLE orders (\n"
+ " user_id INT,\n"
+ " product STRING,\n"
+ " amount INT,\n"
+ " ts TIMESTAMP(3),\n"
+ " WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
+ ") WITH (\n"
+ " 'connector.type' = 'filesystem',\n"
+ " 'connector.path' = '"
+ path
+ "',\n"
+ " 'format.type' = 'csv'\n"
+ ")";
tEnv.executeSql(ddl);
// Watermark is advanced, but no elements are processed by
RowTimeRangeBoundedPrecedingFunction
String filter = "CREATE VIEW filter_view as select * from orders where
product <> 'dirty'";
tEnv.executeSql(filter);
String overagg = "SELECT *, sum(amount) OVER (ORDER BY ts RANGE BETWEEN
INTERVAL '1' HOURS PRECEDING AND CURRENT ROW) AS sum_amount FROM filter_view;";
tEnv.executeSql(overagg).print();
// output
//+----+-------------+--------------------------------+-------------+-------------------------+-------------+
//| op | user_id | product | amount |
ts | sum_amount |
//+----+-------------+--------------------------------+-------------+-------------------------+-------------+
//| +I | 1 | beer | 3 |
2019-12-12 00:00:01.000 | 3 |
//| +I | 1 | diaper | 4 |
2019-12-12 00:00:02.000 | 7 |
//| +I | 2 | pen | 3 |
2019-12-12 00:00:04.000 | 10 |
//| +I | 5 | diaper | 5 |
1998-12-12 00:00:09.000 | 5 |
//| +I | 5 | diaper | 5 |
1999-12-12 00:00:09.000 | 5 |
//| +I | 5 | diaper | 5 |
2020-12-12 00:00:09.000 | 5 |
//+----+-------------+--------------------------------+-------------+-------------------------+-------------+
}
/** Creates a temporary file with the contents and returns the absolute
path. */
private static String createTempFile(String contents) throws IOException {
File tempFile = File.createTempFile("orders", ".csv");
tempFile.deleteOnExit();
FileUtils.writeFileUtf8(tempFile, contents);
return tempFile.toURI().toString();
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)