This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 74bbd76b6 [Improve][Connector-V2][Kafka] Add text format for kafka
sink connector (#3711)
74bbd76b6 is described below
commit 74bbd76b654deffd1c160e836e8cdf56c2fcac74
Author: TaoZex <[email protected]>
AuthorDate: Tue Dec 13 13:59:23 2022 +0800
[Improve][Connector-V2][Kafka] Add text format for kafka sink connector
(#3711)
* [Improve][Connector-V2][Kafka] Add text format for kafka sink connector
* [Improve][Connector-V2][Kafka] Add e2e test
* [Improve][Connector-V2][Kafka] Update e2e code
* [Improve][Connector-V2][Kafka] Add doc
---
docs/en/connector-v2/sink/Kafka.md | 16 ++++-
docs/en/connector-v2/source/kafka.md | 1 +
.../connectors/seatunnel/kafka/config/Config.java | 2 +
.../serialize/DefaultSeaTunnelRowSerializer.java | 38 ++++++++----
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 16 ++++-
.../seatunnel/kafka/source/KafkaSource.java | 8 ++-
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 36 ++++++++++-
.../resources/kafkaTextsink_fake_to_kafka.conf | 70 ++++++++++++++++++++++
8 files changed, 168 insertions(+), 19 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kafka.md
b/docs/en/connector-v2/sink/Kafka.md
index d480d5ab5..f86f6b25a 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -25,6 +25,8 @@ By default, we will use 2pc to guarantee the message is sent
to kafka exactly on
| partition | int | no | - |
| assign_partitions | array | no | - |
| transaction_prefix | string | no | - |
+| format | String | no | json |
+| field_delimiter | String | no | , |
| common-options | config | no | - |
### topic [string]
@@ -88,6 +90,15 @@ This function by `MessageContentPartitioner` class
implements `org.apache.kafka.
If semantic is specified as EXACTLY_ONCE, the producer will write all messages
in a Kafka transaction.
Kafka distinguishes different transactions by different transactionId. This
parameter is prefix of kafka transactionId, make sure different job use
different prefix.
+### format
+
+Data format. The default format is json; the text format is also supported. The default
field delimiter is ",".
+To customize the delimiter, set the "field_delimiter" option.
+
+### field_delimiter
+
+Customize the field delimiter for the data format (used with the text format).
+
### common options [config]
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
@@ -101,6 +112,7 @@ sink {
topic = "seatunnel"
bootstrap.servers = "localhost:9092"
partition = 3
+ format = json
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
}
@@ -113,6 +125,8 @@ sink {
### 2.3.0-beta 2022-10-20
- Add Kafka Sink Connector
+
### next version
-- [Feature] Support to specify multiple partition keys
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
\ No newline at end of file
+- [Improve] Support to specify multiple partition keys
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
+- [Improve] Add text format for kafka sink connector
[3711](https://github.com/apache/incubator-seatunnel/pull/3711)
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 80a1cba60..2b6596173 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -28,6 +28,7 @@ Source connector for Apache Kafka.
| common-options | config | no | -
|
| schema | | no | -
|
| format | String | no | json
|
+| field_delimiter | String | no | ,
|
| start_mode | String | no | group_offsets
|
| start_mode.offsets | | no |
|
| start_mode.timestamp | Long | no |
|
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d4f982bca..f8a69901a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -29,6 +29,8 @@ public class Config {
*/
public static final String DEFAULT_FORMAT = "json";
+ public static final String TEXT_FORMAT = "text";
+
/**
* The default field delimiter is “,”
*/
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index b7c43acbb..58e8ef25d 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -17,11 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
+
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,20 +41,20 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<byt
private final SerializationSchema keySerialization;
private final SerializationSchema valueSerialization;
- public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType
seaTunnelRowType) {
- this(topic, element -> null,
createSerializationSchema(seaTunnelRowType));
+ public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType
seaTunnelRowType, String format, String delimiter) {
+ this(topic, element -> null,
createSerializationSchema(seaTunnelRowType, format, delimiter));
}
- public DefaultSeaTunnelRowSerializer(String topic, Integer partition,
SeaTunnelRowType seaTunnelRowType) {
- this(topic, seaTunnelRowType);
+ public DefaultSeaTunnelRowSerializer(String topic, Integer partition,
SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
+ this(topic, seaTunnelRowType, format, delimiter);
this.partition = partition;
}
- public DefaultSeaTunnelRowSerializer(String topic,
- List<String> keyFieldNames,
- SeaTunnelRowType seaTunnelRowType) {
+ public DefaultSeaTunnelRowSerializer(String topic, List<String>
keyFieldNames,
+ SeaTunnelRowType seaTunnelRowType,
+ String format, String delimiter) {
this(topic, createKeySerializationSchema(keyFieldNames,
seaTunnelRowType),
- createSerializationSchema(seaTunnelRowType));
+ createSerializationSchema(seaTunnelRowType, format,
delimiter));
}
public DefaultSeaTunnelRowSerializer(String topic,
@@ -65,8 +71,18 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<byt
keySerialization.serialize(row),
valueSerialization.serialize(row));
}
- private static SerializationSchema
createSerializationSchema(SeaTunnelRowType rowType) {
- return new JsonSerializationSchema(rowType);
+ private static SerializationSchema
createSerializationSchema(SeaTunnelRowType rowType, String format, String
delimiter) {
+ if (DEFAULT_FORMAT.equals(format)) {
+ return new JsonSerializationSchema(rowType);
+ } else if (TEXT_FORMAT.equals(format)) {
+ return TextSerializationSchema.builder()
+ .seaTunnelRowType(rowType)
+ .delimiter(delimiter)
+ .build();
+ } else {
+ throw new
SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported format: " + format);
+ }
}
private static SerializationSchema
createKeySerializationSchema(List<String> keyFieldNames,
@@ -80,7 +96,7 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<byt
keyFieldDataTypeArr[i] =
seaTunnelRowType.getFieldType(rowFieldIndex);
}
SeaTunnelRowType keyType = new
SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
- SerializationSchema keySerializationSchema =
createSerializationSchema(keyType);
+ SerializationSchema keySerializationSchema = new
JsonSerializationSchema(keyType);
Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
Object[] keyFields = new Object[keyFieldIndexArr.length];
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index f7ca6365e..41b1ae3aa 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -18,6 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
@@ -147,12 +151,20 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+ String format = DEFAULT_FORMAT;
+ if (pluginConfig.hasPath(FORMAT.key())) {
+ format = pluginConfig.getString(FORMAT.key());
+ }
+ String delimiter = DEFAULT_FIELD_DELIMITER;
+ if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
+ delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
+ }
if (pluginConfig.hasPath(PARTITION.key())) {
return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
- pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
+ pluginConfig.getInt(PARTITION.key()), seaTunnelRowType,
format, delimiter);
} else {
return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
- getPartitionKeyFields(pluginConfig, seaTunnelRowType),
seaTunnelRowType);
+ getPartitionKeyFields(pluginConfig, seaTunnelRowType),
seaTunnelRowType, format, delimiter);
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index d0a1931f6..3cc2a998b 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -30,6 +30,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHE
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import org.apache.seatunnel.api.common.JobContext;
@@ -47,12 +48,14 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -192,7 +195,7 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
}
if (DEFAULT_FORMAT.equals(format)) {
deserializationSchema = new JsonDeserializationSchema(false,
false, typeInfo);
- } else if ("text".equals(format)) {
+ } else if (TEXT_FORMAT.equals(format)) {
String delimiter = DEFAULT_FIELD_DELIMITER;
if (config.hasPath(FIELD_DELIMITER.key())) {
delimiter = config.getString(FIELD_DELIMITER.key());
@@ -203,7 +206,8 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
.build();
} else {
// TODO: use format SPI
- throw new UnsupportedOperationException("Unsupported format: "
+ format);
+ throw new
SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported format: " + format);
}
} else {
typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 6d907cd09..8552e2ee7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -79,6 +79,10 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
private static final String KAFKA_HOST = "kafkaCluster";
+ private static final String DEFAULT_FORMAT = "json";
+
+ private static final String DEFAULT_FIELD_DELIMITER = ",";
+
private KafkaProducer<byte[], byte[]> producer;
private KafkaContainer kafkaContainer;
@@ -101,7 +105,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
.untilAsserted(() -> initKafkaProducer());
log.info("Write 100 records to topic test_topic_source");
- DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE);
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
}
@@ -148,6 +152,32 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(10, data.size());
}
+ @TestTemplate
+ public void testTextFormatSinkKafka(TestContainer container) throws
IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/kafkaTextsink_fake_to_kafka.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ String topicName = "test_text_topic";
+ Map<String, String> data = new HashMap<>();
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.put(record.key(), record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ Assertions.assertEquals(10, data.size());
+ }
+
@TestTemplate
public void testSourceKafkaTextToConsole(TestContainer container) throws
IOException, InterruptedException {
TextSerializationSchema serializer = TextSerializationSchema.builder()
@@ -161,7 +191,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testSourceKafkaJsonToConsole(TestContainer container) throws
IOException, InterruptedException {
- DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
Container.ExecResult execResult =
container.executeJob("/kafkasource_json_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
@@ -177,7 +207,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testSourceKafkaStartConfig(TestContainer container) throws
IOException, InterruptedException {
- DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE);
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 100, 150);
testKafkaGroupOffsetsToConsole(container);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
new file mode 100644
index 000000000..71a2605ea
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, smallint>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_text_topic"
+ format = text
+ field_delimiter = ","
+ partition_key_fields = ["c_map","c_string"]
+ }
+}
\ No newline at end of file