This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8aff80730 [Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742)
8aff80730 is described below

commit 8aff807305650ea2e2da8ce2faef0676554da484
Author: TaoZex <[email protected]>
AuthorDate: Fri Jan 20 19:20:06 2023 +0800

    [Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742)
---
 docs/en/connector-v2/sink/Kafka.md                 | 18 ++++-
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 23 +++---
 .../kafka/serialize/SeaTunnelRowSerializer.java    |  2 +-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 44 +++++++++++-
 .../connector-kafka-e2e/pom.xml                    |  6 ++
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 83 +++++++++++-----------
 .../test/resources/extractTopic_fake_to_kafka.conf | 77 ++++++++++++++++++++
 7 files changed, 197 insertions(+), 56 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 6908597fc..be4f87db8 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -31,6 +31,21 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
 
 Kafka Topic.
 
+Currently two formats are supported:
+
+1. Fill in the name of the topic.
+
+2. Use the value of a field from the upstream data as the topic. The format is `${your field name}`, where the topic is taken from the value of that column in the upstream data.
+
+   For example, if the upstream data is the following:
+
+    | name | age  | data          |
+    | ---- | ---- | ------------- |
+    | Jack | 16   | data-example1 |
+    | Mary | 23   | data-example2 |
+
+   If `${name}` is set as the topic, the first row is sent to the Jack topic, and the second row is sent to the Mary topic.
+
 ### bootstrap.servers [string]
 
 Kafka Brokers List.
@@ -190,4 +205,5 @@ sink {
 
 - [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
 - [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
+- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
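
To make the new `${field}` convention concrete: anything matching `${...}` is treated as a field name, and everything else as a literal topic. A minimal standalone sketch of that check follows (the regex matches the one used in this commit; the class and method names are illustrative only, not part of the connector's API):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical demo of the `${field}` topic convention documented above.
    public class TopicPlaceholderCheck {
        // Same pattern as the commit uses: captures the field name inside ${...}.
        private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL);

        public static void main(String[] args) {
            Matcher m = PLACEHOLDER.matcher("${name}");
            // Prints "field: name" -- the topic comes from the row's "name" column.
            System.out.println(m.find() ? "field: " + m.group(1) : "literal");
            // A plain topic such as "test_topic" does not match and is used as-is.
            System.out.println(PLACEHOLDER.matcher("test_topic").find()); // false
        }
    }
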
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 58e8ef25d..1e9ed6422 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -37,36 +37,37 @@ import java.util.function.Function;
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
 
     private Integer partition;
-    private final String topic;
     private final SerializationSchema keySerialization;
     private final SerializationSchema valueSerialization;
 
-    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
-        this(topic, element -> null, createSerializationSchema(seaTunnelRowType, format, delimiter));
+    public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType,
+                                         String format,
+                                         String delimiter) {
+        this(element -> null, createSerializationSchema(seaTunnelRowType, format, delimiter));
     }
 
-    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
-        this(topic, seaTunnelRowType, format, delimiter);
+    public DefaultSeaTunnelRowSerializer(Integer partition,
+                                         SeaTunnelRowType seaTunnelRowType,
+                                         String format, String delimiter) {
+        this(seaTunnelRowType, format, delimiter);
         this.partition = partition;
     }
 
-    public DefaultSeaTunnelRowSerializer(String topic, List<String> keyFieldNames,
+    public DefaultSeaTunnelRowSerializer(List<String> keyFieldNames,
                                          SeaTunnelRowType seaTunnelRowType,
                                          String format, String delimiter) {
-        this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
+        this(createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
                 createSerializationSchema(seaTunnelRowType, format, delimiter));
     }
 
-    public DefaultSeaTunnelRowSerializer(String topic,
-                                         SerializationSchema keySerialization,
+    public DefaultSeaTunnelRowSerializer(SerializationSchema keySerialization,
                                          SerializationSchema valueSerialization) {
-        this.topic = topic;
         this.keySerialization = keySerialization;
         this.valueSerialization = valueSerialization;
     }
 
     @Override
-    public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
+    public ProducerRecord<byte[], byte[]> serializeRow(String topic, SeaTunnelRow row) {
         return new ProducerRecord<>(topic, partition,
                 keySerialization.serialize(row), valueSerialization.serialize(row));
     }
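
The net effect of this change: the serializer no longer pins a topic at construction time, so a single instance can route records to different topics call by call. A hedged usage sketch follows (the "json" format and "," delimiter mirror the e2e test defaults and are assumptions here, not taken from this diff):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.seatunnel.api.table.type.BasicType;
    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
    import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;

    public class PerRowTopicDemo {
        public static void main(String[] args) {
            SeaTunnelRowType rowType = new SeaTunnelRowType(
                    new String[]{"name", "age"},
                    new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});
            // Format "json" and delimiter "," are assumed defaults, not part of this diff.
            DefaultSeaTunnelRowSerializer serializer =
                    new DefaultSeaTunnelRowSerializer(rowType, "json", ",");
            SeaTunnelRow row = new SeaTunnelRow(new Object[]{"Jack", 16});
            // The topic is now an argument of serializeRow, so it can vary record by record.
            ProducerRecord<byte[], byte[]> record = serializer.serializeRow("topic_for_jack", row);
            System.out.println(record.topic()); // topic_for_jack
        }
    }
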
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index 9f12591ea..5f31ca001 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -29,5 +29,5 @@ public interface SeaTunnelRowSerializer<K, V> {
      * @param row seatunnel row
      * @return kafka record.
      */
-    ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
+    ProducerRecord<K, V> serializeRow(String topic, SeaTunnelRow row);
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 94e4ec922..ffec8130e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -52,6 +53,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -61,7 +64,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     private final SinkWriter.Context context;
 
     private String transactionPrefix;
+    private String topic;
     private long lastCheckpointId = 0;
+    private boolean isExtractTopic;
+    private SeaTunnelRowType seaTunnelRowType;
 
     private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
     private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
@@ -74,6 +80,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
             Config pluginConfig,
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
+        this.seaTunnelRowType = seaTunnelRowType;
+        Pair<Boolean, String> topicResult = isExtractTopic(pluginConfig.getString(TOPIC.key()));
+        this.isExtractTopic = topicResult.getLeft();
+        this.topic = topicResult.getRight();
         if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
             MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
         }
@@ -102,7 +112,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(extractTopic(element), element);
         kafkaProducerSender.send(producerRecord);
     }
 
@@ -159,10 +169,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
             delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
         }
         if (pluginConfig.hasPath(PARTITION.key())) {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+            return new DefaultSeaTunnelRowSerializer(
                     pluginConfig.getInt(PARTITION.key()), seaTunnelRowType, format, delimiter);
         } else {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+            return new DefaultSeaTunnelRowSerializer(
                     getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType, format, delimiter);
         }
     }
@@ -199,4 +209,32 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
         return Collections.emptyList();
     }
+
+    private Pair<Boolean, String> isExtractTopic(String topicConfig) {
+        String regex = "\\$\\{(.*?)\\}";
+        Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
+        Matcher matcher = pattern.matcher(topicConfig);
+        if (matcher.find()) {
+            return Pair.of(true, matcher.group(1));
+        }
+        return Pair.of(false, topicConfig);
+    }
+
+    private String extractTopic(SeaTunnelRow row) {
+        if (!isExtractTopic) {
+            return topic;
+        }
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(topic)) {
+            throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    String.format("Field name { %s } is not found!", topic));
+        }
+        int topicFieldIndex = seaTunnelRowType.indexOf(topic);
+        Object topicFieldValue = row.getField(topicFieldIndex);
+        if (topicFieldValue == null) {
+            throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "The topic field value is null!");
+        }
+        return topicFieldValue.toString();
+    }
 }
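
Putting the two helpers together: isExtractTopic splits the configured value once into a (placeholder?, value) pair, and extractTopic then resolves the placeholder against each row. A self-contained sketch of that flow follows (SeaTunnelRow and SeaTunnelRowType come from seatunnel-api; the class name and literals are illustrative only):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.seatunnel.api.table.type.BasicType;
    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

    public class ExtractTopicFlowDemo {
        public static void main(String[] args) {
            // Step 1: classify the configured topic. "${name}" -> (true, "name").
            String topicConfig = "${name}";
            Matcher m = Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL).matcher(topicConfig);
            Pair<Boolean, String> result =
                    m.find() ? Pair.of(true, m.group(1)) : Pair.of(false, topicConfig);

            // Step 2: per row, read the named field when the left side is true.
            SeaTunnelRowType rowType = new SeaTunnelRowType(
                    new String[]{"name", "age"},
                    new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});
            SeaTunnelRow row = new SeaTunnelRow(new Object[]{"Jack", 16});
            String topic = result.getLeft()
                    ? row.getField(rowType.indexOf(result.getRight())).toString()
                    : result.getRight();
            System.out.println(topic); // Jack -- matches the doc example above
        }
    }

Note that the writer throws if the referenced field does not exist or its value is null, so placeholder topics require a non-null value in that column for every row.
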
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index d2ee1f526..59281c8c7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -51,6 +51,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 71a2452ae..a7fb5c183 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -106,8 +106,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 .untilAsserted(() -> initKafkaProducer());
 
         log.info("Write 100 records to topic test_topic_source");
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow("test_topic_source", row), 0, 100);
     }
 
     @AfterAll
@@ -127,25 +127,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
 
         String topicName = "test_topic";
-        Map<String, String> data = new HashMap<>();
+        Map<String, String> data = getKafkaConsumerData(topicName);
         ObjectMapper objectMapper = new ObjectMapper();
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
-            consumer.subscribe(Arrays.asList(topicName));
-            Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
-            Long endOffset = offsets.entrySet().iterator().next().getValue();
-            Long lastProcessedOffset = -1L;
-
-            do {
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-                for (ConsumerRecord<String, String> record : records) {
-                    if (lastProcessedOffset < record.offset()) {
-
-                        data.put(record.key(), record.value());
-                    }
-                    lastProcessedOffset = record.offset();
-                }
-            } while (lastProcessedOffset < endOffset - 1);
-        }
         String key = data.keySet().iterator().next();
         ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
         Assertions.assertTrue(objectNode.has("c_map"));
@@ -159,23 +142,22 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
 
         String topicName = "test_text_topic";
-        Map<String, String> data = new HashMap<>();
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
-            consumer.subscribe(Arrays.asList(topicName));
-            Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
-            Long endOffset = offsets.entrySet().iterator().next().getValue();
-            Long lastProcessedOffset = -1L;
+        Map<String, String> data = getKafkaConsumerData(topicName);
+        Assertions.assertEquals(10, data.size());
+    }
 
-            do {
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-                for (ConsumerRecord<String, String> record : records) {
-                    if (lastProcessedOffset < record.offset()) {
-                        data.put(record.key(), record.value());
-                    }
-                    lastProcessedOffset = record.offset();
-                }
-            } while (lastProcessedOffset < endOffset - 1);
-        }
+    @TestTemplate
+    public void testExtractTopicFunction(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/extractTopic_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        String topicName = "test_extract_topic";
+        Map<String, String> data = getKafkaConsumerData(topicName);
+        ObjectMapper objectMapper = new ObjectMapper();
+        String key = data.keySet().iterator().next();
+        ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
+        Assertions.assertTrue(objectNode.has("c_map"));
+        Assertions.assertTrue(objectNode.has("c_string"));
         Assertions.assertEquals(10, data.size());
     }
 
@@ -192,8 +174,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
 
     @TestTemplate
     public void testSourceKafkaJsonToConsole(TestContainer container) throws IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow("test_topic_json", row), 0, 100);
         Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
@@ -208,8 +190,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
 
     @TestTemplate
     public void testSourceKafkaStartConfig(TestContainer container) throws IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 100, 150);
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow("test_topic_group", row), 100, 150);
         testKafkaGroupOffsetsToConsole(container);
     }
 
@@ -320,6 +302,27 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
             }
     );
 
+    private Map<String, String> getKafkaConsumerData(String topicName) {
+        Map<String, String> data = new HashMap<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.put(record.key(), record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
     interface ProducerRecordConverter {
         ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
new file mode 100644
index 000000000..62eb47680
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+  Replace {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    replace_field = "c_string"
+    pattern = ".+"
+    replacement = "test_extract_topic"
+    is_regex = true
+  }
+}
+
+sink {
+  Kafka {
+    source_table_name = "fake1"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "${c_string}"
+    partition_key_fields = ["c_map","c_string"]
+  }
+}
\ No newline at end of file
