[
https://issues.apache.org/jira/browse/FLINK-33722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz reassigned FLINK-33722:
----------------------------------------
Assignee: Grzegorz Kołakowski
> MATCH_RECOGNIZE in batch mode ignores events order
> --------------------------------------------------
>
> Key: FLINK-33722
> URL: https://issues.apache.org/jira/browse/FLINK-33722
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.17.1
> Reporter: Grzegorz Kołakowski
> Assignee: Grzegorz Kołakowski
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.0
>
>
> MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Consider
> the following example:
> {code:sql}
> SELECT *
> FROM events
> MATCH_RECOGNIZE (
> PARTITION BY user_id
> ORDER BY ts ASC
> MEASURES
> FIRST(A.ts) as _start,
> LAST(A.ts) as _middle,
> LAST(B.ts) as _finish
> ONE ROW PER MATCH
> AFTER MATCH SKIP PAST LAST ROW
> PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
> DEFINE
> A AS active is false,
> B AS active is true
> ) AS T {code}
> where _events_ is a PostgreSQL table containing ~10000 records.
> {code:sql}
> CREATE TABLE events (
> id INT,
> user_id INT,
> ts TIMESTAMP(3),
> active BOOLEAN,
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:postgresql://postgres:5432/test',
> 'username' = 'test',
> 'password' = 'test',
> 'table-name' = 'events'
> ); {code}
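> The result can contain rows where _finish is earlier than _start or _middle,
> which is wrong: with ORDER BY ts ASC and PATTERN (A{2} B), the B row must
> come after both A rows.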
> {noformat}
> user_id  _start                   _middle                  _finish
> 1        2023-11-23 14:34:42.346  2023-11-23 14:34:48.370  2023-11-23 14:34:44.264
> {noformat}
>
> Repository where I reproduced the problem:
> [https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging]
> ----
>
> According to [~dwysakowicz]: in BATCH mode the CepOperator is always created
> to process records in processing time:
> [https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54]
> A comparator that covers the sort on the ts field is passed along to the
> operator:
> [https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173]
> but this is only a secondary sort: it is applied only among records that
> share the same timestamp.
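
To illustrate the explanation above, here is a minimal, self-contained sketch of why a comparator that only breaks ties within one timestamp cannot restore the ORDER BY ts order. It is a simplified model, not Flink's CepOperator code: the class SecondarySortSketch, the Row type, and the operatorTimestamp field (standing in for the processing-time timestamp the operator assigns) are all hypothetical names chosen for illustration. The ts values 42, 48, and 44 echo the seconds in the result row reported above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class SecondarySortSketch {

    /** Hypothetical row: the timestamp the operator assigned, plus the event's own ts column. */
    static final class Row {
        final long operatorTimestamp; // assumption: models the processing-time timestamp
        final long ts;                // the column named in ORDER BY

        Row(long operatorTimestamp, long ts) {
            this.operatorTimestamp = operatorTimestamp;
            this.ts = ts;
        }
    }

    /**
     * Emits rows group by group in ascending operator timestamp,
     * applying the ts comparator only inside each group.
     */
    static List<Long> secondarySortOnly(List<Row> rows) {
        Map<Long, List<Row>> byOperatorTimestamp = new TreeMap<>();
        for (Row row : rows) {
            byOperatorTimestamp
                    .computeIfAbsent(row.operatorTimestamp, k -> new ArrayList<>())
                    .add(row);
        }
        List<Long> emitted = new ArrayList<>();
        for (List<Row> group : byOperatorTimestamp.values()) {
            group.sort(Comparator.comparingLong((Row r) -> r.ts)); // tie-break only
            for (Row row : group) {
                emitted.add(row.ts);
            }
        }
        return emitted;
    }

    /** What ORDER BY ts ASC is expected to produce: a sort over the whole partition. */
    static List<Long> fullSort(List<Row> rows) {
        return rows.stream().map(r -> r.ts).sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two rows share operator timestamp 100; the third arrives at 101
        // although its ts (44) lies between the other two.
        List<Row> rows = List.of(new Row(100, 42), new Row(100, 48), new Row(101, 44));
        System.out.println(secondarySortOnly(rows)); // prints [42, 48, 44]
        System.out.println(fullSort(rows));          // prints [42, 44, 48]
    }
}
```

In this model the row with ts 44 is emitted last because it belongs to a later group, so a pattern such as A{2} B can match rows whose ts values are out of order. A sort over the whole partition does not have this problem.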