[ https://issues.apache.org/jira/browse/FLINK-31729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexander Smirnov updated FLINK-31729:
--------------------------------------
    Description:
Currently, in streaming LEFT/RIGHT/FULL OUTER JOIN, Flink SQL doesn't emit UPDATE_BEFORE/UPDATE_AFTER records; instead it explicitly changes the RowKind of output records to INSERT/DELETE for simplicity. However, this doesn't work as expected, because UPDATE_BEFORE rows are sometimes emitted anyway. More confusingly, the UPDATE_BEFORE record is followed by an INSERT record (not UPDATE_AFTER), which can cause bugs when downstream operators process UPDATE records differently from INSERT/DELETE records (for example, an operator may assume that an UPDATE_BEFORE record will be followed by an UPDATE_AFTER record at some point).

How to reproduce:

Suppose we have tables "source1" and "source2":

CREATE TABLE source1(
  id int PRIMARY KEY,
  c3 bigint
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'debezium-json'
);

CREATE TABLE source2(
  id int PRIMARY KEY,
  c3 bigint
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'debezium-json'
);

And we execute the following query:

"select t1.id, t1.c3, t2.id, t2.c3 from source1 t1 left join source2 t2 on t1.id = t2.id"

Then we insert records one by one:

source1:
{noformat}
{"before":null,"after":{"id":2,"c3":7121},"op":"c"}{noformat}
source2:
{noformat}
{"before":null,"after":{"id":2,"c3":364},"op":"c"}{noformat}
source1:
{noformat}
{"before":{"id":2,"c3":7121},"after":{"id":2,"c3":7222},"op":"u"}{noformat}
source2:
{noformat}
{"before":{"id":2,"c3":364},"after":{"id":2,"c3":564},"op":"u"}{noformat}

The result is shown in the following screenshot:

!image-2023-04-05-00-08-32-984.png!

Note that once https://issues.apache.org/jira/browse/FLINK-17337 (support emitting UPDATE_BEFORE/UPDATE_AFTER records not only in inner joins) is implemented, the described bug will no longer be relevant.
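
To make the downstream assumption from the description concrete (an UPDATE_BEFORE record is followed by an UPDATE_AFTER record at some point), here is a minimal Python sketch of such a check. It is an illustration only and not Flink code: the function name and the sample sequences are hypothetical, and the sequences are not taken from the attached screenshot. The only borrowed detail is the short row-kind notation used by Flink's RowKind (+I, -U, +U, -D).

```python
from typing import Iterable, List

# Short row-kind strings in the notation of Flink's RowKind.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def find_unpaired_update_before(kinds: Iterable[str]) -> List[int]:
    """Return the positions of -U records that are never matched by a later +U.

    Hypothetical helper: it models a consumer of a single-key changelog
    that expects every UPDATE_BEFORE to be closed by an UPDATE_AFTER.
    """
    pending: List[int] = []  # positions of -U records still waiting for a +U
    for position, kind in enumerate(kinds):
        if kind == UPDATE_BEFORE:
            pending.append(position)
        elif kind == UPDATE_AFTER and pending:
            pending.pop()  # this +U closes the most recent open -U
    return pending


# Illustrative sequences, not the actual output of the query above.
well_formed = [INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE]
mixed = [INSERT, UPDATE_BEFORE, INSERT]  # -U followed by +I, as in this ticket

print(find_unpaired_update_before(well_formed))
print(find_unpaired_update_before(mixed))
```

A consumer written this way would keep waiting on the second sequence, because the INSERT that follows the UPDATE_BEFORE never closes it.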
> Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN
> ---------------------------------------------------------
>
>                 Key: FLINK-31729
>                 URL: https://issues.apache.org/jira/browse/FLINK-31729
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Alexander Smirnov
>            Priority: Minor
>             Fix For: 1.18.0
>
>         Attachments: image-2023-04-05-00-08-32-984.png
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)