This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a16abd5d2 [FLINK-36913][pipeline-connector][kafka] Introduce option to define custom mapping from upstream table id to downstream topic name
a16abd5d2 is described below

commit a16abd5d243fdf045cf1ed1be734b14dcc11458b
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Jan 16 20:23:38 2025 +0800

    [FLINK-36913][pipeline-connector][kafka] Introduce option to define custom mapping from upstream table id to downstream topic name
    
    This closes #3805
---
 .../docs/connectors/pipeline-connectors/kafka.md   |  7 +++
 .../docs/connectors/pipeline-connectors/kafka.md   |  7 +++
 .../cdc/connectors/kafka/sink/KafkaDataSink.java   |  9 ++-
 .../kafka/sink/KafkaDataSinkFactory.java           |  6 +-
 .../kafka/sink/KafkaDataSinkOptions.java           | 21 +++++++
 .../PipelineKafkaRecordSerializationSchema.java    | 35 ++++++++++-
 .../cdc/connectors/kafka/utils/KafkaSinkUtils.java | 52 +++++++++++++++++
 .../kafka/sink/KafkaDataSinkFactoryTest.java       |  3 +
 .../connectors/kafka/sink/KafkaDataSinkITCase.java | 67 ++++++++++++++++++++--
 9 files changed, 198 insertions(+), 9 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 66467b4d5..136eb4d4d 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -143,6 +143,13 @@ Pipeline 连接器配置项
       <td>String</td>
       <td>Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 </td>
     </tr>
+    <tr>
+      <td>sink.tableId-to-topic.mapping</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>自定义的上游表名到下游 Kafka Topic 名的映射关系。 每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。 举个例子,我们可以配置 `sink.tableId-to-topic.mapping` 的值为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。 </td>
+    </tr>
     </tbody>
 </table>    
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 6bb94bc37..619ade45c 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -141,6 +141,13 @@ Pipeline Connector Options
       <td>String</td>
       <td>custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'. </td>
     </tr>
+    <tr>
+      <td>sink.tableId-to-topic.mapping</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Custom table mappings for each table from upstream tableId to downstream Kafka topic. Mappings are separated by `;`, and the upstream tableId is separated from the downstream Kafka topic by `:`. For example, we can set `sink.tableId-to-topic.mapping` to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
+    </tr>
     </tbody>
 </table>    
 </div>
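
As a usage sketch, the new option can also be wired up programmatically the same way the factory test further down in this commit does. The snippet below is illustrative only: the bootstrap server address is a placeholder, and the import paths follow the usual flink-cdc common module layout as an assumption.

    import org.apache.flink.cdc.common.configuration.Configuration;
    import org.apache.flink.cdc.common.factories.FactoryHelper;
    import org.apache.flink.cdc.common.sink.DataSink;
    import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkFactory;
    import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;

    import java.util.HashMap;
    import java.util.Map;

    public class TableMappingExample {
        public static void main(String[] args) {
            Map<String, String> config = new HashMap<>();
            // Placeholder broker address; adjust for the target environment.
            config.put("properties.bootstrap.servers", "localhost:9092");
            // Route mydb.mytable1 and mydb.mytable2 to dedicated topics.
            config.put(
                    KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
                    "mydb.mytable1:topic1;mydb.mytable2:topic2");
            DataSink dataSink =
                    new KafkaDataSinkFactory()
                            .createDataSink(
                                    new FactoryHelper.DefaultContext(
                                            Configuration.fromMap(config),
                                            Configuration.fromMap(new HashMap<>()),
                                            TableMappingExample.class.getClassLoader()));
            System.out.println("Created Kafka data sink: " + dataSink);
        }
    }
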
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
index 51e8b180d..1970a9b82 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
@@ -53,6 +53,8 @@ public class KafkaDataSink implements DataSink {
 
     final String customHeaders;
 
+    final String tableMapping;
+
     public KafkaDataSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProperties,
@@ -62,7 +64,8 @@ public class KafkaDataSink implements DataSink {
             SerializationSchema<Event> valueSerialization,
             String topic,
             boolean addTableToHeaderEnabled,
-            String customHeaders) {
+            String customHeaders,
+            String tableMapping) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProperties = kafkaProperties;
         this.partitionStrategy = partitionStrategy;
@@ -72,6 +75,7 @@ public class KafkaDataSink implements DataSink {
         this.topic = topic;
         this.addTableToHeaderEnabled = addTableToHeaderEnabled;
         this.customHeaders = customHeaders;
+        this.tableMapping = tableMapping;
     }
 
     @Override
@@ -92,7 +96,8 @@ public class KafkaDataSink implements DataSink {
                                         valueSerialization,
                                         topic,
                                         addTableToHeaderEnabled,
-                                        customHeaders))
+                                        customHeaders,
+                                        tableMapping))
                         .build());
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index d89812913..53c821517 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -41,6 +41,7 @@ import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PA
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
 
@@ -92,6 +93,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
                 context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
         PartitionStrategy partitionStrategy =
                 context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
+        String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_ID_TO_TOPIC_MAPPING);
         return new KafkaDataSink(
                 deliveryGuarantee,
                 kafkaProperties,
@@ -101,7 +103,8 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
                 valueSerialization,
                 topic,
                 addTableToHeaderEnabled,
-                customHeaders);
+                customHeaders,
+                tableMapping);
     }
 
     @Override
@@ -124,6 +127,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
         options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
         options.add(SINK_CUSTOM_HEADER);
         options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
+        options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
         return options;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index ca82f5c80..1b7bbf9bd 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.kafka.sink;
 
 import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.description.Description;
 import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 
@@ -29,6 +30,10 @@ public class KafkaDataSinkOptions {
     // Prefix for Kafka specific properties.
     public static final String PROPERTIES_PREFIX = "properties.";
 
+    public static final String DELIMITER_TABLE_MAPPINGS = ";";
+
+    public static final String DELIMITER_SELECTOR_TOPIC = ":";
+
     public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
             key("sink.delivery-guarantee")
                     .enumType(DeliveryGuarantee.class)
@@ -79,4 +84,20 @@ public class KafkaDataSinkOptions {
                     .defaultValue("")
                     .withDescription(
                             "custom headers for each kafka record. Each header 
are separated by ',', separate key and value by ':'. For example, we can set 
headers like 'key1:value1,key2:value2'.");
+
+    public static final ConfigOption<String> SINK_TABLE_ID_TO_TOPIC_MAPPING =
+            key("sink.tableId-to-topic.mapping")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by ")
+                                    .text(DELIMITER_TABLE_MAPPINGS)
+                                    .text(
+                                            ", separate upstream tableId selectors and downstream Kafka topic by ")
+                                    .text(DELIMITER_SELECTOR_TOPIC)
+                                    .text(
+                                            ". For example, we can set 'sink.tableId-to-topic.mapping' like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
+                                    .build());
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
index 85e5e3f19..6a65f836a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
@@ -22,6 +22,8 @@ import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -57,6 +59,13 @@ public class PipelineKafkaRecordSerializationSchema
     // key value pairs to be put into Kafka Record Header.
     public final Map<String, String> customHeaders;
 
+    private final String mappingRuleString;
+
+    private Map<Selectors, String> selectorsToTopicMap;
+
+    // A cache to speed up TableId to Topic mapping.
+    private Map<TableId, String> tableIdToTopicCache;
+
     public static final String NAMESPACE_HEADER_KEY = "namespace";
 
     public static final String SCHEMA_NAME_HEADER_KEY = "schemaName";
@@ -69,7 +78,8 @@ public class PipelineKafkaRecordSerializationSchema
             SerializationSchema<Event> valueSerialization,
             String unifiedTopic,
             boolean addTableToHeaderEnabled,
-            String customHeaderString) {
+            String customHeaderString,
+            String mappingRuleString) {
         this.keySerialization = keySerialization;
         this.valueSerialization = checkNotNull(valueSerialization);
         this.unifiedTopic = unifiedTopic;
@@ -90,6 +100,7 @@ public class PipelineKafkaRecordSerializationSchema
             }
         }
         partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
+        this.mappingRuleString = mappingRuleString;
     }
 
     @Override
@@ -102,7 +113,7 @@ public class PipelineKafkaRecordSerializationSchema
             // skip sending SchemaChangeEvent.
             return null;
         }
-        String topic = unifiedTopic == null ? changeEvent.tableId().toString() : unifiedTopic;
+        String topic = inferTopicName(changeEvent.tableId());
         RecordHeaders recordHeaders = new RecordHeaders();
         if (addTableToHeaderEnabled) {
             String namespace =
@@ -128,10 +139,30 @@ public class PipelineKafkaRecordSerializationSchema
                 topic, partition, null, keySerialized, valueSerialized, recordHeaders);
     }
 
+    private String inferTopicName(TableId tableId) {
+        return tableIdToTopicCache.computeIfAbsent(
+                tableId,
+                (table -> {
+                    if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
+                        return unifiedTopic;
+                    }
+                    if (selectorsToTopicMap != null && !selectorsToTopicMap.isEmpty()) {
+                        for (Map.Entry<Selectors, String> entry : selectorsToTopicMap.entrySet()) {
+                            if (entry.getKey().isMatch(tableId)) {
+                                return entry.getValue();
+                            }
+                        }
+                    }
+                    return table.toString();
+                }));
+    }
+
     @Override
     public void open(
             SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
             throws Exception {
+        this.selectorsToTopicMap = KafkaSinkUtils.parseSelectorsToTopicMap(mappingRuleString);
+        this.tableIdToTopicCache = new HashMap<>();
         valueSerialization.open(context);
     }
 }
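
Net effect of the serialization-schema change above: the per-record topic is now resolved as the unified 'topic' option first, then the first matching 'sink.tableId-to-topic.mapping' rule (declaration order is preserved by the LinkedHashMap), and finally the tableId string itself, with results cached per TableId. A small standalone sketch of that fallback order, assuming TableId.tableId(schema, table) is the usual factory method of the common TableId class:

    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Selectors;
    import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;

    import java.util.Map;

    public class TopicResolutionSketch {
        public static void main(String[] args) {
            // Hypothetical mapping rules, parsed the same way the serialization schema does in open().
            Map<Selectors, String> rules =
                    KafkaSinkUtils.parseSelectorsToTopicMap("mydb.orders:orders_topic;mydb.users:users_topic");

            // TableId.tableId(...) is assumed here; the matching itself uses Selectors#isMatch from the diff.
            System.out.println(resolve(rules, TableId.tableId("mydb", "orders")));   // orders_topic
            System.out.println(resolve(rules, TableId.tableId("mydb", "payments"))); // mydb.payments (fallback)
        }

        // Mirrors inferTopicName() when no unified topic is configured: the first matching rule wins,
        // otherwise fall back to the tableId string.
        private static String resolve(Map<Selectors, String> rules, TableId tableId) {
            for (Map.Entry<Selectors, String> entry : rules.entrySet()) {
                if (entry.getKey().isMatch(tableId)) {
                    return entry.getValue();
                }
            }
            return tableId.toString();
        }
    }
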
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java
new file mode 100644
index 000000000..b013bba9f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.utils;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_SELECTOR_TOPIC;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_TABLE_MAPPINGS;
+
+/** Util class for {@link org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink}. */
+public class KafkaSinkUtils {
+
+    /** Parse the mapping text to a map from Selectors to Kafka Topic name. */
+    public static Map<Selectors, String> parseSelectorsToTopicMap(String mappingRuleString) {
+        // Keep the order.
+        Map<Selectors, String> result = new LinkedHashMap<>();
+        if (mappingRuleString == null || mappingRuleString.isEmpty()) {
+            return result;
+        }
+        for (String mapping : mappingRuleString.split(DELIMITER_TABLE_MAPPINGS)) {
+            String[] selectorsAndTopic = mapping.split(DELIMITER_SELECTOR_TOPIC);
+            Preconditions.checkArgument(
+                    selectorsAndTopic.length == 2,
+                    "Please check your configuration of "
+                            + KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING);
+            Selectors selectors =
+                    new Selectors.SelectorsBuilder().includeTables(selectorsAndTopic[0]).build();
+            result.put(selectors, selectorsAndTopic[1]);
+        }
+        return result;
+    }
+}
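
For a quick check of the parsing contract above, a hypothetical round trip (values are made up; the malformed entry shows the Preconditions guard rejecting an entry without a ':'):

    import org.apache.flink.cdc.common.schema.Selectors;
    import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;

    import java.util.Map;

    public class ParseMappingSketch {
        public static void main(String[] args) {
            // Well-formed value: two rules, kept in declaration order by the LinkedHashMap.
            Map<Selectors, String> rules =
                    KafkaSinkUtils.parseSelectorsToTopicMap("mydb.mytable1:topic1;mydb.mytable2:topic2");
            System.out.println(rules.values()); // [topic1, topic2]

            try {
                // No ':' separator, so selectorsAndTopic.length != 2 and checkArgument fails.
                KafkaSinkUtils.parseSelectorsToTopicMap("mydb.mytable1-topic1");
            } catch (IllegalArgumentException e) {
                System.out.println("Rejected malformed mapping: " + e.getMessage());
            }
        }
    }
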
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java
index f0736dd0b..c473afb30 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java
@@ -39,6 +39,9 @@ public class KafkaDataSinkFactoryTest {
         Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
 
         Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
+        conf.set(
+                KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING,
+                "mydb.mytable1:topic1;mydb.mytable2:topic2");
         DataSink dataSink =
                 sinkFactory.createDataSink(
                         new FactoryHelper.DefaultContext(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index ddce0a9e4..bb5d4b880 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -263,8 +263,7 @@ class KafkaDataSinkITCase extends TestLogger {
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic, false, 0);
-        final long recordsCount = 5;
-        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        assertThat(collectedRecords).hasSize(5);
         ObjectMapper mapper =
                JacksonMapperFactory.createObjectMapper()
                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
@@ -325,8 +324,7 @@ class KafkaDataSinkITCase extends TestLogger {
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic, false, 0);
-        final long recordsCount = 5;
-        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        assertThat(collectedRecords).hasSize(5);
         for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
             assertThat(
                             consumerRecord
@@ -560,6 +558,67 @@ class KafkaDataSinkITCase extends TestLogger {
         checkProducerLeak();
     }
 
+    @Test
+    void testSinkTableMapping() throws Exception {
+        final StreamExecutionEnvironment env = new LocalStreamEnvironment();
+        env.enableCheckpointing(1000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        final DataStream<Event> source = env.fromData(createSourceEvents(), new EventTypeInfo());
+        Map<String, String> config = new HashMap<>();
+        config.put(
+                KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
+                "default_namespace.default_schema_copy.\\.*:test_topic_mapping_copy;default_namespace.default_schema.\\.*:test_topic_mapping");
+        Properties properties = getKafkaClientConfiguration();
+        properties.forEach(
+                (key, value) ->
+                        config.put(
+                                KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
+                                value.toString()));
+        source.sinkTo(
+                ((FlinkSinkProvider)
+                                (new KafkaDataSinkFactory()
+                                        .createDataSink(
+                                                new FactoryHelper.DefaultContext(
+                                                        Configuration.fromMap(config),
+                                                        Configuration.fromMap(new HashMap<>()),
+                                                        this.getClass().getClassLoader()))
+                                        .getEventSinkProvider()))
+                        .getSink());
+        env.execute();
+
+        final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
+                drainAllRecordsFromTopic("test_topic_mapping", false, 0);
+        final long recordsCount = 5;
+        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        List<JsonNode> expected =
+                Arrays.asList(
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())));
+        assertThat(deserializeValues(collectedRecords)).containsAll(expected);
+        checkProducerLeak();
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
             String topic, boolean committed, int... partitionArr) {
         Properties properties = getKafkaClientConfiguration();
