This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db2e9a67 [flink][kafka-cdc] Support table debezium json format (#2251)
4db2e9a67 is described below

commit 4db2e9a676e07c77f0b2dfae8edb0f0a643aa213
Author: monster <60029759+monsterchenz...@users.noreply.github.com>
AuthorDate: Tue Nov 7 16:06:18 2023 +0800

    [flink][kafka-cdc] Support table debezium json format (#2251)
---
 .../org/apache/paimon/utils/JsonSerdeUtil.java     |  36 +++++
 .../paimon/flink/action/cdc/format/DataFormat.java |   4 +-
 .../cdc/format/debezium/DebeziumRecordParser.java  | 143 +++++++++++++++++
 .../JsonPrimaryKeyDeserializationSchema.java       |  67 ++++++++
 .../flink/action/cdc/kafka/KafkaActionUtils.java   |  43 +++--
 .../action/cdc/kafka/KafkaSyncTableAction.java     |   4 +-
 .../kafka/KafkaDebeziumSyncTableActionITCase.java  | 178 +++++++++++++++++++++
 .../table/computedcolumn/debezium-data-1.txt       |  19 +++
 .../table/schemaevolution/debezium-data-1.txt      |  20 +++
 .../table/schemaevolution/debezium-data-2.txt      |  20 +++
 .../table/schemaevolution/debezium-data-3.txt      |  22 +++
 11 files changed, 543 insertions(+), 13 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 3b49ffd6d..803f4f3da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -35,11 +35,14 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -219,6 +222,39 @@ public class JsonSerdeUtil {
         return OBJECT_MAPPER_INSTANCE.valueToTree(value);
     }
 
+    /**
+     * Adds an array of values to a JSON string under the specified key.
+     *
+     * @param origin The original JSON string.
+     * @param key The key under which the values will be added as an array.
+     * @param values A list of values to be added to the JSON string.
+     * @return The JSON string with the added array. If the JSON string is not a valid JSON object,
+     *     or if the list of values is empty or null, the original JSON string will be returned.
+     * @throws RuntimeException If an error occurs while parsing the JSON string or adding the
+     *     values.
+     */
+    public static String putArrayToJsonString(String origin, String key, List<String> values) {
+        if (values == null || values.isEmpty()) {
+            return origin;
+        }
+
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER_INSTANCE.readTree(origin);
+            if (jsonNode.isObject()) {
+                ObjectNode objectNode = (ObjectNode) jsonNode;
+                ArrayNode arrayNode = objectNode.putArray(key);
+                for (String value : values) {
+                    arrayNode.add(value);
+                }
+                return OBJECT_MAPPER_INSTANCE.writeValueAsString(objectNode);
+            } else {
+                return origin;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to add array to JSON", e);
+        }
+    }
+
     public static boolean isNull(JsonNode jsonNode) {
         return jsonNode == null || jsonNode.isNull();
     }
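
For illustration, a minimal sketch of how the new putArrayToJsonString utility behaves; the record content and class name below are made up for the example and are not part of the patch:

    import org.apache.paimon.utils.JsonSerdeUtil;

    import java.util.Collections;

    public class PutArrayToJsonStringExample {

        public static void main(String[] args) {
            String record = "{\"id\": 101, \"name\": \"scooter\"}";

            // Appends the key as a JSON string array:
            // {"id":101,"name":"scooter","pkNames":["id"]}
            String withPk =
                    JsonSerdeUtil.putArrayToJsonString(
                            record, "pkNames", Collections.singletonList("id"));
            System.out.println(withPk);

            // An empty (or null) value list leaves the original string untouched.
            System.out.println(
                    JsonSerdeUtil.putArrayToJsonString(record, "pkNames", Collections.emptyList()));
        }
    }
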
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index fd5f99602..28dc3e457 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.format;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
 import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
 import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;
 
@@ -36,7 +37,8 @@ import java.util.List;
 public enum DataFormat {
     CANAL_JSON(CanalRecordParser::new),
     OGG_JSON(OggRecordParser::new),
-    MAXWELL_JSON(MaxwellRecordParser::new);
+    MAXWELL_JSON(MaxwellRecordParser::new),
+    DEBEZIUM_JSON(DebeziumRecordParser::new);
     // Add more data formats here if needed
 
     private final RecordParserFactory parser;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
new file mode 100644
index 000000000..b1d858bf4
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.RecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+
+/**
+ * The {@code DebeziumRecordParser} class extends the abstract {@link RecordParser} and is designed
+ * to parse records from Debezium's JSON change data capture (CDC) format. Debezium is a CDC tool
+ * that captures row-level changes from databases such as MySQL and emits them as JSON messages.
+ * This parser extracts the relevant information from the Debezium JSON payload and converts it
+ * into a list of {@link RichCdcMultiplexRecord} objects.
+ *
+ * <p>The class supports database operations such as INSERT, UPDATE, DELETE, and READ (snapshot
+ * reads), and creates corresponding {@link RichCdcMultiplexRecord} objects to represent these
+ * changes.
+ *
+ * <p>Validation is performed to ensure that the JSON records contain all necessary fields,
+ * including the 'before' and 'after' states for UPDATE operations, and the class also supports
+ * schema extraction for the Kafka topic. Debezium-specific fields such as 'source', 'op' (the
+ * operation type), and the primary key field names are used to construct the details of each
+ * record event.
+ */
+public class DebeziumRecordParser extends RecordParser {
+
+    private static final String FIELD_BEFORE = "before";
+    private static final String FIELD_AFTER = "after";
+    private static final String FIELD_SOURCE = "source";
+    private static final String FIELD_PRIMARY = "pkNames";
+    private static final String FIELD_DB = "db";
+    private static final String FIELD_TYPE = "op";
+    private static final String OP_INSERT = "c";
+    private static final String OP_UPDATE = "u";
+    private static final String OP_DELETE = "d";
+    private static final String OP_READ = "r";
+
+    public DebeziumRecordParser(
+            boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+        super(caseSensitive, typeMapping, computedColumns);
+    }
+
+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        String operation = extractStringFromRootJson(FIELD_TYPE);
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        switch (operation) {
+            case OP_INSERT:
+            case OP_READ:
+                processRecord(root.get(dataField()), RowKind.INSERT, records);
+                break;
+            case OP_UPDATE:
+                processRecord(
+                        mergeOldRecord(root.get(dataField()), root.get(FIELD_BEFORE)),
+                        RowKind.DELETE,
+                        records);
+                processRecord(root.get(dataField()), RowKind.INSERT, records);
+                break;
+            case OP_DELETE:
+                processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record operation: " + operation);
+        }
+        return records;
+    }
+
+    @Override
+    protected void validateFormat() {
+        String errorMessageTemplate =
+                "Didn't find '%s' node in json. Please make sure your topic's format is correct.";
+        checkArgument(
+                !isNull(root.get(FIELD_SOURCE).get(FIELD_TABLE)),
+                errorMessageTemplate,
+                FIELD_TABLE);
+        checkArgument(
+                !isNull(root.get(FIELD_SOURCE).get(FIELD_DB)),
+                errorMessageTemplate,
+                FIELD_DATABASE);
+        checkArgument(!isNull(root.get(FIELD_TYPE)), errorMessageTemplate, FIELD_TYPE);
+        String operation = root.get(FIELD_TYPE).asText();
+        switch (operation) {
+            case OP_INSERT:
+            case OP_READ:
+                checkArgument(!isNull(root.get(dataField())), errorMessageTemplate, dataField());
+                break;
+            case OP_UPDATE:
+            case OP_DELETE:
+                checkArgument(!isNull(root.get(FIELD_BEFORE)), errorMessageTemplate, FIELD_BEFORE);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation type: " + operation);
+        }
+        checkArgument(!isNull(root.get(primaryField())), errorMessageTemplate, primaryField());
+    }
+
+    @Override
+    protected String primaryField() {
+        return FIELD_PRIMARY;
+    }
+
+    @Override
+    protected String dataField() {
+        return FIELD_AFTER;
+    }
+
+    @Override
+    protected String extractStringFromRootJson(String key) {
+        if (key.equals(FIELD_TABLE)) {
+            tableName = root.get(FIELD_SOURCE).get(FIELD_TABLE).asText();
+            return tableName;
+        } else if (key.equals(FIELD_DATABASE)) {
+            databaseName = root.get(FIELD_SOURCE).get(FIELD_DB).asText();
+            return databaseName;
+        }
+        return root.get(key) != null ? root.get(key).asText() : null;
+    }
+}
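
As a rough standalone sketch of the op handling above (illustrative only; the real parser also merges the 'before' state into the delete record for updates, which is omitted here):

    import org.apache.paimon.types.RowKind;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DebeziumOpMappingSketch {

        /** Maps a Debezium 'op' code to the RowKinds of the change events the parser emits. */
        static List<RowKind> rowKindsFor(String op) {
            switch (op) {
                case "c": // insert
                case "r": // snapshot read
                    return Collections.singletonList(RowKind.INSERT);
                case "u": // update: a DELETE for the old state followed by an INSERT for the new state
                    return Arrays.asList(RowKind.DELETE, RowKind.INSERT);
                case "d": // delete: built from the 'before' state
                    return Collections.singletonList(RowKind.DELETE);
                default:
                    throw new UnsupportedOperationException("Unknown record operation: " + op);
            }
        }

        public static void main(String[] args) {
            System.out.println(rowKindsFor("u")); // prints the DELETE-then-INSERT pair
        }
    }
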
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java
new file mode 100644
index 000000000..e854a9bb7
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        this.primaryKeyNames = checkNotNull(primaryKeyNames);
+        if (this.primaryKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("primary key must not be empty");
+        }
+    }
+
+    @Override
+    public String deserialize(byte[] message) {
+        try {
+            String value = new String(message, StandardCharsets.UTF_8);
+            return JsonSerdeUtil.putArrayToJsonString(value, PRIMARY_KEY_NAMES, primaryKeyNames);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize message", e);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
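
A small usage sketch of the schema (the message content is made up; in practice the schema is wired into the KafkaSource builder in KafkaActionUtils below):

    import org.apache.paimon.flink.action.cdc.format.debezium.JsonPrimaryKeyDeserializationSchema;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    public class PrimaryKeySchemaSketch {

        public static void main(String[] args) {
            JsonPrimaryKeyDeserializationSchema schema =
                    new JsonPrimaryKeyDeserializationSchema(Collections.singletonList("id"));

            byte[] message =
                    "{\"op\": \"c\", \"after\": {\"id\": 101}}".getBytes(StandardCharsets.UTF_8);

            // The Kafka value is decoded as UTF-8 and "pkNames": ["id"] is appended at the top
            // level, so DebeziumRecordParser can read the primary key names from the record itself.
            System.out.println(schema.deserialize(message));
        }
    }
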
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 5f3c98f9b..8dceaeb5d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.debezium.JsonPrimaryKeyDeserializationSchema;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -34,7 +36,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -54,6 +55,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -65,8 +67,14 @@ public class KafkaActionUtils {
 
     private static final String PARTITION = "partition";
     private static final String OFFSET = "offset";
+    private static final String DEBEZIUM_JSON = "debezium-json";
 
     public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
+        return buildKafkaSource(kafkaConfig, new ArrayList<>());
+    }
+
+    public static KafkaSource<String> buildKafkaSource(
+            Configuration kafkaConfig, List<String> primaryKeys) {
         validateKafkaConfig(kafkaConfig);
         KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
 
@@ -77,8 +85,11 @@ public class KafkaActionUtils {
 
         kafkaSourceBuilder
                 .setTopics(topics)
-                .setValueOnlyDeserializer(new SimpleStringSchema())
-                .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+                .setGroupId(kafkaPropertiesGroupId(kafkaConfig))
+                .setValueOnlyDeserializer(
+                        DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT))
+                                ? new JsonPrimaryKeyDeserializationSchema(primaryKeys)
+                                : new SimpleStringSchema());
         Properties properties = new Properties();
         for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
             String key = entry.getKey();
@@ -262,6 +273,11 @@ public class KafkaActionUtils {
 
     static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
             Configuration kafkaConfig, String topic) {
+        return getKafkaEarliestConsumer(kafkaConfig, topic, new ArrayList<>());
+    }
+
+    static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
+            Configuration kafkaConfig, String topic, List<String> primaryKeys) {
         Properties props = new Properties();
         props.put(
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -286,26 +302,33 @@ public class KafkaActionUtils {
                 Collections.singletonList(new TopicPartition(topic, firstPartition));
         consumer.assign(topicPartitions);
         consumer.seekToBeginning(topicPartitions);
-
-        return new KafkaConsumerWrapper(consumer);
+        return new KafkaConsumerWrapper(
+                consumer,
+                DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT))
+                        ? primaryKeys
+                        : new ArrayList<>());
     }
 
     private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
 
+        private static final String PK_NAMES_KEY = "pkNames";
+
         private final KafkaConsumer<String, String> consumer;
 
-        KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer) {
+        private final List<String> pkNames;
+
+        KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer, List<String> pkNames) {
             this.consumer = kafkaConsumer;
+            this.pkNames = pkNames;
         }
 
         @Override
         public List<String> getRecords(String topic, int pollTimeOutMills) {
             ConsumerRecords<String, String> consumerRecords =
                     consumer.poll(Duration.ofMillis(pollTimeOutMills));
-            Iterable<ConsumerRecord<String, String>> records = consumerRecords.records(topic);
-            List<String> result = new ArrayList<>();
-            records.forEach(r -> result.add(r.value()));
-            return result;
+            return StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
+                    .map(r -> JsonSerdeUtil.putArrayToJsonString(r.value(), PK_NAMES_KEY, pkNames))
+                    .collect(Collectors.toList());
         }
 
         @Override
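
For context, a hedged sketch of calling the new buildKafkaSource overload directly; the configuration keys and values here are assumptions for illustration, and the real call site is KafkaSyncTableAction below, which passes the table's primary keys:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.kafka.source.KafkaSource;

    import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class BuildKafkaSourceSketch {

        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            conf.put("properties.bootstrap.servers", "localhost:9092"); // assumed for the example
            conf.put("topic", "products");
            conf.put("value.format", "debezium-json");

            // With value.format = debezium-json the source is built with
            // JsonPrimaryKeyDeserializationSchema, which appends the configured primary keys
            // ("pkNames") to every record; any other format falls back to SimpleStringSchema.
            KafkaSource<String> source =
                    KafkaActionUtils.buildKafkaSource(
                            Configuration.fromMap(conf), Collections.singletonList("id"));
            System.out.println(source);
        }
    }
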
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index badbb518a..0f5ce4f1b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -41,7 +41,7 @@ public class KafkaSyncTableAction extends MessageQueueSyncTableActionBase {
 
     @Override
     protected Source<String, ?, ?> buildSource() {
-        return KafkaActionUtils.buildKafkaSource(mqConfig);
+        return KafkaActionUtils.buildKafkaSource(mqConfig, primaryKeys);
     }
 
     @Override
@@ -51,7 +51,7 @@ public class KafkaSyncTableAction extends MessageQueueSyncTableActionBase {
 
     @Override
     protected MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic) {
-        return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic);
+        return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic, primaryKeys);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
new file mode 100644
index 000000000..f73734df3
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** IT cases for {@link KafkaSyncTableAction} with Debezium JSON format. */
+public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolution");
+    }
+
+    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
+        final String topic = "schema_evolution";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the debezium json into Kafka -------------------
+        List<String> lines =
+                readLines(String.format("kafka/debezium/table/%s/debezium-data-1.txt", sourceDir));
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "debezium-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(topic, sourceDir);
+    }
+
+    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format(
+                                    "kafka/debezium/table/%s/debezium-data-2.txt", sourceDir)));
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age"});
+        expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format(
+                                    "kafka/debezium/table/%s/debezium-data-3.txt", sourceDir)));
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age", "address"});
+        expected =
+                Arrays.asList(
+                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
+                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
+                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testComputedColumn() throws Exception {
+        String topic = "computed_column";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = readLines("kafka/debezium/table/computedcolumn/debezium-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "debezium-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withComputedColumnArgs("_year=year(date)")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT()
+                        },
+                        new String[] {"id", "date", "_year"});
+        waitForResult(
+                Collections.singletonList("+I[101, 2023-03-23, 2023]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Collections.singletonList("id"));
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt
new file mode 100644
index 000000000..5a571d3fd
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "date": "2023-03-23"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt
new file mode 100644
index 000000000..b3ff4e23a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt
new file mode 100644
index 000000000..528f96522
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 103, "name": "12-pack drill bits", "description": "12-pack of drill bits with sizes ranging from #40 to #3", "weight": 0.8, "age": 18}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": null, "after": {"id": 104, "name": "hammer", "description": "12oz carpenter's hammer", "weight": 0.75, "age": 24}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt
new file mode 100644
index 000000000..79210f88d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 105, "name": "hammer", "description": "14oz carpenter's hammer", "weight": 0.875, "address": "Shanghai"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "after": null, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "d", "ts_ms": 1596684883000, "transaction": null}
+{"before": {"address": "Shanghai"}, "after": {"id": 105, "name": "hammer", "description": "14oz carpenter's hammer", "weight": 0.875, "address": "Beijing"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684906000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "u", "ts_ms": 1596684906000, "transaction": null}
+{"before": null, "after": {"id": 107, "name": "rocks", "description": "box of assorted rocks", "weight": 5.3}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}

