This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5817ec07b [Feature][CDC] Support export debezium-json format to kafka
(#4339)
5817ec07b is described below
commit 5817ec07bfa83f51e4bbe8c595dcf359c1ddf21d
Author: hailin0 <[email protected]>
AuthorDate: Tue Mar 14 10:24:06 2023 +0800
[Feature][CDC] Support export debezium-json format to kafka (#4339)
---
release-note.md | 1 +
.../connector-cdc/connector-cdc-base/pom.xml | 9 +
.../connectors/cdc/base/option/SourceOptions.java | 8 +
.../reader/IncrementalSourceRecordEmitter.java | 5 +-
.../cdc/debezium/DeserializeFormat.java} | 20 +-
.../row/DebeziumJsonDeserializeSchema.java | 58 ++++++
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 6 +
.../cdc/mysql/source/MySqlIncrementalSource.java | 9 +
seatunnel-connectors-v2/connector-kafka/pom.xml | 5 +
.../connectors/seatunnel/kafka/config/Config.java | 4 +
.../serialize/DefaultSeaTunnelRowSerializer.java | 209 +++++++++++++++------
.../kafka/serialize/SeaTunnelRowSerializer.java | 2 +-
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 16 ++
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 12 ++
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 51 +----
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 27 ++-
seatunnel-formats/pom.xml | 1 +
.../pom.xml | 34 +---
...ompatibleDebeziumJsonDeserializationSchema.java | 71 +++++++
.../CompatibleDebeziumJsonSerializationSchema.java | 45 +++++
.../debezium/json/DebeziumJsonConverter.java | 108 +++++++++++
21 files changed, 558 insertions(+), 143 deletions(-)
diff --git a/release-note.md b/release-note.md
index 2b026074c..c4b9c2fb4 100644
--- a/release-note.md
+++ b/release-note.md
@@ -16,6 +16,7 @@
- [Elasticsearch] Support https protocol & compatible with opensearch
- [Hbase] Add hbase sink connector #4049
- [Github] Add Github source connector #4155
+- [CDC] Support export debezium-json format to kafka #4339
### Formats
- [Canal]Support read canal format message #3950
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index 621e0ad8f..9d813003f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -45,6 +45,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Debezium dependencies -->
<dependency>
<groupId>io.debezium</groupId>
@@ -87,6 +92,10 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-compatible-debezium-json</artifactId>
+ </dependency>
</dependencies>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index f627178f7..525069d11 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.cdc.base.option;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import java.util.Map;
@@ -107,6 +108,13 @@ public class SourceOptions {
.withDescription(
"Decides if the table options contains Debezium
client properties that start with prefix 'debezium'.");
+ public static final Option<DeserializeFormat> FORMAT =
+ Options.key("format")
+ .enumType(DeserializeFormat.class)
+ .defaultValue(DeserializeFormat.DEFAULT)
+ .withDescription(
+ "Data format. The default format is seatunnel row.
Optional compatible with debezium-json format.");
+
public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 49eb17ede..8b51fa55b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -111,7 +111,7 @@ public class IncrementalSourceRecordEmitter<T>
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
} else if (isSchemaChangeEvent(element) &&
splitState.isIncrementalSplitState()) {
- // TODO Currently not supported Schema Change
+ emitElement(element, output);
} else if (isDataChangeRecord(element)) {
if (splitState.isIncrementalSplitState()) {
Offset position = getOffsetPosition(element);
@@ -119,8 +119,7 @@ public class IncrementalSourceRecordEmitter<T>
}
emitElement(element, output);
} else {
- // unknown element
- log.info("Meet unknown element {}, just skip.", element);
+ emitElement(element, output);
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DeserializeFormat.java
similarity index 63%
copy from
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DeserializeFormat.java
index 5f31ca001..0acf420a3 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DeserializeFormat.java
@@ -15,19 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+package org.apache.seatunnel.connectors.cdc.debezium;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
-import org.apache.kafka.clients.producer.ProducerRecord;
+public enum DeserializeFormat {
+ DEFAULT("default"),
+
COMPATIBLE_DEBEZIUM_JSON(CompatibleDebeziumJsonDeserializationSchema.IDENTIFIER);
-public interface SeaTunnelRowSerializer<K, V> {
+ private String name;
- /**
- * Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}.
- *
- * @param row seatunnel row
- * @return kafka record.
- */
- ProducerRecord<K, V> serializeRow(String topic, SeaTunnelRow row);
+ DeserializeFormat(String name) {
+ this.name = name;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java
new file mode 100644
index 000000000..4f5ad0545
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.row;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+
+@Slf4j
+public class DebeziumJsonDeserializeSchema implements
DebeziumDeserializationSchema<SeaTunnelRow> {
+ private static final String KEY_SCHEMA_ENABLE =
"key.converter.schemas.enable";
+ private static final String VALUE_SCHEMA_ENABLE =
"value.converter.schemas.enable";
+
+ private final CompatibleDebeziumJsonDeserializationSchema
deserializationSchema;
+
+ public DebeziumJsonDeserializeSchema(Map<String, String> debeziumConfig) {
+ boolean keySchemaEnable =
+ Boolean.valueOf(debeziumConfig.getOrDefault(KEY_SCHEMA_ENABLE,
"true"));
+ boolean valueSchemaEnable =
+
Boolean.valueOf(debeziumConfig.getOrDefault(VALUE_SCHEMA_ENABLE, "true"));
+ this.deserializationSchema =
+ new
CompatibleDebeziumJsonDeserializationSchema(keySchemaEnable, valueSchemaEnable);
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> out)
throws Exception {
+ SeaTunnelRow row = deserializationSchema.deserialize(record);
+ out.collect(row);
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index d77375850..855656c3c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -43,6 +43,7 @@ import java.util.HashMap;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord;
/** Deserialization schema from Debezium object to {@link SeaTunnelRow}. */
@Slf4j
@@ -109,6 +110,11 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
@Override
public void deserialize(SourceRecord record, Collector<SeaTunnelRow>
collector)
throws Exception {
+ if (!isDataChangeRecord(record)) {
+ log.debug("Unsupported record {}, just skip.", record);
+ return;
+ }
+
Envelope.Operation operation = Envelope.operationFor(record);
Struct messageStruct = (Struct) record.value();
Schema valueSchema = record.valueSchema();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 1bb371880..a8531bd05 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -35,6 +35,8 @@ import
org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import
org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
@@ -81,6 +83,13 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
@Override
public DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(
ReadonlyConfig config) {
+ if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+ config.get(JdbcSourceOptions.FORMAT))) {
+ return (DebeziumDeserializationSchema<T>)
+ new DebeziumJsonDeserializeSchema(
+ config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
+ }
+
SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
// TODO: support metadata keys
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml
b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 73466f23e..0ce4bba6b 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -56,6 +56,11 @@
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-compatible-debezium-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 547700220..19a2a67c3 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.kafka.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,9 @@ public class Config {
public static final String CANAL_FORMAT = "canal-json";
+ public static final String COMPATIBLE_DEBEZIUM_JSON =
+ CompatibleDebeziumJsonSerializationSchema.IDENTIFIER;
+
/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index bae76c7a9..ccf4879d4 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -22,64 +22,188 @@ import
org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMPATIBLE_DEBEZIUM_JSON;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
-public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<byte[], byte[]> {
+@RequiredArgsConstructor
+public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
+ private final Function<SeaTunnelRow, String> topicExtractor;
+ private final Function<SeaTunnelRow, Integer> partitionExtractor;
+ private final Function<SeaTunnelRow, Long> timestampExtractor;
+ private final Function<SeaTunnelRow, byte[]> keyExtractor;
+ private final Function<SeaTunnelRow, byte[]> valueExtractor;
+ private final Function<SeaTunnelRow, Iterable<Header>> headersExtractor;
- private Integer partition;
- private final SerializationSchema keySerialization;
- private final SerializationSchema valueSerialization;
+ @Override
+ public ProducerRecord serializeRow(SeaTunnelRow row) {
+ return new ProducerRecord(
+ topicExtractor.apply(row),
+ partitionExtractor.apply(row),
+ timestampExtractor.apply(row),
+ keyExtractor.apply(row),
+ valueExtractor.apply(row),
+ headersExtractor.apply(row));
+ }
- public DefaultSeaTunnelRowSerializer(
- SeaTunnelRowType seaTunnelRowType, String format, String
delimiter) {
- this(element -> null, createSerializationSchema(seaTunnelRowType,
format, delimiter));
+ public static DefaultSeaTunnelRowSerializer create(
+ String topic, SeaTunnelRowType rowType, String format, String
delimiter) {
+ return new DefaultSeaTunnelRowSerializer(
+ topicExtractor(topic, rowType),
+ partitionExtractor(null),
+ timestampExtractor(),
+ keyExtractor(null, rowType, format, delimiter),
+ valueExtractor(rowType, format, delimiter),
+ headersExtractor());
}
- public DefaultSeaTunnelRowSerializer(
- Integer partition, SeaTunnelRowType seaTunnelRowType, String
format, String delimiter) {
- this(seaTunnelRowType, format, delimiter);
- this.partition = partition;
+ public static DefaultSeaTunnelRowSerializer create(
+ String topic,
+ Integer partition,
+ SeaTunnelRowType rowType,
+ String format,
+ String delimiter) {
+ return new DefaultSeaTunnelRowSerializer(
+ topicExtractor(topic, rowType),
+ partitionExtractor(partition),
+ timestampExtractor(),
+ keyExtractor(null, rowType, format, delimiter),
+ valueExtractor(rowType, format, delimiter),
+ headersExtractor());
}
- public DefaultSeaTunnelRowSerializer(
- List<String> keyFieldNames,
- SeaTunnelRowType seaTunnelRowType,
+ public static DefaultSeaTunnelRowSerializer create(
+ String topic,
+ List<String> keyFields,
+ SeaTunnelRowType rowType,
String format,
String delimiter) {
- this(
- createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
- createSerializationSchema(seaTunnelRowType, format,
delimiter));
+ return new DefaultSeaTunnelRowSerializer(
+ topicExtractor(topic, rowType),
+ partitionExtractor(null),
+ timestampExtractor(),
+ keyExtractor(keyFields, rowType, format, delimiter),
+ valueExtractor(rowType, format, delimiter),
+ headersExtractor());
}
- public DefaultSeaTunnelRowSerializer(
- SerializationSchema keySerialization, SerializationSchema
valueSerialization) {
- this.keySerialization = keySerialization;
- this.valueSerialization = valueSerialization;
+ private static Function<SeaTunnelRow, Integer> partitionExtractor(Integer
partition) {
+ return row -> partition;
}
- @Override
- public ProducerRecord<byte[], byte[]> serializeRow(String topic,
SeaTunnelRow row) {
- return new ProducerRecord<>(
- topic,
- partition,
- keySerialization.serialize(row),
- valueSerialization.serialize(row));
+ private static Function<SeaTunnelRow, Long> timestampExtractor() {
+ return row -> null;
}
- private static SerializationSchema createSerializationSchema(
+ private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor()
{
+ return row -> null;
+ }
+
+ private static Function<SeaTunnelRow, String> topicExtractor(
+ String topic, SeaTunnelRowType rowType) {
+ String regex = "\\$\\{(.*?)\\}";
+ Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
+ Matcher matcher = pattern.matcher(topic);
+ boolean isExtractTopic = matcher.find();
+ if (!isExtractTopic) {
+ return row -> topic;
+ }
+
+ String topicField = matcher.group(1);
+ List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
+ if (!fieldNames.contains(topicField)) {
+ throw new KafkaConnectorException(
+ CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format("Field name { %s } is not found!", topic));
+ }
+ int topicFieldIndex = rowType.indexOf(topicField);
+ return row -> {
+ Object topicFieldValue = row.getField(topicFieldIndex);
+ if (topicFieldValue == null) {
+ throw new KafkaConnectorException(
+ CommonErrorCode.ILLEGAL_ARGUMENT, "The column value is
empty!");
+ }
+ return topicFieldValue.toString();
+ };
+ }
+
+ private static Function<SeaTunnelRow, byte[]> keyExtractor(
+ List<String> keyFields, SeaTunnelRowType rowType, String format,
String delimiter) {
+ if (Config.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
+ CompatibleDebeziumJsonSerializationSchema serializationSchema =
+ new CompatibleDebeziumJsonSerializationSchema(rowType,
true);
+ return row -> serializationSchema.serialize(row);
+ }
+
+ if (keyFields == null || keyFields.isEmpty()) {
+ return row -> null;
+ }
+
+ SeaTunnelRowType keyType = createKeyType(keyFields, rowType);
+ Function<SeaTunnelRow, SeaTunnelRow> keyRowExtractor =
+ createKeyRowExtractor(keyType, rowType);
+ SerializationSchema serializationSchema =
+ createSerializationSchema(keyType, format, delimiter, true);
+ return row ->
serializationSchema.serialize(keyRowExtractor.apply(row));
+ }
+
+ private static Function<SeaTunnelRow, byte[]> valueExtractor(
SeaTunnelRowType rowType, String format, String delimiter) {
+ SerializationSchema serializationSchema =
+ createSerializationSchema(rowType, format, delimiter, false);
+ return row -> serializationSchema.serialize(row);
+ }
+
+ private static SeaTunnelRowType createKeyType(
+ List<String> keyFieldNames, SeaTunnelRowType rowType) {
+ int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+ SeaTunnelDataType[] keyFieldDataTypeArr = new
SeaTunnelDataType[keyFieldNames.size()];
+ for (int i = 0; i < keyFieldNames.size(); i++) {
+ String keyFieldName = keyFieldNames.get(i);
+ int rowFieldIndex = rowType.indexOf(keyFieldName);
+ keyFieldIndexArr[i] = rowFieldIndex;
+ keyFieldDataTypeArr[i] = rowType.getFieldType(rowFieldIndex);
+ }
+ return new SeaTunnelRowType(keyFieldNames.toArray(new String[0]),
keyFieldDataTypeArr);
+ }
+
+ private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
+ SeaTunnelRowType keyType, SeaTunnelRowType rowType) {
+ int[] keyIndex = new int[keyType.getTotalFields()];
+ for (int i = 0; i < keyType.getTotalFields(); i++) {
+ keyIndex[i] = rowType.indexOf(keyType.getFieldName(i));
+ }
+ return row -> {
+ Object[] fields = new Object[keyType.getTotalFields()];
+ for (int i = 0; i < keyIndex.length; i++) {
+ fields[i] = row.getField(keyIndex[i]);
+ }
+ return new SeaTunnelRow(fields);
+ };
+ }
+
+ private static SerializationSchema createSerializationSchema(
+ SeaTunnelRowType rowType, String format, String delimiter, boolean
isKey) {
switch (format) {
case DEFAULT_FORMAT:
return new JsonSerializationSchema(rowType);
@@ -90,34 +214,11 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<byt
.build();
case CANAL_FORMAT:
return new CanalJsonSerializationSchema(rowType);
+ case COMPATIBLE_DEBEZIUM_JSON:
+ return new CompatibleDebeziumJsonSerializationSchema(rowType,
isKey);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported
format: " + format);
}
}
-
- private static SerializationSchema createKeySerializationSchema(
- List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
- int[] keyFieldIndexArr = new int[keyFieldNames.size()];
- SeaTunnelDataType[] keyFieldDataTypeArr = new
SeaTunnelDataType[keyFieldNames.size()];
- for (int i = 0; i < keyFieldNames.size(); i++) {
- String keyFieldName = keyFieldNames.get(i);
- int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
- keyFieldIndexArr[i] = rowFieldIndex;
- keyFieldDataTypeArr[i] =
seaTunnelRowType.getFieldType(rowFieldIndex);
- }
- SeaTunnelRowType keyType =
- new SeaTunnelRowType(keyFieldNames.toArray(new String[0]),
keyFieldDataTypeArr);
- SerializationSchema keySerializationSchema = new
JsonSerializationSchema(keyType);
-
- Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
- row -> {
- Object[] keyFields = new Object[keyFieldIndexArr.length];
- for (int i = 0; i < keyFieldIndexArr.length; i++) {
- keyFields[i] = row.getField(keyFieldIndexArr[i]);
- }
- return new SeaTunnelRow(keyFields);
- };
- return row ->
keySerializationSchema.serialize(keyDataExtractor.apply(row));
- }
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index 5f31ca001..9f12591ea 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -29,5 +29,5 @@ public interface SeaTunnelRowSerializer<K, V> {
* @param row seatunnel row
* @return kafka record.
*/
- ProducerRecord<K, V> serializeRow(String topic, SeaTunnelRow row);
+ ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 4fb7bfa3d..0e0ca3a4b 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -58,6 +58,22 @@ public class KafkaSink
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
+ public KafkaSink() {}
+
+ public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(),
BOOTSTRAP_SERVERS.key());
+ if (!result.isSuccess()) {
+ throw new KafkaConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
+ }
+ this.pluginConfig = pluginConfig;
+ this.seaTunnelRowType = rowType;
+ }
+
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 8f897aa20..75db7ea3b 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -17,8 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
@@ -39,4 +43,12 @@ public class KafkaSinkFactory implements TableSinkFactory {
.exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
.build();
}
+
+ @Override
+ public TableSink createSink(TableFactoryContext context) {
+ return () ->
+ new KafkaSink(
+ ConfigFactory.parseMap(context.getOptions().toMap()),
+
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6d9980810..1cb0b2fd2 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -31,7 +31,6 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSer
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -42,8 +41,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
@@ -62,9 +59,7 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
private final SinkWriter.Context context;
private String transactionPrefix;
- private String topic;
private long lastCheckpointId = 0;
- private boolean isExtractTopic;
private SeaTunnelRowType seaTunnelRowType;
private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
@@ -79,9 +74,6 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.seaTunnelRowType = seaTunnelRowType;
- Pair<Boolean, String> topicResult =
isExtractTopic(pluginConfig.getString(TOPIC.key()));
- this.isExtractTopic = topicResult.getKey();
- this.topic = topicResult.getRight();
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
MessageContentPartitioner.setAssignPartitions(
pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
@@ -115,7 +107,7 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
@Override
public void write(SeaTunnelRow element) {
ProducerRecord<byte[], byte[]> producerRecord =
- seaTunnelRowSerializer.serializeRow(extractTopic(element),
element);
+ seaTunnelRowSerializer.serializeRow(element);
kafkaProducerSender.send(producerRecord);
}
@@ -180,11 +172,17 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
}
+ String topic = pluginConfig.getString(TOPIC.key());
if (pluginConfig.hasPath(PARTITION.key())) {
- return new DefaultSeaTunnelRowSerializer(
- pluginConfig.getInt(PARTITION.key()), seaTunnelRowType,
format, delimiter);
+ return DefaultSeaTunnelRowSerializer.create(
+ topic,
+ pluginConfig.getInt(PARTITION.key()),
+ seaTunnelRowType,
+ format,
+ delimiter);
} else {
- return new DefaultSeaTunnelRowSerializer(
+ return DefaultSeaTunnelRowSerializer.create(
+ topic,
getPartitionKeyFields(pluginConfig, seaTunnelRowType),
seaTunnelRowType,
format,
@@ -229,33 +227,4 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
return Collections.emptyList();
}
-
- private Pair<Boolean, String> isExtractTopic(String topicConfig) {
- String regex = "\\$\\{(.*?)\\}";
- Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
- Matcher matcher = pattern.matcher(topicConfig);
- if (matcher.find()) {
- return Pair.of(true, matcher.group(1));
- }
- return Pair.of(false, topicConfig);
- }
-
- private String extractTopic(SeaTunnelRow row) {
- if (!isExtractTopic) {
- return topic;
- }
- List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
- if (!fieldNames.contains(topic)) {
- throw new KafkaConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT,
- String.format("Field name { %s } is not found!", topic));
- }
- int topicFieldIndex = seaTunnelRowType.indexOf(topic);
- Object topicFieldValue = row.getField(topicFieldIndex);
- if (topicFieldValue == null) {
- throw new KafkaConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT, "The column value is
empty!");
- }
- return topicFieldValue.toString();
- }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 8f1ca66c5..3c73d51fc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -113,9 +113,12 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
log.info("Write 100 records to topic test_topic_source");
DefaultSeaTunnelRowSerializer serializer =
- new DefaultSeaTunnelRowSerializer(
- SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
- generateTestData(row -> serializer.serializeRow("test_topic_source",
row), 0, 100);
+ DefaultSeaTunnelRowSerializer.create(
+ "test_topic_source",
+ SEATUNNEL_ROW_TYPE,
+ DEFAULT_FORMAT,
+ DEFAULT_FIELD_DELIMITER);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
}
@AfterAll
@@ -191,9 +194,12 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
public void testSourceKafkaJsonToConsole(TestContainer container)
throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer =
- new DefaultSeaTunnelRowSerializer(
- SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
- generateTestData(row -> serializer.serializeRow("test_topic_json",
row), 0, 100);
+ DefaultSeaTunnelRowSerializer.create(
+ "test_topic_json",
+ SEATUNNEL_ROW_TYPE,
+ DEFAULT_FORMAT,
+ DEFAULT_FIELD_DELIMITER);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
Container.ExecResult execResult =
container.executeJob("/kafkasource_json_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
@@ -210,9 +216,12 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
public void testSourceKafkaStartConfig(TestContainer container)
throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer =
- new DefaultSeaTunnelRowSerializer(
- SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
- generateTestData(row -> serializer.serializeRow("test_topic_group",
row), 100, 150);
+ DefaultSeaTunnelRowSerializer.create(
+ "test_topic_group",
+ SEATUNNEL_ROW_TYPE,
+ DEFAULT_FORMAT,
+ DEFAULT_FIELD_DELIMITER);
+ generateTestData(row -> serializer.serializeRow(row), 100, 150);
testKafkaGroupOffsetsToConsole(container);
}
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
index 792ed8735..983a8629c 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/pom.xml
@@ -29,6 +29,7 @@
<modules>
<module>seatunnel-format-json</module>
<module>seatunnel-format-text</module>
+ <module>seatunnel-format-compatible-debezium-json</module>
</modules>
</project>
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/pom.xml
similarity index 62%
copy from seatunnel-connectors-v2/connector-kafka/pom.xml
copy to seatunnel-formats/seatunnel-format-compatible-debezium-json/pom.xml
index 73466f23e..a2e6ab9b1 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/pom.xml
@@ -1,61 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
-
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
http://www.apache.org/licenses/LICENSE-2.0
-
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-connectors-v2</artifactId>
+ <artifactId>seatunnel-formats</artifactId>
<version>${revision}</version>
</parent>
- <artifactId>connector-kafka</artifactId>
- <name>SeaTunnel : Connectors V2 : Kafka</name>
+ <artifactId>seatunnel-format-compatible-debezium-json</artifactId>
+ <name>SeaTunnel : Formats : Compatible Debezium Json</name>
<properties>
- <kafka.client.version>3.2.0</kafka.client.version>
+ <debezium.version>1.6.4.Final</debezium.version>
</properties>
<dependencies>
- <!-- TODO add to dependency management after version unify-->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-common</artifactId>
+ <artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.client.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-format-json</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-format-text</artifactId>
- <version>${project.version}</version>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-embedded</artifactId>
+ <version>${debezium.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
-
</project>
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
new file mode 100644
index 000000000..005297492
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Turns a Kafka Connect {@link SourceRecord} into a three-column {@link SeaTunnelRow} holding
+ * the record's topic, its key rendered as JSON and its value rendered as JSON.
+ *
+ * <p>The JSON rendering is delegated to {@link DebeziumJsonConverter}. Raw {@code byte[]}
+ * input is not supported; callers must use {@link #deserialize(SourceRecord)}.
+ */
+public class CompatibleDebeziumJsonDeserializationSchema
+        implements DeserializationSchema<SeaTunnelRow> {
+    /** Name under which this format is selected. */
+    public static final String IDENTIFIER = "compatible_debezium_json";
+    /** Column carrying the record's topic. */
+    public static final String FIELD_TOPIC = "topic";
+    /** Column carrying the JSON-rendered record key. */
+    public static final String FIELD_KEY = "key";
+    /** Column carrying the JSON-rendered record value. */
+    public static final String FIELD_VALUE = "value";
+    /** Fixed row type produced by this schema: three string columns (topic, key, value). */
+    public static final SeaTunnelRowType DEBEZIUM_DATA_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {FIELD_TOPIC, FIELD_KEY, FIELD_VALUE},
+                    new SeaTunnelDataType[] {
+                        BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE
+                    });
+
+    private final DebeziumJsonConverter debeziumJsonConverter;
+
+    /**
+     * @param keySchemaEnable whether the rendered key JSON includes the schema envelope
+     * @param valueSchemaEnable whether the rendered value JSON includes the schema envelope
+     */
+    public CompatibleDebeziumJsonDeserializationSchema(
+            boolean keySchemaEnable, boolean valueSchemaEnable) {
+        this.debeziumJsonConverter = new DebeziumJsonConverter(keySchemaEnable, valueSchemaEnable);
+    }
+
+    /**
+     * Always fails: this schema only accepts {@link SourceRecord} input.
+     *
+     * @throws IOException always; the concrete type is kept as {@link
+     *     UnsupportedEncodingException} so existing callers keep catching the same exception
+     */
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        // The exception type is unchanged for compatibility; the message makes clear that the
+        // operation itself is unsupported rather than a character encoding.
+        throw new UnsupportedEncodingException(
+                "Deserializing from raw bytes is not supported by the "
+                        + IDENTIFIER
+                        + " format; use deserialize(SourceRecord) instead");
+    }
+
+    /**
+     * Builds a row of {@code (topic, key JSON, value JSON)} from the given record.
+     *
+     * @param record the Kafka Connect source record to convert
+     * @return a row whose fields follow {@link #DEBEZIUM_DATA_ROW_TYPE}
+     * @throws InvocationTargetException if the reflective JSON conversion throws
+     * @throws IllegalAccessException if the reflective JSON conversion is not accessible
+     */
+    public SeaTunnelRow deserialize(SourceRecord record)
+            throws InvocationTargetException, IllegalAccessException {
+        String key = debeziumJsonConverter.serializeKey(record);
+        String value = debeziumJsonConverter.serializeValue(record);
+        return new SeaTunnelRow(new Object[] {record.topic(), key, value});
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return DEBEZIUM_DATA_ROW_TYPE;
+    }
+}
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
new file mode 100644
index 000000000..a31aaec6c
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.RequiredArgsConstructor;
+
+import static
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema.FIELD_KEY;
+import static
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema.FIELD_VALUE;
+
+/**
+ * Writes out one already-rendered JSON string column of a row (either the {@code key} or the
+ * {@code value} column) as bytes.
+ *
+ * <p>Besides the explicit constructor below, Lombok's {@code @RequiredArgsConstructor}
+ * generates a constructor that takes the column index directly.
+ */
+@RequiredArgsConstructor
+public class CompatibleDebeziumJsonSerializationSchema implements SerializationSchema {
+    public static final String IDENTIFIER = CompatibleDebeziumJsonDeserializationSchema.IDENTIFIER;
+
+    /** Position, within the row, of the column this schema serializes. */
+    private final int index;
+
+    /**
+     * @param rowType row type used to look up the column position by name
+     * @param isKey {@code true} to serialize the key column, {@code false} for the value column
+     */
+    public CompatibleDebeziumJsonSerializationSchema(SeaTunnelRowType rowType, boolean isKey) {
+        this.index = rowType.indexOf(isKey ? FIELD_KEY : FIELD_VALUE);
+    }
+
+    /**
+     * Returns the selected column's string content encoded as UTF-8.
+     *
+     * @param row row whose field at the configured index is a {@link String}
+     * @return the UTF-8 bytes of that field
+     */
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        String field = (String) row.getField(index);
+        // Pin the charset: the no-arg getBytes() depends on the JVM's default encoding, which
+        // would make the emitted bytes differ between environments.
+        return field.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+}
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
new file mode 100644
index 000000000..fe96d807e
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.RequiredArgsConstructor;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+/**
+ * Renders the key and the value of a Kafka Connect {@link SourceRecord} as JSON strings.
+ *
+ * <p>The conversion calls a {@link JsonConverter} method that is looked up reflectively by
+ * name: {@code convertToJsonWithEnvelope} when the schema is enabled for that side, otherwise
+ * {@code convertToJsonWithoutEnvelope}.
+ *
+ * <p>The converters and the reflective {@link Method} handles are {@code transient} and are
+ * created lazily on first use, so only the two boolean flags travel with a serialized instance.
+ */
+@RequiredArgsConstructor
+public class DebeziumJsonConverter implements Serializable {
+    private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope";
+    private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope";
+
+    private final boolean keySchemaEnable;
+    private final boolean valueSchemaEnable;
+    // The converter fields double as the "initialised" guards in tryInit(). They are volatile
+    // and written last so that a thread seeing a non-null converter also sees its Method.
+    private transient volatile JsonConverter keyConverter;
+    private transient volatile JsonConverter valueConverter;
+    private transient Method keyConverterMethod;
+    private transient Method valueConverterMethod;
+
+    /**
+     * Renders the record's key as a JSON string.
+     *
+     * @param record record whose key schema and key are converted
+     * @return the JSON text of the key
+     * @throws InvocationTargetException if the underlying converter method throws
+     * @throws IllegalAccessException if the converter method cannot be accessed
+     */
+    public String serializeKey(SourceRecord record)
+            throws InvocationTargetException, IllegalAccessException {
+        tryInit();
+        JsonNode jsonNode =
+                (JsonNode)
+                        keyConverterMethod.invoke(keyConverter, record.keySchema(), record.key());
+        return jsonNode.toString();
+    }
+
+    /**
+     * Renders the record's value as a JSON string.
+     *
+     * @param record record whose value schema and value are converted
+     * @return the JSON text of the value
+     * @throws InvocationTargetException if the underlying converter method throws
+     * @throws IllegalAccessException if the converter method cannot be accessed
+     */
+    public String serializeValue(SourceRecord record)
+            throws InvocationTargetException, IllegalAccessException {
+        tryInit();
+        JsonNode jsonNode =
+                (JsonNode)
+                        valueConverterMethod.invoke(
+                                valueConverter, record.valueSchema(), record.value());
+        return jsonNode.toString();
+    }
+
+    /** Lazily creates the key and value converters and their reflective method handles. */
+    private void tryInit() {
+        if (keyConverter == null) {
+            synchronized (this) {
+                if (keyConverter == null) {
+                    JsonConverter converter = newConverter(keySchemaEnable, true);
+                    keyConverterMethod = lookupConvertMethod(keySchemaEnable);
+                    // Publish the guard field only after everything it guards is in place.
+                    keyConverter = converter;
+                }
+            }
+        }
+        if (valueConverter == null) {
+            synchronized (this) {
+                if (valueConverter == null) {
+                    JsonConverter converter = newConverter(valueSchemaEnable, false);
+                    valueConverterMethod = lookupConvertMethod(valueSchemaEnable);
+                    // Publish the guard field only after everything it guards is in place.
+                    valueConverter = converter;
+                }
+            }
+        }
+    }
+
+    /** Builds a JsonConverter configured with the given schemas-enable flag and key/value role. */
+    private static JsonConverter newConverter(boolean schemasEnable, boolean isKey) {
+        JsonConverter converter = new JsonConverter();
+        converter.configure(
+                Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, schemasEnable),
+                isKey);
+        return converter;
+    }
+
+    /** Looks up the JsonConverter conversion method matching the schemas-enable flag. */
+    private static Method lookupConvertMethod(boolean schemasEnable) {
+        return ReflectionUtils.getDeclaredMethod(
+                        JsonConverter.class,
+                        schemasEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD,
+                        Schema.class,
+                        Object.class)
+                .get();
+    }
+}