[
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yubin Li updated FLINK-36547:
-----------------------------
Description:
As the official docs state, `RowKind` semantics change after a Debezium serialization/deserialization round trip: -U becomes -D, and +U becomes +I.
{code:java}
Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as
Debezium JSON or Avro messages, and emitting them to external systems like Kafka.
However, currently Flink can't combine UPDATE_BEFORE and UPDATE_AFTER into a
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER
as DELETE and INSERT Debezium messages. {code}
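For illustration only (the payload layout is assumed to follow the Debezium JSON envelope with 'debezium-json.schema-include' = 'false', and the column values are shortened), an update of the aggregate for key id=2 from num=1 to num=2 on the `t2` table defined in the example below would currently be written as two messages rather than one UPDATE:
{code:java}
// UPDATE_BEFORE is encoded as a Debezium DELETE message:
{"before": {"id": 2, "name": "576c...", "num": 1}, "after": null, "op": "d"}
// UPDATE_AFTER is encoded as a Debezium INSERT/CREATE message:
{"before": null, "after": {"id": 2, "name": "576c...", "num": 2}, "op": "c"}
{code}
A native Debezium UPDATE would instead be a single record carrying both row images with "op": "u", which is what lets a reader restore -U/+U.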
In fact, in many scenarios we need the `RowKind` semantics to stay consistent across serialization and deserialization. We rely on the difference between UPDATE_BEFORE and UPDATE_AFTER to implement this, and it has been running well in our business workloads; the example below reproduces the behavior, and a sketch of a possible option follows it.
{code:java}
create table datagen1 (id int, name string) with
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1',
'fields.id.max'='2');
create table t2 (id int, name string, num bigint) WITH (
'topic' = 't2',
'connector' = 'kafka',
'properties.bootstrap.servers' = 'xx',
'properties.group.id' = 'test',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',
'key.format' = 'json',
'key.fields' = 'id',
'debezium-json.timestamp-format.standard' = 'ISO-8601',
'debezium-json.schema-include' = 'false',
'properties.enable.idempotence' = 'false'
);
insert into t2 select id, max(name) as name, count(1) as num from datagen1
group by id;

-- print sink used only to inspect the records read back from t2
create table print1 (id int, name string, num bigint) with ('connector' = 'print');
insert into print1 select * from t2;

-- output result:
+I[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
1]
-U[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
1]
+U[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
2]
-U[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
2]
+U[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
3]
-U[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
3]
+U[2,576c3edc8fbd0fdad9ec42fda80efa7db2ac4ba07dcc8292c474941c668a610cb7567b444a79776f7f27fb33437a012b5d4c,
4]
+I[1,b1c41f2c0e495fb539d673ee95c05bdee4637b3a64a9e7b1e74107f549f1b3e57ca442ee56bd8417e8b9eb4d715fd86e82d2,
1]
{code}
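As a rough sketch of the requested improvement (the option name 'debezium-json.retain-row-kind' is hypothetical and does not exist in Flink today; it only illustrates the shape of the change), the sink DDL could expose a switch so that the -U/+U distinction survives the write/read round trip instead of degrading to -D/+I:
{code:java}
-- Hypothetical sketch only: 'debezium-json.retain-row-kind' is an assumed
-- option name for the proposed behavior, not an existing Flink option.
create table t2_retained (id int, name string, num bigint) WITH (
  'connector' = 'kafka',
  'topic' = 't2',
  'properties.bootstrap.servers' = 'xx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  'key.format' = 'json',
  'key.fields' = 'id',
  -- proposed: keep enough information (e.g. a single "op": "u" record with
  -- both before and after images) for readers to restore -U/+U row kinds
  'debezium-json.retain-row-kind' = 'true'
);
{code}
Whether this is implemented by emitting a single Debezium UPDATE record or by tagging the existing DELETE/INSERT pair is open for discussion.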
> Add option to retain `RowKind` semantics after serialization/deserialization
> ---------------------------------------------------------------------------
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 2.0.0
> Reporter: Yubin Li
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)