[ 
https://issues.apache.org/jira/browse/FLINK-31729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Smirnov updated FLINK-31729:
--------------------------------------
    Description: 
Currently, in a streaming LEFT/RIGHT/FULL OUTER JOIN, Flink SQL doesn't emit 
UPDATE_BEFORE/UPDATE_AFTER records; instead, it explicitly changes the RowKind 
of output records to INSERT/DELETE for simplicity. However, this doesn't work 
as expected, because UPDATE_BEFORE rows are sometimes emitted anyway. More 
confusingly, the UPDATE_BEFORE record is followed by an INSERT record (not 
UPDATE_AFTER). This can cause bugs when downstream operators process UPDATE 
records differently from INSERT/DELETE records (for example, an operator may 
assume that every UPDATE_BEFORE record is followed by an UPDATE_AFTER record 
at some point).
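
To illustrate the downstream assumption mentioned above, here is a minimal 
sketch of a consumer-side check. It is not Flink code: the function name 
unpaired_update_before and the sample rows are hypothetical, and the check is 
stricter than "at some point" because it requires UPDATE_AFTER to be the very 
next record. The sample sequences are made up to show the pattern and are not 
the actual output of the query below.

```python
from typing import List, Tuple

# Short RowKind names as Flink prints them: +I, -U, +U, -D.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

# A changelog row is modelled as (row kind, payload). Hypothetical model.
Row = Tuple[str, str]


def unpaired_update_before(changelog: List[Row]) -> List[int]:
    """Return indices of UPDATE_BEFORE rows whose next row is not UPDATE_AFTER.

    Assumption: the consumer expects -U and +U to arrive back to back.
    """
    unpaired = []
    for i, (kind, _) in enumerate(changelog):
        if kind != UPDATE_BEFORE:
            continue
        next_kind = changelog[i + 1][0] if i + 1 < len(changelog) else None
        if next_kind != UPDATE_AFTER:
            unpaired.append(i)
    return unpaired


# Hypothetical sequences (placeholder payloads, NOT real query output).
mixed = [(INSERT, "row_a"), (UPDATE_BEFORE, "row_a"), (INSERT, "row_b")]
paired = [(INSERT, "row_a"), (UPDATE_BEFORE, "row_a"), (UPDATE_AFTER, "row_b")]

print(unpaired_update_before(mixed))   # the -U at index 1 has no +U after it
print(unpaired_update_before(paired))  # every -U is followed by +U
```

A consumer that relies on this pairing would treat the first sequence as 
malformed, which is the kind of bug described above.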

How to reproduce:

Suppose we have tables "source1" and "source2":
CREATE TABLE source1(
  id int PRIMARY KEY NOT ENFORCED,
  c3 bigint
) WITH (
  'connector' = 'kafka',
   ...
  'format' = 'debezium-json'
);

CREATE TABLE source2(
  id int PRIMARY KEY NOT ENFORCED,
  c3 bigint
) WITH (
  'connector' = 'kafka',
   ...
  'format' = 'debezium-json'
);

And we execute the following query:
"select t1.id, t1.c3, t2.id, t2.c3 from source1 t1 left join source2 t2 on 
t1.id = t2.id"

Then we insert records one by one:
source1: {"before":null,"after":{"id":2,"c3":7121},"op":"c"}
source2: {"before":null,"after":{"id":2,"c3":364},"op":"c"}
source1: {"before":{"id":2,"c3":7121},"after":{"id":2,"c3":7222},"op":"u"}
source2: {"before":{"id":2,"c3":364},"after":{"id":2,"c3":564},"op":"u"}
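
For reference, the sketch below shows how the four messages above map to 
changelog rows on the source side. It is a simplified model of the 
debezium-json mapping (create/read to INSERT, update to UPDATE_BEFORE plus 
UPDATE_AFTER, delete to DELETE), written in plain Python rather than taken 
from Flink; the function name debezium_to_changelog is hypothetical. It covers 
only the input to the join, not the join output.

```python
import json
from typing import Iterator, List, Optional, Tuple

ChangelogRow = Tuple[str, Optional[dict]]


def debezium_to_changelog(message: str) -> Iterator[ChangelogRow]:
    """Map one Debezium envelope to (row kind, row) pairs (simplified model)."""
    envelope = json.loads(message)
    op = envelope["op"]
    if op in ("c", "r"):        # create / snapshot read
        yield ("+I", envelope["after"])
    elif op == "u":             # update: old image, then new image
        yield ("-U", envelope["before"])
        yield ("+U", envelope["after"])
    elif op == "d":             # delete
        yield ("-D", envelope["before"])
    else:
        raise ValueError(f"unsupported op: {op!r}")


# The messages from the reproduction steps, in insertion order.
messages: List[Tuple[str, str]] = [
    ("source1", '{"before":null,"after":{"id":2,"c3":7121},"op":"c"}'),
    ("source2", '{"before":null,"after":{"id":2,"c3":364},"op":"c"}'),
    ("source1", '{"before":{"id":2,"c3":7121},"after":{"id":2,"c3":7222},"op":"u"}'),
    ("source2", '{"before":{"id":2,"c3":364},"after":{"id":2,"c3":564},"op":"u"}'),
]

# Flatten to (table, row kind, row) in arrival order.
rows = [(table, kind, row)
        for table, msg in messages
        for kind, row in debezium_to_changelog(msg)]

for table, kind, row in rows:
    print(table, kind, row)
```

Under this model, each "u" message reaches the join as a -U/+U pair, so both 
join inputs carry UPDATE records even though the join is expected to emit 
only INSERT/DELETE.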

The result will be as in the following screenshot:

!image-2023-04-05-00-08-32-984.png!

Note that once ticket https://issues.apache.org/jira/browse/FLINK-17337 
(support emitting UPDATE_BEFORE/UPDATE_AFTER records in joins other than 
inner join) is implemented, the described error will no longer be relevant.


> Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN
> ---------------------------------------------------------
>
>                 Key: FLINK-31729
>                 URL: https://issues.apache.org/jira/browse/FLINK-31729
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Alexander Smirnov
>            Priority: Minor
>             Fix For: 1.18.0
>
>         Attachments: image-2023-04-05-00-08-32-984.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)