This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 53a1f0c6c1 [Feature][connector][kafka] Support read debezium format message from kafka (#5066)
53a1f0c6c1 is described below

commit 53a1f0c6c14190c37c0b28320db049f822436a4f
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Jul 25 10:09:24 2023 +0800

    [Feature][connector][kafka] Support read debezium format message from kafka (#5066)
---
 .github/workflows/backend.yml                      |   2 +-
 docs/en/connector-v2/formats/debezium-json.md      | 107 ++++++
 docs/en/connector-v2/sink/Kafka.md                 |  16 +-
 docs/en/connector-v2/source/kafka.md               |  17 +-
 release-note.md                                    |  12 +-
 .../connectors/seatunnel/kafka/config/Config.java  |   8 +-
 .../seatunnel/kafka/config/MessageFormat.java      |   1 +
 .../serialize/DefaultSeaTunnelRowSerializer.java   |   3 +
 .../seatunnel/kafka/source/KafkaSource.java        |  10 +
 .../seatunnel/kafka/source/KafkaSourceFactory.java |   1 +
 .../e2e/connector/kafka/DebeziumToKafkaIT.java     | 418 +++++++++++++++++++++
 .../test/resources/debezium/register-mysql.json    |  16 +
 .../kafkasource_debezium_cdc_to_pgsql.conf         |  62 +++
 .../resources/kafkasource_debezium_to_kafka.conf   |  57 +++
 .../seatunnel/format/json/JsonFormatOptions.java   |  13 +-
 .../DebeziumJsonDeserializationSchema.java         | 168 +++++++++
 .../json/debezium/DebeziumJsonFormatFactory.java   |  70 ++++
 .../json/debezium/DebeziumJsonFormatOptions.java   |  53 +++
 .../debezium/DebeziumJsonSerializationSchema.java  |  80 ++++
 .../org.apache.seatunnel.api.table.factory.Factory |   1 +
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java | 163 ++++++++
 .../src/test/resources/debezium-data.txt           |  16 +
 22 files changed, 1270 insertions(+), 24 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index fbe37acece..6da4f4a5ab 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -564,7 +564,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 90
+    timeout-minutes: 150
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/formats/debezium-json.md b/docs/en/connector-v2/formats/debezium-json.md
new file mode 100644
index 0000000000..4c40a0298e
--- /dev/null
+++ b/docs/en/connector-v2/formats/debezium-json.md
@@ -0,0 +1,107 @@
+# Debezium Format
+
+Changelog-Data-Capture Format: Serialization Schema Format: Deserialization Schema
+
+Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a *change event stream*, and applications simply read these streams to see the change events in the same order in which they occurred.
+
+SeaTunnel can interpret Debezium JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This is useful in many cases, such as
+
+        synchronizing incremental data from databases to other systems
+        auditing logs
+        real-time materialized views on databases
+        temporal join changing history of a database table and so on.
+
+SeaTunnel also supports encoding its INSERT/UPDATE/DELETE messages as Debezium JSON messages and emitting them to storage such as Kafka.
+
+# Format Options
+
+|              option               | default | required |                                             Description                                              |
+|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------|
+| format                            | (none)  | yes      | Specify what format to use, here should be 'debezium_json'.                                          |
+| debezium-json.ignore-parse-errors | false   | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+
+# How to use Debezium format
+
+## Kafka usage example
+
+Debezium provides a unified format for changelogs. Here is a simple example of an update operation captured from a MySQL products table:
+
+```bash
+{
+       "before": {
+               "id": 111,
+               "name": "scooter",
+               "description": "Big 2-wheel scooter ",
+               "weight": 5.18
+       },
+       "after": {
+               "id": 111,
+               "name": "scooter",
+               "description": "Big 2-wheel scooter ",
+               "weight": 5.17
+       },
+       "source": {
+               "version": "1.1.1.Final",
+               "connector": "mysql",
+               "name": "dbserver1",
+               "ts_ms": 1589362330000,
+               "snapshot": "false",
+               "db": "inventory",
+               "table": "products",
+               "server_id": 223344,
+               "gtid": null,
+               "file": "mysql-bin.000003",
+               "pos": 2090,
+               "row": 0,
+               "thread": 2,
+               "query": null
+       },
+       "op": "u",
+       "ts_ms": 1589362330904,
+       "transaction": null
+}
+```
+
+Note: please refer to the Debezium documentation for the meaning of each field.
+
+The MySQL products table has 4 columns (id, name, description and weight).
+The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.17.
+Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel conf to consume this topic and interpret the change events using the Debezium format.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "products_binlog"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    }
+    format = debezium_json
+  }
+
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "consume-binlog"
+    format = debezium_json
+  }
+}
+```
+
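Illustration only, not part of the commit: the new documentation above says Debezium JSON messages are interpreted as INSERT/UPDATE/DELETE changes. A minimal Java sketch of that mapping follows, assuming the standard Debezium `op` codes (`c` create, `r` snapshot read, `u` update, `d` delete). The class `DebeziumOpSketch`, the `Kind` enum, and `toChanges` are hypothetical names chosen for this sketch; they are not SeaTunnel APIs, and the real `DebeziumJsonDeserializationSchema` may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

final class DebeziumOpSketch {

    /** Hypothetical row kinds used only in this sketch. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Maps one Debezium envelope to the change rows it implies.
     * Each entry is rendered as "KIND:rowImage" to keep the sketch dependency-free.
     */
    static List<String> toChanges(String op, String before, String after) {
        List<String> out = new ArrayList<>();
        switch (op) {
            case "c": // row created
            case "r": // row read during the initial snapshot
                out.add(Kind.INSERT + ":" + after);
                break;
            case "u": // update carries both the old and the new row image
                out.add(Kind.UPDATE_BEFORE + ":" + before);
                out.add(Kind.UPDATE_AFTER + ":" + after);
                break;
            case "d": // delete only has the old row image
                out.add(Kind.DELETE + ":" + before);
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toChanges("u", "weight=5.18", "weight=5.17"));
    }
}
```

For the sample event in the documentation (op `u`, weight 5.18 to 5.17), this prints `[UPDATE_BEFORE:weight=5.18, UPDATE_AFTER:weight=5.17]`.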
diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 4dbd3a84ce..f971e5390b 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -108,8 +108,10 @@ Kafka distinguishes different transactions by different transactionId. This para
 
 ### format
 
-Data format. The default format is json. Optional text format. The default field separator is ",".
-If you customize the delimiter, add the "field_delimiter" option.
+Data format. The default format is json; text, canal-json and debezium-json are also supported.
+If you use the json or text format, the default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
+If you use the canal format, please refer to [canal-json](../formats/canal-json.md) for details.
+If you use the debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
 
 ### field_delimiter
 
@@ -209,8 +211,10 @@ sink {
 
 ### next version
 
-- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/seatunnel/pull/3230)
-- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/seatunnel/pull/3711)
-- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/seatunnel/pull/3742)
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
+- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
+- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
+- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
+- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
+- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)
 
diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 06f60af6d8..2ed6ec6f12 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -73,8 +73,10 @@ The structure of the data, including field names and field types.
 
 ## format
 
-Data format. The default format is json. Optional text format. The default field separator is ", ".
-If you customize the delimiter, add the "field_delimiter" option.
+Data format. The default format is json; text, canal-json and debezium-json are also supported.
+If you use the json or text format, the default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
+If you use the canal format, please refer to [canal-json](../formats/canal-json.md) for details.
+If you use the debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
 
 ## format_error_handle_way
 
@@ -221,9 +223,10 @@ source {
 
 ### Next Version
 
-- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157))
-- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125))
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
-- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810))
-- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364))
+- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
+- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
+- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
+- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
+- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)
 
diff --git a/release-note.md b/release-note.md
index 68d14e609f..0e84da433c 100644
--- a/release-note.md
+++ b/release-note.md
@@ -3,9 +3,19 @@
 ## Bug fix
 
 ### Core
-
 - [Core] [API] Fixed generic class loss for lists (#4421)
 - [Core] [API] Fix parse nested row data type key changed upper (#4459)
+- [Starter][Flink]Support transform-v2 for flink #3396
+- [Flink] Support flink 1.14.x #3963
+### Transformer
+- [Spark] Support transform-v2 for spark (#3409)
+- [ALL]Add FieldMapper Transform #3781
+### Connectors
+- [Elasticsearch] Support https protocol & compatible with opensearch
+- [Hbase] Add hbase sink connector #4049
+### Formats
+- [Canal]Support read canal format message #3950
+- [Debezium]Support debezium canal format message #3981
  
 ### Connector-V2
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2dffda4f48..f126e563fb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -26,8 +26,6 @@ import java.util.Map;
 public class Config {
 
     public static final String CONNECTOR_IDENTITY = "Kafka";
-    public static final String REPLICATION_FACTOR = "replication.factor";
-
     /** The default field delimiter is “,” */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
@@ -99,6 +97,12 @@ public class Config {
                             "Data format. The default format is json. Optional text format. The default field separator is \", \". "
                                     + "If you customize the delimiter, add the \"field_delimiter\" option.");
 
+    public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
+            Options.key("debezium_record_include_schema")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Does the debezium record carry a schema.");
+
     public static final Option<String> FIELD_DELIMITER =
             Options.key("field_delimiter")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 65b5cc2769..1ef29f6322 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -21,5 +21,6 @@ public enum MessageFormat {
     JSON,
     TEXT,
     CANAL_JSON,
+    DEBEZIUM_JSON,
     COMPATIBLE_DEBEZIUM_JSON
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 06005de003..f8974d0f1a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJs
 import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
+import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
@@ -219,6 +220,8 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
                         .build();
             case CANAL_JSON:
                 return new CanalJsonSerializationSchema(rowType);
+            case DEBEZIUM_JSON:
+                return new DebeziumJsonSerializationSchema(rowType);
             case COMPATIBLE_DEBEZIUM_JSON:
                 return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
             default:
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 741d752164..30878e82a2 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -47,6 +47,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorE
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
@@ -62,6 +63,7 @@ import java.util.Properties;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
@@ -266,6 +268,14 @@ public class KafkaSource
                                     .setIgnoreParseErrors(true)
                                     .build();
                     break;
+                case DEBEZIUM_JSON:
+                    boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
+                    if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
+                        includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
+                    }
+                    deserializationSchema =
+                            new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema);
+                    break;
                 default:
                     throw new SeaTunnelJsonFormatException(
                             CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index daa75385e4..21057040ec 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -46,6 +46,7 @@ public class KafkaSourceFactory implements TableSourceFactory {
                         Config.KAFKA_CONFIG,
                         Config.SCHEMA,
                         Config.FORMAT,
+                        Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
                         Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
                 .conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP)
                 .conditional(
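Illustration only, not part of the commit: the hunks above register a `debezium_record_include_schema` option (default `true`) and pass it to the deserializer. A minimal Java sketch of what such a flag typically controls follows. It assumes the option mirrors the Kafka Connect JSON converter's `schemas.enable` wrapping, where the change event sits under a top-level `payload` field next to `schema`. The class `IncludeSchemaSketch` and the method `envelope` are hypothetical names, and plain `Map` objects stand in for parsed JSON; the actual `DebeziumJsonDeserializationSchema` is not shown in this part of the diff and may differ.

```java
import java.util.HashMap;
import java.util.Map;

final class IncludeSchemaSketch {

    /**
     * Returns the map that holds before/after/op for one parsed message.
     * With includeSchema, the event is expected under "payload"; otherwise
     * the message itself is the event.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> envelope(Map<String, Object> message, boolean includeSchema) {
        if (!includeSchema) {
            return message;
        }
        Object payload = message.get("payload");
        if (!(payload instanceof Map)) {
            throw new IllegalArgumentException("Expected a 'payload' object in the message");
        }
        return (Map<String, Object>) payload;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("op", "u");

        // Shape assumed for schema-carrying records: {"schema": {...}, "payload": {...}}
        Map<String, Object> wrapped = new HashMap<>();
        wrapped.put("schema", new HashMap<String, Object>());
        wrapped.put("payload", event);

        System.out.println(envelope(wrapped, true).get("op"));
        System.out.println(envelope(event, false).get("op"));
    }
}
```

Both calls print `u`: the same event is reached whether or not the record is wrapped, provided the flag matches the record's shape.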
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java
new file mode 100644
index 0000000000..e76a445996
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SEATUNNEL, EngineType.SPARK})
+@Slf4j
+public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumToKafkaIT.class);
+
+    private static GenericContainer<?> DEBEZIUM_CONTAINER;
+
+    private static final String DEBEZIUM_DOCKER_IMAGE = "quay.io/debezium/connect:2.3.0.Final";
+
+    private static final String DEBEZIUM_HOST = "debezium_e2e";
+
+    private static final int DEBEZIUM_PORT = 8083;
+
+    // ----------------------------------------kafka------------------------------------
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";
+    private static final String KAFKA_HOST = "kafka_dbz_e2e";
+    private KafkaConsumer<String, String> kafkaConsumer;
+    private KafkaContainer KAFKA_CONTAINER;
+    private String KAFKA_TOPIC = "test-debezium-sink";
+
+    // -------------------------------------mysql---------------------------------------
+    private static final String MYSQL_HOST = "mysql";
+    private static MySqlContainer MYSQL_CONTAINER;
+
+    // -----------------------------------------postgres-----------------------------------
+    private static final String PG_IMAGE = "postgres:alpine3.16";
+
+    private static final int PG_PORT = 5432;
+
+    private static final String PG_DRIVER_JAR =
+            "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+    private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    private void createDebeziumContainer() {
+        DEBEZIUM_CONTAINER =
+                new GenericContainer<>(DEBEZIUM_DOCKER_IMAGE)
+                        .withCopyFileToContainer(
+                                MountableFile.forClasspathResource("/debezium/register-mysql.json"),
+                                "/tmp/seatunnel/plugins/Jdbc/register-mysql.json")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(DEBEZIUM_HOST)
+                        .withExposedPorts(DEBEZIUM_PORT)
+                        .withEnv("GROUP_ID", "1")
+                        .withEnv("CONFIG_STORAGE_TOPIC", "my-connect-configs")
+                        .withEnv("OFFSET_STORAGE_TOPIC", "my-connect-offsets")
+                        .withEnv("STATUS_STORAGE_TOPIC", "my-connect-status")
+                        .withEnv("BOOTSTRAP_SERVERS", KAFKA_HOST + ":9092")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE)))
+                        .dependsOn(KAFKA_CONTAINER, MYSQL_CONTAINER);
+        DEBEZIUM_CONTAINER.setWaitStrategy(
+                (new HttpWaitStrategy())
+                        .forPath("/connectors")
+                        .forPort(DEBEZIUM_PORT)
+                        .withStartupTimeout(Duration.ofSeconds(120)));
+        DEBEZIUM_CONTAINER.setPortBindings(
+                com.google.common.collect.Lists.newArrayList(
+                        String.format("%s:%s", DEBEZIUM_PORT, DEBEZIUM_PORT)));
+    }
+
+    private void createKafkaContainer() {
+        KAFKA_CONTAINER =
+                new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(KAFKA_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+    }
+
+    private void createMysqlContainer() {
+        MYSQL_CONTAINER =
+                new MySqlContainer(MySqlVersion.V8_0)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName("debezium")
+                        .withUsername("st_user")
+                        .withPassword("seatunnel")
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    private void createPostgreSQLContainer() {
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("postgresql_e2e")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+        POSTGRESQL_CONTAINER.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", PG_PORT, PG_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        LOG.info("The first stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+
+        LOG.info("The second stage: Starting Mysql containers...");
+        createMysqlContainer();
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+
+        LOG.info("The third stage: Starting Debezium Connector containers...");
+        createDebeziumContainer();
+        Startables.deepStart(Stream.of(DEBEZIUM_CONTAINER)).join();
+
+        LOG.info("The fourth stage: Starting PostgreSQL container...");
+        createPostgreSQLContainer();
+        Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+        Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(3, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeSourceTableData);
+
+        given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(3, TimeUnit.MINUTES)
+                .untilAsserted(this::initKafkaConsumer);
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeSinkJdbcTable);
+
+        Container.ExecResult extraCommand =
+                DEBEZIUM_CONTAINER.execInContainer(
+                        "bash",
+                        "-c",
+                        "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H \"Accept:application/json\" -H  \"Content-Type:application/json\" http://"
+                                + getLinuxLocalIp()
+                                + ":8083/connectors/ -d @register-mysql.json");
+        Assertions.assertEquals(0, extraCommand.getExitCode());
+        // ensure debezium has handled the data
+        Thread.sleep(30 * 1000);
+        updateSourceTableData();
+        Thread.sleep(30 * 1000);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        MYSQL_CONTAINER.close();
+        KAFKA_CONTAINER.close();
+        DEBEZIUM_CONTAINER.close();
+        POSTGRESQL_CONTAINER.close();
+    }
+
+    @TestTemplate
+    public void testKafkaSinkDebeziumFormat(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/kafkasource_debezium_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        ArrayList<String> result = new ArrayList<>();
+        kafkaConsumer.subscribe(Lists.newArrayList(KAFKA_TOPIC));
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            ConsumerRecords<String, String> consumerRecords =
+                                    kafkaConsumer.poll(Duration.ofMillis(1000));
+                            for (ConsumerRecord<String, String> record : consumerRecords) {
+                                result.add(record.value());
+                            }
+                            Assertions.assertEquals(12, result.size());
+                        });
+    }
+
+    @TestTemplate
+    public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult =
+                container.executeJob("/kafkasource_debezium_cdc_to_pgsql.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        Set<List<Object>> actual = new HashSet<>();
+        try (Connection connection =
+                DriverManager.getConnection(
+                        POSTGRESQL_CONTAINER.getJdbcUrl(),
+                        POSTGRESQL_CONTAINER.getUsername(),
+                        POSTGRESQL_CONTAINER.getPassword())) {
+            try (Statement statement = connection.createStatement()) {
+                ResultSet resultSet = statement.executeQuery("select * from sink order by id");
+                while (resultSet.next()) {
+                    List<Object> row =
+                            Arrays.asList(
+                                    resultSet.getInt("id"),
+                                    resultSet.getString("name"),
+                                    resultSet.getString("description"),
+                                    resultSet.getString("weight"));
+                    actual.add(row);
+                }
+            }
+        }
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(
+                                Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"),
+                                Arrays.asList(102, "car battery", "12V car battery", "8.1"),
+                                Arrays.asList(
+                                        103,
+                                        "12-pack drill bits",
+                                        "12-pack of drill bits with sizes ranging from #40 to #3",
+                                        "0.8"),
+                                Arrays.asList(104, "hammer", "12oz carpenter's hammer", "0.75"),
+                                Arrays.asList(105, "hammer", "14oz carpenter's hammer", "0.875"),
+                                Arrays.asList(106, "hammer", "16oz carpenter's hammer", "1"),
+                                Arrays.asList(107, "rocks", "box of assorted rocks", "5.3"),
+                                Arrays.asList(
+                                        108, "jacket", "water resistent black wind breaker", "0.1"))
+                        .collect(Collectors.toSet());
+        // Both sides are unordered sets, so compare by set equality rather than iteration order.
+        Assertions.assertEquals(expected, actual);
+    }
+
+    public void initializeSourceTableData() throws Exception {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = connection.createStatement()) {
+            statement.execute("create database if not exists debezium");
+            statement.execute(
+                    "CREATE TABLE if not exists debezium.products (\n"
+                            + "  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
+                            + "  name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',\n"
+                            + "  description VARCHAR(512),\n"
+                            + "  weight VARCHAR(512)\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO debezium.products\n"
+                            + "VALUES (101,\"scooter\",\"Small 2-wheel scooter\",\"3.14\"),\n"
+                            + "       (102,\"car battery\",\"12V car battery\",\"8.1\"),\n"
+                            + "       (103,\"12-pack drill bits\",\"12-pack of drill bits with sizes ranging from #40 to #3\","
+                            + "\"0.8\"),\n"
+                            + "       (104,\"hammer\",\"12oz carpenter's hammer\",\"0.75\"),\n"
+                            + "       (105,\"hammer\",\"14oz carpenter's hammer\",\"0.875\"),\n"
+                            + "       (106,\"hammer\",\"16oz carpenter's hammer\",\"1.0\"),\n"
+                            + "       (107,\"rocks\",\"box of assorted rocks\",\"5.3\"),\n"
+                            + "       (108,\"jacket\",\"water resistent black wind breaker\",\"0.1\"),\n"
+                            + "       (109,\"spare tire\",\"24 inch spare tire\",\"22.2\")");
+        }
+    }
+
+    public void updateSourceTableData() throws Exception {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "UPDATE debezium.products SET weight = '4.56' WHERE name = 'scooter'");
+            statement.execute("DELETE FROM debezium.products WHERE name  = \"spare tire\"");
+        }
+    }
+
+    private void initializeSinkJdbcTable() {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                POSTGRESQL_CONTAINER.getJdbcUrl(),
+                                POSTGRESQL_CONTAINER.getUsername(),
+                                POSTGRESQL_CONTAINER.getPassword());
+                Statement statement = connection.createStatement()) {
+            String sink =
+                    "create table sink(\n"
+                            + "id INT NOT NULL PRIMARY KEY,\n"
+                            + "name varchar(255),\n"
+                            + "description varchar(255),\n"
+                            + "weight varchar(255)"
+                            + ")";
+            statement.execute(sink);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSQL table failed!", e);
+        }
+    }
+
+    private void initKafkaConsumer() {
+        Properties prop = new Properties();
+        String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
+        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        prop.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        prop.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-debezium-sink-group");
+        kafkaConsumer = new KafkaConsumer<>(prop);
+    }
+
+    public String getLinuxLocalIp() {
+        String ip = "";
+        try {
+            Enumeration<NetworkInterface> networkInterfaces =
+                    NetworkInterface.getNetworkInterfaces();
+            while (networkInterfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = networkInterfaces.nextElement();
+                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+                while (inetAddresses.hasMoreElements()) {
+                    InetAddress inetAddress = inetAddresses.nextElement();
+                    if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+                        ip = inetAddress.getHostAddress();
+                    }
+                }
+            }
+        } catch (SocketException ex) {
+            log.warn("Failed to get linux local ip, it will return [\"\"] ", ex);
+        }
+        return ip;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json
new file mode 100644
index 0000000000..d70e8e0c61
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json
@@ -0,0 +1,16 @@
+{
+  "name": "inventory-connector",
+  "config": {
+    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
+    "tasks.max": "1",
+    "database.hostname": "mysql",
+    "database.port": "3306",
+    "database.user": "st_user",
+    "database.password": "seatunnel",
+    "database.server.id": "184054",
+    "topic.prefix": "dbserver1",
+    "database.include.list": "debezium",
+    "schema.history.internal.kafka.bootstrap.servers": "kafka_dbz_e2e:9092",
+    "schema.history.internal.kafka.topic": "schema-changes.debezium"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf
new file mode 100644
index 0000000000..a0531b2345
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_dbz_e2e:9092"
+    topic = "dbserver1.debezium.products"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    format = debezium_json
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "float"
+      }
+    }
+  }
+}
+
+sink {
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = "jdbc:postgresql://postgresql_e2e:5432/test?loggerLevel=OFF"
+        user = test
+        password = test
+        generate_sink_sql = true
+        database = public
+        table = sink
+        primary_keys = ["id"]
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf
new file mode 100644
index 0000000000..4944829c24
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_dbz_e2e:9092"
+    topic = "dbserver1.debezium.products"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    format = debezium_json
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "float"
+      }
+    }
+  }
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafka_dbz_e2e:9092"
+    topic = "test-debezium-sink"
+    format = debezium_json
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
index 7b10ad57a6..9ce4dc5541 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
@@ -24,6 +24,12 @@ import org.apache.seatunnel.api.configuration.Options;
 import java.util.Map;
 
 public class JsonFormatOptions {
+    public static final Option<Boolean> FAIL_ON_MISSING_FIELD =
+            Options.key("fail-on-missing-field")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to specify whether to fail if a field is missing or not, false by default.");
 
     public static final Option<Boolean> IGNORE_PARSE_ERRORS =
             Options.key("ignore-parse-errors")
@@ -33,13 +39,6 @@ public class JsonFormatOptions {
                             "Optional flag to skip fields and rows with parse errors instead of failing;\n"
                                     + "fields are set to null in case of errors, false by default.");
 
-    public static final Option<Boolean> FAIL_ON_MISSING_FIELD =
-            Options.key("fail-on-missing-field")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Optional flag to specify whether to fail if a field is missing or not, false by default.");
-
     public static boolean getFailOnMissingField(Map<String, String> options) {
         return Boolean.parseBoolean(
                 options.getOrDefault(
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
new file mode 100644
index 0000000000..3996c4ed7d
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    private final boolean ignoreParseErrors;
+
+    private final boolean debeziumEnabledSchema;
+
+    public DebeziumJsonDeserializationSchema(SeaTunnelRowType rowType, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType));
+        this.debeziumEnabledSchema = false;
+    }
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean ignoreParseErrors, boolean debeziumEnabledSchema) {
+        this.rowType = rowType;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType));
+        this.debeziumEnabledSchema = debeziumEnabledSchema;
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+
+        try {
+            JsonNode payload = getPayload(convertBytes(message));
+            String op = payload.get("op").asText();
+
+            if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
+                SeaTunnelRow insert = convertJsonNode(payload.get("after"));
+                insert.setRowKind(RowKind.INSERT);
+                out.collect(insert);
+            } else if (OP_UPDATE.equals(op)) {
+                SeaTunnelRow before = convertJsonNode(payload.get("before"));
+                if (before == null) {
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                out.collect(before);
+
+                SeaTunnelRow after = convertJsonNode(payload.get("after"));
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                out.collect(after);
+            } else if (OP_DELETE.equals(op)) {
+                SeaTunnelRow delete = convertJsonNode(payload.get("before"));
+                if (delete == null) {
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+                }
+                delete.setRowKind(RowKind.DELETE);
+                out.collect(delete);
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            String.format(
+                                    "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
+                                    op, new String(message)));
+                }
+            }
+        } catch (Throwable t) {
+            // a big try catch to protect the processing.
+            if (!ignoreParseErrors) {
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format("Corrupt Debezium JSON message '%s'.", new String(message)),
+                        t);
+            }
+        }
+    }
+
+    private JsonNode getPayload(JsonNode jsonNode) {
+        if (debeziumEnabledSchema) {
+            return jsonNode.get("payload");
+        }
+        return jsonNode;
+    }
+
+    private JsonNode convertBytes(byte[] message) {
+        try {
+            return jsonDeserializer.deserializeToJsonNode(message);
+        } catch (Exception t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.JSON_OPERATION_FAILED,
+                    String.format("Failed to deserialize JSON '%s'.", new String(message)),
+                    t);
+        }
+    }
+
+    private SeaTunnelRow convertJsonNode(JsonNode root) {
+        return jsonDeserializer.convertToRowData(root);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowType;
+    }
+
+    private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchema) {
+        return databaseSchema;
+    }
+}
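
Review note: the dispatch in DebeziumJsonDeserializationSchema#deserialize maps each Debezium "op" value to one or two SeaTunnel rows. A minimal, dependency-free sketch of that mapping follows. The class name DebeziumOpSketch and the local Kind enum are hypothetical stand-ins (Kind mirrors the RowKind constants used in the hunk above); this is not the committed implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DebeziumOpSketch {
    // Hypothetical stand-in for org.apache.seatunnel.api.table.type.RowKind.
    enum Kind {
        INSERT,
        UPDATE_BEFORE,
        UPDATE_AFTER,
        DELETE
    }

    // Returns the row kinds emitted for one Debezium "op" value, in emission order.
    static List<Kind> kindsFor(String op) {
        switch (op) {
            case "r": // snapshot read, taken from "after"
            case "c": // insert, taken from "after"
                return Collections.singletonList(Kind.INSERT);
            case "u": // update: "before" image first, then "after" image
                return Arrays.asList(Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER);
            case "d": // delete, taken from "before"
                return Collections.singletonList(Kind.DELETE);
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + kindsFor(op));
        }
    }
}
```

An update is the only operation that yields two rows, which is why the deserializer requires a non-null "before" image for "u" and "d" messages.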
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java
new file mode 100644
index 0000000000..e59c9794fb
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+public class DebeziumJsonFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "debezium_json";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+
+    @Override
+    public SerializationFormat createSerializationFormat(TableFactoryContext context) {
+        return new SerializationFormat() {
+            @Override
+            public SerializationSchema createSerializationSchema() {
+                return new DebeziumJsonSerializationSchema(null);
+            }
+        };
+    }
+
+    @Override
+    public DeserializationFormat createDeserializationFormat(TableFactoryContext context) {
+        Map<String, String> options = context.getOptions().toMap();
+        boolean ignoreParseErrors = DebeziumJsonFormatOptions.getIgnoreParseErrors(options);
+        boolean schemaInclude = DebeziumJsonFormatOptions.getSchemaInclude(options);
+
+        // TODO config SeaTunnelRowType
+        return new DeserializationFormat() {
+            @Override
+            public DeserializationSchema createDeserializationSchema() {
+                return new DebeziumJsonDeserializationSchema(null, ignoreParseErrors, schemaInclude);
+            }
+        };
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
new file mode 100644
index 0000000000..eb75bfd2b0
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatOptions;
+
+import java.util.Map;
+
+public class DebeziumJsonFormatOptions {
+
+    public static final int GENERATE_ROW_SIZE = 3;
+
+    public static final Option<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final Option<Boolean> SCHEMA_INCLUDE =
+            Options.key("schema-include")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When setting up a Debezium Kafka Connect, users can enable "
+                                    + "a Kafka configuration 'value.converter.schemas.enable' to include schema in the message. "
+                                    + "This option indicates whether the Debezium JSON data includes the schema in the message. "
+                                    + "Default is false.");
+
+    public static boolean getSchemaInclude(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(
+                        SCHEMA_INCLUDE.key(), SCHEMA_INCLUDE.defaultValue().toString()));
+    }
+
+    public static boolean getIgnoreParseErrors(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(
+                        IGNORE_PARSE_ERRORS.key(), IGNORE_PARSE_ERRORS.defaultValue().toString()));
+    }
+}
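
Review note: both getters in DebeziumJsonFormatOptions follow the same pattern, a string lookup with a default followed by Boolean.parseBoolean. A small standalone sketch of that pattern is below. The class FormatOptionLookupSketch and its flag helper are hypothetical names for illustration only; the option keys are the ones declared in the hunk above.

```java
import java.util.HashMap;
import java.util.Map;

public class FormatOptionLookupSketch {
    // Reads a boolean option from a string map, falling back to the given default.
    // Boolean.parseBoolean is case-insensitive and treats any non-"true" text as false.
    static boolean flag(Map<String, String> options, String key, boolean defaultValue) {
        return Boolean.parseBoolean(options.getOrDefault(key, String.valueOf(defaultValue)));
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("schema-include", "true");

        System.out.println(flag(options, "schema-include", false));
        System.out.println(flag(options, "ignore-parse-errors", false));
    }
}
```

One consequence of this pattern is that a misspelled value such as "yes" silently resolves to false rather than failing, so configuration typos will not be reported by the getters.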
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
new file mode 100644
index 0000000000..5b1e476abc
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
+
+public class DebeziumJsonSerializationSchema implements SerializationSchema {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_INSERT = "c"; // insert
+    private static final String OP_DELETE = "d"; // delete
+
+    private final JsonSerializationSchema jsonSerializer;
+
+    private transient SeaTunnelRow genericRow;
+
+    public DebeziumJsonSerializationSchema(SeaTunnelRowType rowType) {
+        this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType));
+        this.genericRow = new SeaTunnelRow(GENERATE_ROW_SIZE);
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        try {
+            switch (row.getRowKind()) {
+                case INSERT:
+                case UPDATE_AFTER:
+                    genericRow.setField(0, null);
+                    genericRow.setField(1, row);
+                    genericRow.setField(2, OP_INSERT);
+                    return jsonSerializer.serialize(genericRow);
+                case UPDATE_BEFORE:
+                case DELETE:
+                    genericRow.setField(0, row);
+                    genericRow.setField(1, null);
+                    genericRow.setField(2, OP_DELETE);
+                    return jsonSerializer.serialize(genericRow);
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Unsupported operation '%s' for row kind.", row.getRowKind()));
+            }
+        } catch (Throwable t) {
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.JSON_OPERATION_FAILED,
+                    String.format("Could not serialize row %s.", row),
+                    t);
+        }
+    }
+
+    private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchema) {
+        return new SeaTunnelRowType(
+                new String[] {"before", "after", "op"},
+                new SeaTunnelDataType[] {databaseSchema, databaseSchema, STRING_TYPE});
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
index db11c51c4a..cedeba7515 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
+++ b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
@@ -17,3 +17,4 @@
 
 org.apache.seatunnel.format.json.JsonFormatFactory
 org.apache.seatunnel.format.json.canal.CanalJsonFormatFactory
+org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatFactory
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
new file mode 100644
index 0000000000..20088e525b
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DebeziumJsonSerDeSchemaTest {
+
+    private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"id", "name", "description", "weight"},
+                    new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, STRING_TYPE, FLOAT_TYPE});
+
+    @Test
+    void testNullRowMessages() throws Exception {
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false);
+        SimpleCollector collector = new SimpleCollector();
+
+        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize(new byte[0], collector);
+        assertEquals(0, collector.list.size());
+    }
+
+    @Test
+    public void testSerializationAndSchemaExcludeDeserialization() throws Exception {
+        testSerializationDeserialization("debezium-data.txt", false);
+    }
+
+    private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
+            throws Exception {
+        List<String> lines = readLines(resourceFile);
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, true, schemaInclude);
+
+        SimpleCollector collector = new SimpleCollector();
+
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        List<String> expected =
+                Arrays.asList(
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}");
+        List<String> actual =
+                collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        DebeziumJsonSerializationSchema serializationSchema =
+                new DebeziumJsonSerializationSchema(PHYSICAL_DATA_TYPE);
+
+        actual = new ArrayList<>();
+        for (SeaTunnelRow rowData : collector.list) {
+            actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+        }
+
+        expected =
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+        assertEquals(expected, actual);
+    }
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        Assertions.assertNotNull(url);
+        Path path = new File(url.getFile()).toPath();
+        return Files.readAllLines(path);
+    }
+
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+
+        private List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt b/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
new file mode 100644
index 0000000000..3763369e49
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
@@ -0,0 +1,16 @@
+{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
+{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
+{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transactio
 [...]
+{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
+{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
+{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589
 [...]
+{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transac
 [...]
+{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
\ No newline at end of file
