[ 
https://issues.apache.org/jira/browse/FLINK-33722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grzegorz Kołakowski updated FLINK-33722:
----------------------------------------
    Description: 
MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider 
the following example:
{code:sql}
    FROM events
        MATCH_RECOGNIZE (
            PARTITION BY user_id
            ORDER BY ts ASC
            MEASURES
                FIRST(A.ts) as _start,
                LAST(A.ts) as _middle,
                LAST(B.ts) as _finish
            ONE ROW PER MATCH
            AFTER MATCH SKIP PAST LAST ROW
            PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
            DEFINE
                A AS active is false,
                B AS active is true
        ) AS T {code}
where _events_ is a Postgresql table containing ~10000 records.
{code:java}
CREATE TABLE events (
  id INT,
  user_id INT,
  ts TIMESTAMP(3),
  active BOOLEAN,
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/test',
    'username' = 'test',
    'password' = 'test',
    'table-name' = 'events'
); {code}
It can happen that _{_}finish{_} is smaller than _{_}start{_} or _{_}middle{_}, 
which is wrong.
{noformat}
   user_id                  _start                 _middle                 
_finish
         1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 
14:34:44.264{noformat}
 

Repository where I reproduced the problem: 
[https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging]
----
 

According to [~dwysakowicz]:  In BATCH the CepOperator is always created to 
process records in processing time:
[https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54]
A comparator is passed along to the operator covering the sorting on ts field: 
[https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173]
 but this is only secondary sorting. It is applied only within records of the 
same timestamp.

  was:
MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider 
the following example:
{code:sql}
    FROM events
        MATCH_RECOGNIZE (
            PARTITION BY user_id
            ORDER BY ts ASC
            MEASURES
                FIRST(A.ts) as _start,
                LAST(A.ts) as _middle,
                LAST(B.ts) as _finish
            ONE ROW PER MATCH
            AFTER MATCH SKIP PAST LAST ROW
            PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
            DEFINE
                A AS active is false,
                B AS active is true
        ) AS T {code}
where _events_ is a Postgresql table containing ~10000 records.
{code:java}
CREATE TABLE events (
  id INT,
  user_id INT,
  ts TIMESTAMP(3),
  active BOOLEAN,
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/test',
    'username' = 'test',
    'password' = 'test',
    'table-name' = 'events'
); {code}
It can happen that __finish_ is smaller than __start_ or {_}_middle{_}, which 
is wrong.
{noformat}
   user_id                  _start                 _middle                 
_finish
         1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 
14:34:44.264{noformat}
 

Repository where I reproduced the problem: 
https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging
----
 

According to [~dwysakowicz]:  In BATCH the CepOperator is always created to 
process records in processing time:
https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
A comparator is passed along to the operator covering the sorting on ts field: 
https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173
 but this is only secondary sorting. It is applied only within records of the 
same timestamp.


> MATCH_RECOGNIZE in batch mode ignores events order
> --------------------------------------------------
>
>                 Key: FLINK-33722
>                 URL: https://issues.apache.org/jira/browse/FLINK-33722
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.1
>            Reporter: Grzegorz Kołakowski
>            Priority: Major
>
> MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider 
> the following example:
> {code:sql}
>     FROM events
>         MATCH_RECOGNIZE (
>             PARTITION BY user_id
>             ORDER BY ts ASC
>             MEASURES
>                 FIRST(A.ts) as _start,
>                 LAST(A.ts) as _middle,
>                 LAST(B.ts) as _finish
>             ONE ROW PER MATCH
>             AFTER MATCH SKIP PAST LAST ROW
>             PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
>             DEFINE
>                 A AS active is false,
>                 B AS active is true
>         ) AS T {code}
> where _events_ is a Postgresql table containing ~10000 records.
> {code:java}
> CREATE TABLE events (
>   id INT,
>   user_id INT,
>   ts TIMESTAMP(3),
>   active BOOLEAN,
>   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:postgresql://postgres:5432/test',
>     'username' = 'test',
>     'password' = 'test',
>     'table-name' = 'events'
> ); {code}
> It can happen that _{_}finish{_} is smaller than _{_}start{_} or 
> _{_}middle{_}, which is wrong.
> {noformat}
>    user_id                  _start                 _middle                 
> _finish
>          1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 
> 14:34:44.264{noformat}
>  
> Repository where I reproduced the problem: 
> [https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging]
> ----
>  
> According to [~dwysakowicz]:  In BATCH the CepOperator is always created to 
> process records in processing time:
> [https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54]
> A comparator is passed along to the operator covering the sorting on ts 
> field: 
> [https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173]
>  but this is only secondary sorting. It is applied only within records of the 
> same timestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to