This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 53a1f0c6c1 [Feature][connector][kafka] Support read debezium format
message from kafka (#5066)
53a1f0c6c1 is described below
commit 53a1f0c6c14190c37c0b28320db049f822436a4f
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Jul 25 10:09:24 2023 +0800
[Feature][connector][kafka] Support read debezium format message from kafka
(#5066)
---
.github/workflows/backend.yml | 2 +-
docs/en/connector-v2/formats/debezium-json.md | 107 ++++++
docs/en/connector-v2/sink/Kafka.md | 16 +-
docs/en/connector-v2/source/kafka.md | 17 +-
release-note.md | 12 +-
.../connectors/seatunnel/kafka/config/Config.java | 8 +-
.../seatunnel/kafka/config/MessageFormat.java | 1 +
.../serialize/DefaultSeaTunnelRowSerializer.java | 3 +
.../seatunnel/kafka/source/KafkaSource.java | 10 +
.../seatunnel/kafka/source/KafkaSourceFactory.java | 1 +
.../e2e/connector/kafka/DebeziumToKafkaIT.java | 418 +++++++++++++++++++++
.../test/resources/debezium/register-mysql.json | 16 +
.../kafkasource_debezium_cdc_to_pgsql.conf | 62 +++
.../resources/kafkasource_debezium_to_kafka.conf | 57 +++
.../seatunnel/format/json/JsonFormatOptions.java | 13 +-
.../DebeziumJsonDeserializationSchema.java | 168 +++++++++
.../json/debezium/DebeziumJsonFormatFactory.java | 70 ++++
.../json/debezium/DebeziumJsonFormatOptions.java | 53 +++
.../debezium/DebeziumJsonSerializationSchema.java | 80 ++++
.../org.apache.seatunnel.api.table.factory.Factory | 1 +
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 163 ++++++++
.../src/test/resources/debezium-data.txt | 16 +
22 files changed, 1270 insertions(+), 24 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index fbe37acece..6da4f4a5ab 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -564,7 +564,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 90
+ timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/formats/debezium-json.md
b/docs/en/connector-v2/formats/debezium-json.md
new file mode 100644
index 0000000000..4c40a0298e
--- /dev/null
+++ b/docs/en/connector-v2/formats/debezium-json.md
@@ -0,0 +1,107 @@
+# Debezium Format
+
+Changelog-Data-Capture Format: Serialization Schema Format: Deserialization
Schema
+
+Debezium is a set of distributed services to capture changes in your databases
so that your applications can see those changes and respond to them. Debezium
records all row-level changes within each database table in a *change event
stream*, and applications simply read these streams to see the change events in
the same order in which they occurred.
+
+Seatunnel supports interpreting Debezium JSON messages as INSERT/UPDATE/DELETE
messages into the seatunnel system. This is useful in many cases to leverage
this feature, such as
+
+ synchronizing incremental data from databases to other systems
+ auditing logs
+ real-time materialized views on databases
+ temporal join changing history of a database table and so on.
+
+Seatunnel also supports encoding the INSERT/UPDATE/DELETE messages in
Seatunnel as Debezium JSON messages, and emitting them to storage like Kafka.
+
+# Format Options
+
+| option | default | required |
Description
|
+|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------|
+| format | (none) | yes | Specify what format
to use, here should be 'debezium_json'.
|
+| debezium-json.ignore-parse-errors | false | no | Skip fields and
rows with parse errors instead of failing. Fields are set to null in case of
errors. |
+
+# How to use Debezium format
+
+## Kafka usage example
+
+Debezium provides a unified format for changelog, here is a simple example for
an update operation captured from a MySQL products table:
+
+```bash
+{
+ "before": {
+ "id": 111,
+ "name": "scooter",
+ "description": "Big 2-wheel scooter ",
+ "weight": 5.18
+ },
+ "after": {
+ "id": 111,
+ "name": "scooter",
+ "description": "Big 2-wheel scooter ",
+ "weight": 5.17
+ },
+ "source": {
+ "version": "1.1.1.Final",
+ "connector": "mysql",
+ "name": "dbserver1",
+ "ts_ms": 1589362330000,
+ "snapshot": "false",
+ "db": "inventory",
+ "table": "products",
+ "server_id": 223344,
+ "gtid": null,
+ "file": "mysql-bin.000003",
+ "pos": 2090,
+ "row": 0,
+ "thread": 2,
+ "query": null
+ },
+ "op": "u",
+ "ts_ms": 1589362330904,
+ "transaction": null
+}
+```
+
+Note: please refer to the Debezium documentation about the meaning of each field.
+
+The MySQL products table has 4 columns (id, name, description and weight).
+The above JSON message is an update change event on the products table where
the weight value of the row with id = 111 is changed from 5.18 to 5.17.
+Assuming the messages have been synchronized to Kafka topic products_binlog,
then we can use the following Seatunnel conf to consume this topic and
interpret the change events by Debezium format.
+
+```bash
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "products_binlog"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ }
+ format = debezium_json
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "consume-binlog"
+ format = debezium_json
+ }
+}
+```
+
diff --git a/docs/en/connector-v2/sink/Kafka.md
b/docs/en/connector-v2/sink/Kafka.md
index 4dbd3a84ce..f971e5390b 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -108,8 +108,10 @@ Kafka distinguishes different transactions by different
transactionId. This para
### format
-Data format. The default format is json. Optional text format. The default
field separator is ",".
-If you customize the delimiter, add the "field_delimiter" option.
+Data format. The default format is json. Optional text format, canal-json and
debezium-json.
+If you use json or text format. The default field separator is ", ". If you
customize the delimiter, add the "field_delimiter" option.
+If you use canal format, please refer to
[canal-json](../formats/canal-json.md) for details.
+If you use debezium format, please refer to
[debezium-json](../formats/debezium-json.md) for details.
### field_delimiter
@@ -209,8 +211,10 @@ sink {
### next version
-- [Improve] Support to specify multiple partition keys
[3230](https://github.com/apache/seatunnel/pull/3230)
-- [Improve] Add text format for kafka sink connector
[3711](https://github.com/apache/seatunnel/pull/3711)
-- [Improve] Support extract topic from SeaTunnelRow fields
[3742](https://github.com/apache/seatunnel/pull/3742)
-- [Improve] Change Connector Custom Config Prefix To Map
[3719](https://github.com/apache/seatunnel/pull/3719)
+- [Improve] Support to specify multiple partition keys
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
+- [Improve] Add text format for kafka sink connector
[3711](https://github.com/apache/incubator-seatunnel/pull/3711)
+- [Improve] Support extract topic from SeaTunnelRow fields
[3742](https://github.com/apache/incubator-seatunnel/pull/3742)
+- [Improve] Change Connector Custom Config Prefix To Map
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
+- [Improve] Support read canal format message
[3950](https://github.com/apache/incubator-seatunnel/pull/3950)
+- [Improve] Support read debezium format message
[3981](https://github.com/apache/incubator-seatunnel/pull/3981)
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 06f60af6d8..2ed6ec6f12 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -73,8 +73,10 @@ The structure of the data, including field names and field
types.
## format
-Data format. The default format is json. Optional text format. The default
field separator is ", ".
-If you customize the delimiter, add the "field_delimiter" option.
+Data format. The default format is json. Optional text format, canal-json and
debezium-json.
+If you use json or text format. The default field separator is ", ". If you
customize the delimiter, add the "field_delimiter" option.
+If you use canal format, please refer to
[canal-json](../formats/canal-json.md) for details.
+If you use debezium format, please refer to
[debezium-json](../formats/debezium-json.md) for details.
## format_error_handle_way
@@ -221,9 +223,10 @@ source {
### Next Version
-- [Improve] Support setting read starting offset or time at startup config
([3157](https://github.com/apache/seatunnel/pull/3157))
-- [Improve] Support for dynamic discover topic & partition in streaming mode
([3125](https://github.com/apache/seatunnel/pull/3125))
-- [Improve] Change Connector Custom Config Prefix To Map
[3719](https://github.com/apache/seatunnel/pull/3719)
-- [Bug] Fixed the problem that parsing the offset format failed when the
startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810))
-- [Feature] Kafka source supports data deserialization failure
skipping([4364](https://github.com/apache/seatunnel/pull/4364))
+- [Improve] Support setting read starting offset or time at startup config
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
+- [Improve] Support for dynamic discover topic & partition in streaming mode
([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
+- [Improve] Change Connector Custom Config Prefix To Map
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
+- [Bug] Fixed the problem that parsing the offset format failed when the
startup mode was
offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
+- [Improve] Support read canal format message
[3950](https://github.com/apache/incubator-seatunnel/pull/3950)
+- [Improve] Support read debezium format message
[3981](https://github.com/apache/incubator-seatunnel/pull/3981)
diff --git a/release-note.md b/release-note.md
index 68d14e609f..0e84da433c 100644
--- a/release-note.md
+++ b/release-note.md
@@ -3,9 +3,19 @@
## Bug fix
### Core
-
- [Core] [API] Fixed generic class loss for lists (#4421)
- [Core] [API] Fix parse nested row data type key changed upper (#4459)
+- [Starter][Flink]Support transform-v2 for flink #3396
+- [Flink] Support flink 1.14.x #3963
+### Transformer
+- [Spark] Support transform-v2 for spark (#3409)
+- [ALL]Add FieldMapper Transform #3781
+### Connectors
+- [Elasticsearch] Support https protocol & compatible with opensearch
+- [Hbase] Add hbase sink connector #4049
+### Formats
+- [Canal]Support read canal format message #3950
+- [Debezium]Support read debezium format message #3981
### Connector-V2
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2dffda4f48..f126e563fb 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -26,8 +26,6 @@ import java.util.Map;
public class Config {
public static final String CONNECTOR_IDENTITY = "Kafka";
- public static final String REPLICATION_FACTOR = "replication.factor";
-
/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";
@@ -99,6 +97,12 @@ public class Config {
"Data format. The default format is json. Optional
text format. The default field separator is \", \". "
+ "If you customize the delimiter, add the
\"field_delimiter\" option.");
+ public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
+ Options.key("debezium_record_include_schema")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Does the debezium record carry a
schema.");
+
public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 65b5cc2769..1ef29f6322 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -21,5 +21,6 @@ public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
+ DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 06005de003..f8974d0f1a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJs
import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
+import
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -219,6 +220,8 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer {
.build();
case CANAL_JSON:
return new CanalJsonSerializationSchema(rowType);
+ case DEBEZIUM_JSON:
+ return new DebeziumJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType,
isKey);
default:
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 741d752164..30878e82a2 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -47,6 +47,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorE
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
@@ -62,6 +63,7 @@ import java.util.Properties;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
@@ -266,6 +268,14 @@ public class KafkaSource
.setIgnoreParseErrors(true)
.build();
break;
+ case DEBEZIUM_JSON:
+ boolean includeSchema =
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
+ if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
+ includeSchema =
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
+ }
+ deserializationSchema =
+ new DebeziumJsonDeserializationSchema(typeInfo,
true, includeSchema);
+ break;
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported format: " + format);
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index daa75385e4..21057040ec 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -46,6 +46,7 @@ public class KafkaSourceFactory implements TableSourceFactory
{
Config.KAFKA_CONFIG,
Config.SCHEMA,
Config.FORMAT,
+ Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(Config.START_MODE, StartMode.TIMESTAMP,
Config.START_MODE_TIMESTAMP)
.conditional(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java
new file mode 100644
index 0000000000..e76a445996
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SEATUNNEL, EngineType.SPARK})
+@Slf4j
+public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebeziumToKafkaIT.class);
+
+ private static GenericContainer<?> DEBEZIUM_CONTAINER;
+
+ private static final String DEBEZIUM_DOCKER_IMAGE =
"quay.io/debezium/connect:2.3.0.Final";
+
+ private static final String DEBEZIUM_HOST = "debezium_e2e";
+
+ private static final int DEBEZIUM_PORT = 8083;
+
+ //
----------------------------------------kafka------------------------------------
+ private static final String KAFKA_IMAGE_NAME =
"confluentinc/cp-kafka:7.0.9";
+ private static final String KAFKA_HOST = "kafka_dbz_e2e";
+ private KafkaConsumer<String, String> kafkaConsumer;
+ private KafkaContainer KAFKA_CONTAINER;
+ private String KAFKA_TOPIC = "test-debezium-sink";
+
+ //
-------------------------------------mysql---------------------------------------
+ private static final String MYSQL_HOST = "mysql";
+ private static MySqlContainer MYSQL_CONTAINER;
+
+ //
-----------------------------------------postgres-----------------------------------
+ private static final String PG_IMAGE = "postgres:alpine3.16";
+
+ private static final int PG_PORT = 5432;
+
+ private static final String PG_DRIVER_JAR =
+
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+ private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib &&
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + PG_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ private void createDebeziumContainer() {
+ DEBEZIUM_CONTAINER =
+ new GenericContainer<>(DEBEZIUM_DOCKER_IMAGE)
+ .withCopyFileToContainer(
+
MountableFile.forClasspathResource("/debezium/register-mysql.json"),
+
"/tmp/seatunnel/plugins/Jdbc/register-mysql.json")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(DEBEZIUM_HOST)
+ .withExposedPorts(DEBEZIUM_PORT)
+ .withEnv("GROUP_ID", "1")
+ .withEnv("CONFIG_STORAGE_TOPIC", "my-connect-configs")
+ .withEnv("OFFSET_STORAGE_TOPIC", "my-connect-offsets")
+ .withEnv("STATUS_STORAGE_TOPIC", "my-connect-status")
+ .withEnv("BOOTSTRAP_SERVERS", KAFKA_HOST + ":9092")
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE)))
+ .dependsOn(KAFKA_CONTAINER, MYSQL_CONTAINER);
+ DEBEZIUM_CONTAINER.setWaitStrategy(
+ (new HttpWaitStrategy())
+ .forPath("/connectors")
+ .forPort(DEBEZIUM_PORT)
+ .withStartupTimeout(Duration.ofSeconds(120)));
+ DEBEZIUM_CONTAINER.setPortBindings(
+ com.google.common.collect.Lists.newArrayList(
+ String.format("%s:%s", DEBEZIUM_PORT, DEBEZIUM_PORT)));
+ }
+
+ private void createKafkaContainer() {
+ KAFKA_CONTAINER =
+ new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+ }
+
+ private void createMysqlContainer() {
+ MYSQL_CONTAINER =
+ new MySqlContainer(MySqlVersion.V8_0)
+
.withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName("debezium")
+ .withUsername("st_user")
+ .withPassword("seatunnel")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+
+ private void createPostgreSQLContainer() {
+ POSTGRESQL_CONTAINER =
+ new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql_e2e")
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+ POSTGRESQL_CONTAINER.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", PG_PORT, PG_PORT)));
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ LOG.info("The first stage: Starting Kafka containers...");
+ createKafkaContainer();
+ Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+
+ LOG.info("The second stage: Starting Mysql containers...");
+ createMysqlContainer();
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+
+ LOG.info("The third stage: Starting Debezium Connector containers...");
+ createDebeziumContainer();
+ Startables.deepStart(Stream.of(DEBEZIUM_CONTAINER)).join();
+
+ LOG.info("The fourth stage: Starting PostgreSQL container...");
+ createPostgreSQLContainer();
+ Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+ Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+
+ Awaitility.given()
+ .ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(3, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeSourceTableData);
+
+ given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(3, TimeUnit.MINUTES)
+ .untilAsserted(this::initKafkaConsumer);
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeSinkJdbcTable);
+
+ Container.ExecResult extraCommand =
+ DEBEZIUM_CONTAINER.execInContainer(
+ "bash",
+ "-c",
+ "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H
\"Accept:application/json\" -H \"Content-Type:application/json\" http://"
+ + getLinuxLocalIp()
+ + ":8083/connectors/ -d @register-mysql.json");
+ Assertions.assertEquals(0, extraCommand.getExitCode());
+ // ensure debezium has handled the data
+ Thread.sleep(30 * 1000);
+ updateSourceTableData();
+ Thread.sleep(30 * 1000);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ MYSQL_CONTAINER.close();
+ KAFKA_CONTAINER.close();
+ DEBEZIUM_CONTAINER.close();
+ POSTGRESQL_CONTAINER.close();
+ }
+
+ @TestTemplate
+ public void testKafkaSinkDebeziumFormat(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/kafkasource_debezium_to_kafka.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ ArrayList<String> result = new ArrayList<>();
+ kafkaConsumer.subscribe(Lists.newArrayList(KAFKA_TOPIC));
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ ConsumerRecords<String, String> consumerRecords =
+
kafkaConsumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record :
consumerRecords) {
+ result.add(record.value());
+ }
+ Assertions.assertEquals(12, result.size());
+ });
+ }
+
+ @TestTemplate
+ public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container)
+ throws IOException, InterruptedException, SQLException {
+ Container.ExecResult execResult =
+
container.executeJob("/kafkasource_debezium_cdc_to_pgsql.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ Set<List<Object>> actual = new HashSet<>();
+ try (Connection connection =
+ DriverManager.getConnection(
+ POSTGRESQL_CONTAINER.getJdbcUrl(),
+ POSTGRESQL_CONTAINER.getUsername(),
+ POSTGRESQL_CONTAINER.getPassword())) {
+ try (Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery("select * from
sink order by id");
+ while (resultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+ resultSet.getInt("id"),
+ resultSet.getString("name"),
+ resultSet.getString("description"),
+ resultSet.getString("weight"));
+ actual.add(row);
+ }
+ }
+ }
+ Set<List<Object>> expected =
+ Stream.<List<Object>>of(
+ Arrays.asList(101, "scooter", "Small 2-wheel
scooter", "4.56"),
+ Arrays.asList(102, "car battery", "12V car
battery", "8.1"),
+ Arrays.asList(
+ 103,
+ "12-pack drill bits",
+ "12-pack of drill bits with sizes
ranging from #40 to #3",
+ "0.8"),
+ Arrays.asList(104, "hammer", "12oz carpenter's
hammer", "0.75"),
+ Arrays.asList(105, "hammer", "14oz carpenter's
hammer", "0.875"),
+ Arrays.asList(106, "hammer", "16oz carpenter's
hammer", "1"),
+ Arrays.asList(107, "rocks", "box of assorted
rocks", "5.3"),
+ Arrays.asList(
+ 108, "jacket", "water resistent black
wind breaker", "0.1"))
+ .collect(Collectors.toSet());
+ Assertions.assertIterableEquals(expected, actual);
+ }
+
+ public void initializeSourceTableData() throws Exception {
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = connection.createStatement()) {
+ statement.execute("create database if not exists debezium");
+ statement.execute(
+ "CREATE TABLE if not exists debezium.products (\n"
+ + " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY
KEY,\n"
+ + " name VARCHAR(255) NOT NULL DEFAULT
'SeaTunnel',\n"
+ + " description VARCHAR(512),\n"
+ + " weight VARCHAR(512)\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO debezium.products\n"
+ + "VALUES (101,\"scooter\",\"Small 2-wheel
scooter\",\"3.14\"),\n"
+ + " (102,\"car battery\",\"12V car
battery\",\"8.1\"),\n"
+ + " (103,\"12-pack drill bits\",\"12-pack of
drill bits with sizes ranging from #40 to #3\","
+ + "\"0.8\"),\n"
+ + " (104,\"hammer\",\"12oz carpenter's
hammer\",\"0.75\"),\n"
+ + " (105,\"hammer\",\"14oz carpenter's
hammer\",\"0.875\"),\n"
+ + " (106,\"hammer\",\"16oz carpenter's
hammer\",\"1.0\"),\n"
+ + " (107,\"rocks\",\"box of assorted
rocks\",\"5.3\"),\n"
+ + " (108,\"jacket\",\"water resistent black
wind breaker\",\"0.1\"),\n"
+ + " (109,\"spare tire\",\"24 inch spare
tire\",\"22.2\")");
+ }
+ }
+
+ public void updateSourceTableData() throws Exception {
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "UPDATE debezium.products SET weight = '4.56' WHERE name =
'scooter'");
+ statement.execute("DELETE FROM debezium.products WHERE name =
\"spare tire\"");
+ }
+ }
+
+ private void initializeSinkJdbcTable() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ POSTGRESQL_CONTAINER.getJdbcUrl(),
+ POSTGRESQL_CONTAINER.getUsername(),
+ POSTGRESQL_CONTAINER.getPassword());
+ Statement statement = connection.createStatement()) {
+ String sink =
+ "create table sink(\n"
+ + "id INT NOT NULL PRIMARY KEY,\n"
+ + "name varchar(255),\n"
+ + "description varchar(255),\n"
+ + "weight varchar(255)"
+ + ")";
+ statement.execute(sink);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table
failed!", e);
+ }
+ }
+
+ private void initKafkaConsumer() {
+ Properties prop = new Properties();
+ String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
+ prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ prop.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ prop.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+ prop.put(ConsumerConfig.GROUP_ID_CONFIG,
"seatunnel-debezium-sink-group");
+ kafkaConsumer = new KafkaConsumer<>(prop);
+ }
+
+ public String getLinuxLocalIp() {
+ String ip = "";
+ try {
+ Enumeration<NetworkInterface> networkInterfaces =
+ NetworkInterface.getNetworkInterfaces();
+ while (networkInterfaces.hasMoreElements()) {
+ NetworkInterface networkInterface =
networkInterfaces.nextElement();
+ Enumeration<InetAddress> inetAddresses =
networkInterface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ if (!inetAddress.isLoopbackAddress() && inetAddress
instanceof Inet4Address) {
+ ip = inetAddress.getHostAddress();
+ }
+ }
+ }
+ } catch (SocketException ex) {
+ log.warn("Failed to get linux local ip, it will return [\"\"] ",
ex);
+ }
+ return ip;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json
new file mode 100644
index 0000000000..d70e8e0c61
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json
@@ -0,0 +1,16 @@
+{
+ "name": "inventory-connector",
+ "config": {
+ "connector.class": "io.debezium.connector.mysql.MySqlConnector",
+ "tasks.max": "1",
+ "database.hostname": "mysql",
+ "database.port": "3306",
+ "database.user": "st_user",
+ "database.password": "seatunnel",
+ "database.server.id": "184054",
+ "topic.prefix": "dbserver1",
+ "database.include.list": "debezium",
+ "schema.history.internal.kafka.bootstrap.servers": "kafka_dbz_e2e:9092",
+ "schema.history.internal.kafka.topic": "schema-changes.debezium"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf
new file mode 100644
index 0000000000..a0531b2345
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_dbz_e2e:9092"
+ topic = "dbserver1.debezium.products"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ format = debezium_json
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "float"
+ }
+ }
+ }
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql_e2e:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = public
+ table = sink
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf
new file mode 100644
index 0000000000..4944829c24
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_dbz_e2e:9092"
+ topic = "dbserver1.debezium.products"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ format = debezium_json
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "float"
+ }
+ }
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafka_dbz_e2e:9092"
+ topic = "test-debezium-sink"
+ format = debezium_json
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
index 7b10ad57a6..9ce4dc5541 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
@@ -24,6 +24,12 @@ import org.apache.seatunnel.api.configuration.Options;
import java.util.Map;
public class JsonFormatOptions {
+ public static final Option<Boolean> FAIL_ON_MISSING_FIELD =
+ Options.key("fail-on-missing-field")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to specify whether to fail if a
field is missing or not, false by default.");
public static final Option<Boolean> IGNORE_PARSE_ERRORS =
Options.key("ignore-parse-errors")
@@ -33,13 +39,6 @@ public class JsonFormatOptions {
"Optional flag to skip fields and rows with parse
errors instead of failing;\n"
+ "fields are set to null in case of
errors, false by default.");
- public static final Option<Boolean> FAIL_ON_MISSING_FIELD =
- Options.key("fail-on-missing-field")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Optional flag to specify whether to fail if a
field is missing or not, false by default.");
-
public static boolean getFailOnMissingField(Map<String, String> options) {
return Boolean.parseBoolean(
options.getOrDefault(
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
new file mode 100644
index 0000000000..3996c4ed7d
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
+ private static final long serialVersionUID = 1L;
+
+ private static final String OP_READ = "r"; // snapshot read
+ private static final String OP_CREATE = "c"; // insert
+ private static final String OP_UPDATE = "u"; // update
+ private static final String OP_DELETE = "d"; // delete
+
+ private static final String REPLICA_IDENTITY_EXCEPTION =
+ "The \"before\" field of %s message is null, "
+ + "if you are using Debezium Postgres Connector, "
+ + "please check the Postgres table has been set REPLICA
IDENTITY to FULL level.";
+
+ private final SeaTunnelRowType rowType;
+
+ private final JsonDeserializationSchema jsonDeserializer;
+
+ private final boolean ignoreParseErrors;
+
+ private final boolean debeziumEnabledSchema;
+
+ public DebeziumJsonDeserializationSchema(SeaTunnelRowType rowType, boolean
ignoreParseErrors) {
+ this.rowType = rowType;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.jsonDeserializer =
+ new JsonDeserializationSchema(false, ignoreParseErrors,
createJsonRowType(rowType));
+ this.debeziumEnabledSchema = false;
+ }
+
+ public DebeziumJsonDeserializationSchema(
+ SeaTunnelRowType rowType, boolean ignoreParseErrors, boolean
debeziumEnabledSchema) {
+ this.rowType = rowType;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.jsonDeserializer =
+ new JsonDeserializationSchema(false, ignoreParseErrors,
createJsonRowType(rowType));
+ this.debeziumEnabledSchema = debeziumEnabledSchema;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException(
+ "Please invoke DeserializationSchema#deserialize(byte[],
Collector<SeaTunnelRow>) instead.");
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out)
throws IOException {
+ if (message == null || message.length == 0) {
+ // skip tombstone messages
+ return;
+ }
+
+ try {
+ JsonNode payload = getPayload(convertBytes(message));
+ String op = payload.get("op").asText();
+
+ if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
+ SeaTunnelRow insert = convertJsonNode(payload.get("after"));
+ insert.setRowKind(RowKind.INSERT);
+ out.collect(insert);
+ } else if (OP_UPDATE.equals(op)) {
+ SeaTunnelRow before = convertJsonNode(payload.get("before"));
+ if (before == null) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"UPDATE"));
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ out.collect(before);
+
+ SeaTunnelRow after = convertJsonNode(payload.get("after"));
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(after);
+ } else if (OP_DELETE.equals(op)) {
+ SeaTunnelRow delete = convertJsonNode(payload.get("before"));
+ if (delete == null) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"UPDATE"));
+ }
+ delete.setRowKind(RowKind.DELETE);
+ out.collect(delete);
+ } else {
+ if (!ignoreParseErrors) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ String.format(
+ "Unknown \"op\" value \"%s\". The Debezium
JSON message is '%s'",
+ op, new String(message)));
+ }
+ }
+ } catch (Throwable t) {
+ // a big try catch to protect the processing.
+ if (!ignoreParseErrors) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ String.format("Corrupt Debezium JSON message '%s'.",
new String(message)),
+ t);
+ }
+ }
+ }
+
+ private JsonNode getPayload(JsonNode jsonNode) {
+ if (debeziumEnabledSchema) {
+ return jsonNode.get("payload");
+ }
+ return jsonNode;
+ }
+
+ private JsonNode convertBytes(byte[] message) {
+ try {
+ return jsonDeserializer.deserializeToJsonNode(message);
+ } catch (Exception t) {
+ if (ignoreParseErrors) {
+ return null;
+ }
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.JSON_OPERATION_FAILED,
+ String.format("Failed to deserialize JSON '%s'.", new
String(message)),
+ t);
+ }
+ }
+
+ private SeaTunnelRow convertJsonNode(JsonNode root) {
+ return jsonDeserializer.convertToRowData(root);
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return this.rowType;
+ }
+
+ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType
databaseSchema) {
+ return databaseSchema;
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java
new file mode 100644
index 0000000000..e59c9794fb
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+public class DebeziumJsonFormatFactory
+ implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "debezium_json";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+
+ @Override
+ public SerializationFormat createSerializationFormat(TableFactoryContext
context) {
+ return new SerializationFormat() {
+ @Override
+ public SerializationSchema createSerializationSchema() {
+ return new DebeziumJsonSerializationSchema(null);
+ }
+ };
+ }
+
+ @Override
+ public DeserializationFormat
createDeserializationFormat(TableFactoryContext context) {
+ Map<String, String> options = context.getOptions().toMap();
+ boolean ignoreParseErrors =
DebeziumJsonFormatOptions.getIgnoreParseErrors(options);
+ boolean schemaInclude =
DebeziumJsonFormatOptions.getSchemaInclude(options);
+
+ // TODO config SeaTunnelRowType
+ return new DeserializationFormat() {
+ @Override
+ public DeserializationSchema createDeserializationSchema() {
+ return new DebeziumJsonDeserializationSchema(null,
ignoreParseErrors);
+ }
+ };
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
new file mode 100644
index 0000000000..eb75bfd2b0
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatOptions;
+
+import java.util.Map;
+
+public class DebeziumJsonFormatOptions {
+
+ public static final int GENERATE_ROW_SIZE = 3;
+
+ public static final Option<Boolean> IGNORE_PARSE_ERRORS =
JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+ public static final Option<Boolean> SCHEMA_INCLUDE =
+ Options.key("schema-include")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When setting up a Debezium Kafka Connect, users
can enable "
+ + "a Kafka configuration
'value.converter.schemas.enable' to include schema in the message. "
+ + "This option indicates the Debezium JSON
data include the schema in the message or not. "
+ + "Default is false.");
+
+ public static boolean getSchemaInclude(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(
+ SCHEMA_INCLUDE.key(),
SCHEMA_INCLUDE.defaultValue().toString()));
+ }
+
+ public static boolean getIgnoreParseErrors(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(
+ IGNORE_PARSE_ERRORS.key(),
IGNORE_PARSE_ERRORS.defaultValue().toString()));
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
new file mode 100644
index 0000000000..5b1e476abc
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static
org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
+
+public class DebeziumJsonSerializationSchema implements SerializationSchema {
+ private static final long serialVersionUID = 1L;
+
+ private static final String OP_INSERT = "c"; // insert
+ private static final String OP_DELETE = "d"; // delete
+
+ private final JsonSerializationSchema jsonSerializer;
+
+ private transient SeaTunnelRow genericRow;
+
+ public DebeziumJsonSerializationSchema(SeaTunnelRowType rowType) {
+ this.jsonSerializer = new
JsonSerializationSchema(createJsonRowType(rowType));
+ this.genericRow = new SeaTunnelRow(GENERATE_ROW_SIZE);
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow row) {
+ try {
+ switch (row.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ genericRow.setField(0, null);
+ genericRow.setField(1, row);
+ genericRow.setField(2, OP_INSERT);
+ return jsonSerializer.serialize(genericRow);
+ case UPDATE_BEFORE:
+ case DELETE:
+ genericRow.setField(0, row);
+ genericRow.setField(1, null);
+ genericRow.setField(2, OP_DELETE);
+ return jsonSerializer.serialize(genericRow);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported operation '%s' for row
kind.", row.getRowKind()));
+ }
+ } catch (Throwable t) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.JSON_OPERATION_FAILED,
+ String.format("Could not serialize row %s.", row),
+ t);
+ }
+ }
+
+ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType
databaseSchema) {
+ return new SeaTunnelRowType(
+ new String[] {"before", "after", "op"},
+ new SeaTunnelDataType[] {databaseSchema, databaseSchema,
STRING_TYPE});
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
index db11c51c4a..cedeba7515 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
+++
b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
@@ -17,3 +17,4 @@
org.apache.seatunnel.format.json.JsonFormatFactory
org.apache.seatunnel.format.json.canal.CanalJsonFormatFactory
+org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatFactory
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
new file mode 100644
index 0000000000..20088e525b
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DebeziumJsonSerDeSchemaTest {
+
+ private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "description", "weight"},
+ new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE,
STRING_TYPE, FLOAT_TYPE});
+
+ @Test
+ void testNullRowMessages() throws Exception {
+ DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE,
false);
+ SimpleCollector collector = new SimpleCollector();
+
+ deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize(new byte[0], collector);
+ assertEquals(0, collector.list.size());
+ }
+
+ @Test
+ public void testSerializationAndSchemaExcludeDeserialization() throws
Exception {
+ testSerializationDeserialization("debezium-data.txt", false);
+ }
+
+ private void testSerializationDeserialization(String resourceFile, boolean
schemaInclude)
+ throws Exception {
+ List<String> lines = readLines(resourceFile);
+ DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE,
true, schemaInclude);
+
+ SimpleCollector collector = new SimpleCollector();
+
+ for (String line : lines) {
+
deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8),
collector);
+ }
+
+ List<String> expected =
+ Arrays.asList(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter,
Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[102, car
battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer,
12oz carpenter's hammer, 0.75]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer,
14oz carpenter's hammer, 0.875]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer,
16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks,
box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket,
water resistent black wind breaker, 0.1]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare
tire, 24 inch spare tire, 22.2]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer,
16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer,
18oz carpenter hammer, 1.0]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks,
box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks,
box of assorted rocks, 5.1]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket,
water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter,
Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket,
water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket,
new water resistent white wind breaker, 0.5]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter,
Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter,
Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter,
Big 2-wheel scooter , 5.17]}");
+ List<String> actual =
+
collector.list.stream().map(Object::toString).collect(Collectors.toList());
+ assertEquals(expected, actual);
+
+ DebeziumJsonSerializationSchema serializationSchema =
+ new DebeziumJsonSerializationSchema(PHYSICAL_DATA_TYPE);
+
+ actual = new ArrayList<>();
+ for (SeaTunnelRow rowData : collector.list) {
+ actual.add(new String(serializationSchema.serialize(rowData),
StandardCharsets.UTF_8));
+ }
+
+ expected =
+ Arrays.asList(
+
"{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small
2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
+ "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz
carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz
carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz
carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water
resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
+
"{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
+
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz
carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
+
"{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
+
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water
resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
+
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
+
"{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
+
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new
water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
+
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
+
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
+
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+ assertEquals(expected, actual);
+ }
+ //
--------------------------------------------------------------------------------------------
+ // Utilities
+ //
--------------------------------------------------------------------------------------------
+
+ private static List<String> readLines(String resource) throws IOException {
+ final URL url =
DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+ Assertions.assertNotNull(url);
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllLines(path);
+ }
+
+ private static class SimpleCollector implements Collector<SeaTunnelRow> {
+
+ private List<SeaTunnelRow> list = new ArrayList<>();
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ list.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null;
+ }
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
b/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
new file mode 100644
index 0000000000..3763369e49
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
@@ -0,0 +1,16 @@
+{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel
scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
+{"before":null,"after":{"id":102,"name":"car battery","description":"12V car
battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":103,"name":"12-pack drill
bits","description":"12-pack of drill bits with sizes ranging from #40 to
#3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":104,"name":"hammer","description":"12oz
carpenter's
hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":105,"name":"hammer","description":"14oz
carpenter's
hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":106,"name":"hammer","description":"16oz
carpenter's
hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted
rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":108,"name":"jacket","description":"water
resistent black wind
breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch
spare
tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":{"id":106,"name":"hammer","description":"16oz carpenter's
hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz
carpenter
hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
+{"before":{"id":107,"name":"rocks","description":"box of assorted
rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box
of assorted
rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transactio
[...]
+{"before":null,"after":{"id":110,"name":"jacket","description":"water
resistent white wind
breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
+{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel
scooter
","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
+{"before":{"id":110,"name":"jacket","description":"water resistent white wind
breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new
water resistent white wind
breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589
[...]
+{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter
","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big
2-wheel scooter
","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transac
[...]
+{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter
","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
\ No newline at end of file