This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 74bbd76b6 [Improve][Connector-V2][Kafka] Add text format for kafka 
sink connector (#3711)
74bbd76b6 is described below

commit 74bbd76b654deffd1c160e836e8cdf56c2fcac74
Author: TaoZex <[email protected]>
AuthorDate: Tue Dec 13 13:59:23 2022 +0800

    [Improve][Connector-V2][Kafka] Add text format for kafka sink connector 
(#3711)
    
    * [Improve][Connector-V2][Kafka] Add text format for kafka sink connector
    
    * [Improve][Connector-V2][Kafka] Add e2e test
    
    * [Improve][Connector-V2][Kafka] Update e2e code
    
    * [Improve][Connector-V2][Kafka] Add doc
---
 docs/en/connector-v2/sink/Kafka.md                 | 16 ++++-
 docs/en/connector-v2/source/kafka.md               |  1 +
 .../connectors/seatunnel/kafka/config/Config.java  |  2 +
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 38 ++++++++----
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 16 ++++-
 .../seatunnel/kafka/source/KafkaSource.java        |  8 ++-
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 36 ++++++++++-
 .../resources/kafkaTextsink_fake_to_kafka.conf     | 70 ++++++++++++++++++++++
 8 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kafka.md 
b/docs/en/connector-v2/sink/Kafka.md
index d480d5ab5..f86f6b25a 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -25,6 +25,8 @@ By default, we will use 2pc to guarantee the message is sent 
to kafka exactly on
 | partition            | int                   | no       | -             |
 | assign_partitions    | array                 | no       | -             |
 | transaction_prefix   | string                | no       | -             |
+| format               | String                | no       | json          |
+| field_delimiter      | String                | no       | ,             |
 | common-options       | config                | no       | -             |
 
 ### topic [string]
@@ -88,6 +90,15 @@ This function by `MessageContentPartitioner` class 
implements `org.apache.kafka.
 If semantic is specified as EXACTLY_ONCE, the producer will write all messages 
in a Kafka transaction.
 Kafka distinguishes different transactions by different transactionId. This 
parameter is the prefix of the kafka transactionId; make sure different jobs 
use different prefixes.
 
+### format
+
+Data format. The default format is json, and the text format is optional. The 
default field separator is ",".
+To customize the field delimiter, set the "field_delimiter" option.
+
+### field_delimiter
+
+Customize the field delimiter for the data format.
+
 ### common options [config]
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
@@ -101,6 +112,7 @@ sink {
       topic = "seatunnel"
       bootstrap.servers = "localhost:9092"
       partition = 3
+      format = json
       kafka.request.timeout.ms = 60000
       semantics = EXACTLY_ONCE
   }
@@ -113,6 +125,8 @@ sink {
 ### 2.3.0-beta 2022-10-20
 
 - Add Kafka Sink Connector
+
 ### next version
 
-- [Feature] Support to specify multiple partition keys 
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
\ No newline at end of file
+- [Improve] Support to specify multiple partition keys 
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
+- [Improve] Add text format for kafka sink connector 
[3711](https://github.com/apache/incubator-seatunnel/pull/3711)
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index 80a1cba60..2b6596173 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -28,6 +28,7 @@ Source connector for Apache Kafka.
 | common-options                      | config  | no       | -                 
       |
 | schema                              |         | no       | -                 
       |
 | format                              | String  | no       | json              
       |
+| field_delimiter                     | String  | no       | ,                 
       |
 | start_mode                          | String  | no       | group_offsets     
       |
 | start_mode.offsets                  |         | no       |                   
       |
 | start_mode.timestamp                | Long    | no       |                   
       |
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d4f982bca..f8a69901a 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -29,6 +29,8 @@ public class Config {
      */
     public static final String DEFAULT_FORMAT = "json";
 
+    public static final String TEXT_FORMAT = "text";
+
     /**
      * The default field delimiter is “,”
      */
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index b7c43acbb..58e8ef25d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -17,11 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
+
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
@@ -35,20 +41,20 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<byt
     private final SerializationSchema keySerialization;
     private final SerializationSchema valueSerialization;
 
-    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType 
seaTunnelRowType) {
-        this(topic, element -> null, 
createSerializationSchema(seaTunnelRowType));
+    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType 
seaTunnelRowType, String format, String delimiter) {
+        this(topic, element -> null, 
createSerializationSchema(seaTunnelRowType, format, delimiter));
     }
 
-    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, 
SeaTunnelRowType seaTunnelRowType) {
-        this(topic, seaTunnelRowType);
+    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, 
SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
+        this(topic, seaTunnelRowType, format, delimiter);
         this.partition = partition;
     }
 
-    public DefaultSeaTunnelRowSerializer(String topic,
-                                         List<String> keyFieldNames,
-                                         SeaTunnelRowType seaTunnelRowType) {
+    public DefaultSeaTunnelRowSerializer(String topic, List<String> 
keyFieldNames,
+                                         SeaTunnelRowType seaTunnelRowType,
+                                         String format, String delimiter) {
         this(topic, createKeySerializationSchema(keyFieldNames, 
seaTunnelRowType),
-                createSerializationSchema(seaTunnelRowType));
+                createSerializationSchema(seaTunnelRowType, format, 
delimiter));
     }
 
     public DefaultSeaTunnelRowSerializer(String topic,
@@ -65,8 +71,18 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<byt
                 keySerialization.serialize(row), 
valueSerialization.serialize(row));
     }
 
-    private static SerializationSchema 
createSerializationSchema(SeaTunnelRowType rowType) {
-        return new JsonSerializationSchema(rowType);
+    private static SerializationSchema 
createSerializationSchema(SeaTunnelRowType rowType, String format, String 
delimiter) {
+        if (DEFAULT_FORMAT.equals(format)) {
+            return new JsonSerializationSchema(rowType);
+        } else if (TEXT_FORMAT.equals(format)) {
+            return TextSerializationSchema.builder()
+                    .seaTunnelRowType(rowType)
+                    .delimiter(delimiter)
+                    .build();
+        } else {
+            throw new 
SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "Unsupported format: " + format);
+        }
     }
 
     private static SerializationSchema 
createKeySerializationSchema(List<String> keyFieldNames,
@@ -80,7 +96,7 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<byt
             keyFieldDataTypeArr[i] = 
seaTunnelRowType.getFieldType(rowFieldIndex);
         }
         SeaTunnelRowType keyType = new 
SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
-        SerializationSchema keySerializationSchema = 
createSerializationSchema(keyType);
+        SerializationSchema keySerializationSchema = new 
JsonSerializationSchema(keyType);
 
         Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
             Object[] keyFields = new Object[keyFieldIndexArr.length];
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index f7ca6365e..41b1ae3aa 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -18,6 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
@@ -147,12 +151,20 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config 
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        String format = DEFAULT_FORMAT;
+        if (pluginConfig.hasPath(FORMAT.key())) {
+            format = pluginConfig.getString(FORMAT.key());
+        }
+        String delimiter = DEFAULT_FIELD_DELIMITER;
+        if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
+            delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
+        }
         if (pluginConfig.hasPath(PARTITION.key())) {
             return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
-                    pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
+                    pluginConfig.getInt(PARTITION.key()), seaTunnelRowType, 
format, delimiter);
         } else {
             return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
-                    getPartitionKeyFields(pluginConfig, seaTunnelRowType), 
seaTunnelRowType);
+                    getPartitionKeyFields(pluginConfig, seaTunnelRowType), 
seaTunnelRowType, format, delimiter);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index d0a1931f6..3cc2a998b 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -30,6 +30,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHE
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.JobContext;
@@ -47,12 +48,14 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -192,7 +195,7 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
             }
             if (DEFAULT_FORMAT.equals(format)) {
                 deserializationSchema = new JsonDeserializationSchema(false, 
false, typeInfo);
-            } else if ("text".equals(format)) {
+            } else if (TEXT_FORMAT.equals(format)) {
                 String delimiter = DEFAULT_FIELD_DELIMITER;
                 if (config.hasPath(FIELD_DELIMITER.key())) {
                     delimiter = config.getString(FIELD_DELIMITER.key());
@@ -203,7 +206,8 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
                     .build();
             } else {
                 // TODO: use format SPI
-                throw new UnsupportedOperationException("Unsupported format: " 
+ format);
+                throw new 
SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported format: " + format);
             }
         } else {
             typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 6d907cd09..8552e2ee7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -79,6 +79,10 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
 
     private static final String KAFKA_HOST = "kafkaCluster";
 
+    private static final String DEFAULT_FORMAT = "json";
+
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+
     private KafkaProducer<byte[], byte[]> producer;
 
     private KafkaContainer kafkaContainer;
@@ -101,7 +105,7 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                 .untilAsserted(() -> initKafkaProducer());
 
         log.info("Write 100 records to topic test_topic_source");
-        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE);
+        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE, 
DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
         generateTestData(row -> serializer.serializeRow(row), 0, 100);
     }
 
@@ -148,6 +152,32 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(10, data.size());
     }
 
+    @TestTemplate
+    public void testTextFormatSinkKafka(TestContainer container) throws 
IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/kafkaTextsink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        String topicName = "test_text_topic";
+        Map<String, String> data = new HashMap<>();
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets = 
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.put(record.key(), record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        Assertions.assertEquals(10, data.size());
+    }
+
     @TestTemplate
     public void testSourceKafkaTextToConsole(TestContainer container) throws 
IOException, InterruptedException {
         TextSerializationSchema serializer = TextSerializationSchema.builder()
@@ -161,7 +191,7 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
 
     @TestTemplate
     public void testSourceKafkaJsonToConsole(TestContainer container) throws 
IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
+        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE, 
DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
         generateTestData(row -> serializer.serializeRow(row), 0, 100);
         Container.ExecResult execResult = 
container.executeJob("/kafkasource_json_to_console.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
@@ -177,7 +207,7 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
 
     @TestTemplate
     public void testSourceKafkaStartConfig(TestContainer container) throws 
IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE);
+        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE, 
DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
         generateTestData(row -> serializer.serializeRow(row), 100, 150);
         testKafkaGroupOffsetsToConsole(container);
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
new file mode 100644
index 000000000..71a2605ea
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_text_topic"
+    format = text
+    field_delimiter = ","
+    partition_key_fields = ["c_map","c_string"]
+  }
+}
\ No newline at end of file

Reply via email to