This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8aff80730 [Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742)
8aff80730 is described below
commit 8aff807305650ea2e2da8ce2faef0676554da484
Author: TaoZex <[email protected]>
AuthorDate: Fri Jan 20 19:20:06 2023 +0800
    [Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742)
---
docs/en/connector-v2/sink/Kafka.md | 18 ++++-
.../serialize/DefaultSeaTunnelRowSerializer.java | 23 +++---
.../kafka/serialize/SeaTunnelRowSerializer.java | 2 +-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 44 +++++++++++-
.../connector-kafka-e2e/pom.xml | 6 ++
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 83 +++++++++++-----------
.../test/resources/extractTopic_fake_to_kafka.conf | 77 ++++++++++++++++++++
7 files changed, 197 insertions(+), 56 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 6908597fc..be4f87db8 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -31,6 +31,21 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly once
Kafka Topic.
+Currently, two formats are supported:
+
+1. Fill in the name of the topic.
+
+2. Use the value of a field from upstream data as the topic; the format is `${your field name}`, where the topic is the value of one of the columns of the upstream data.
+
+   For example, the upstream data is the following:
+
+ | name | age | data |
+ | ---- | ---- | ------------- |
+ | Jack | 16 | data-example1 |
+ | Mary | 23 | data-example2 |
+
+   If `${name}` is set as the topic, the first row is sent to the Jack topic and the second row is sent to the Mary topic.
+
### bootstrap.servers [string]
Kafka Brokers List.
@@ -190,4 +205,5 @@ sink {
- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
+- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 58e8ef25d..1e9ed6422 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -37,36 +37,37 @@ import java.util.function.Function;
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
private Integer partition;
- private final String topic;
private final SerializationSchema keySerialization;
private final SerializationSchema valueSerialization;
-    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
-        this(topic, element -> null, createSerializationSchema(seaTunnelRowType, format, delimiter));
+ public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType,
+ String format,
+ String delimiter) {
+        this(element -> null, createSerializationSchema(seaTunnelRowType, format, delimiter));
}
-    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
- this(topic, seaTunnelRowType, format, delimiter);
+ public DefaultSeaTunnelRowSerializer(Integer partition,
+ SeaTunnelRowType seaTunnelRowType,
+ String format, String delimiter) {
+ this(seaTunnelRowType, format, delimiter);
this.partition = partition;
}
-    public DefaultSeaTunnelRowSerializer(String topic, List<String> keyFieldNames,
+ public DefaultSeaTunnelRowSerializer(List<String> keyFieldNames,
SeaTunnelRowType seaTunnelRowType,
String format, String delimiter) {
-        this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
+        this(createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
             createSerializationSchema(seaTunnelRowType, format, delimiter));
}
- public DefaultSeaTunnelRowSerializer(String topic,
- SerializationSchema keySerialization,
+ public DefaultSeaTunnelRowSerializer(SerializationSchema keySerialization,
                                         SerializationSchema valueSerialization) {
- this.topic = topic;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
}
@Override
- public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
+    public ProducerRecord<byte[], byte[]> serializeRow(String topic, SeaTunnelRow row) {
return new ProducerRecord<>(topic, partition,
keySerialization.serialize(row),
valueSerialization.serialize(row));
}
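
A note on the serializer change above: the topic is no longer fixed at construction time but is passed with every serializeRow call, which is what lets a single writer route each row to a different topic. Below is a minimal stand-alone sketch of that call pattern, assuming only the kafka-clients ProducerRecord API; the class name, the String row stand-in, and the Function-based serializers are illustrative, not SeaTunnel's actual types.

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for the refactored serializer: the topic is a per-call
// argument, so one instance can fan rows out to many topics.
public class DynamicTopicSerializerSketch {
    private final Integer partition; // null lets Kafka's partitioner pick one
    private final Function<String, byte[]> keySerialization;
    private final Function<String, byte[]> valueSerialization;

    public DynamicTopicSerializerSketch(Integer partition) {
        this.partition = partition;
        this.keySerialization = s -> null; // keyless, like the element -> null default above
        this.valueSerialization = s -> s.getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors serializeRow(String topic, SeaTunnelRow row): the topic travels with the record.
    public ProducerRecord<byte[], byte[]> serializeRow(String topic, String row) {
        return new ProducerRecord<>(
                topic, partition, keySerialization.apply(row), valueSerialization.apply(row));
    }
}
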
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index 9f12591ea..5f31ca001 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -29,5 +29,5 @@ public interface SeaTunnelRowSerializer<K, V> {
* @param row seatunnel row
* @return kafka record.
*/
- ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
+ ProducerRecord<K, V> serializeRow(String topic, SeaTunnelRow row);
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 94e4ec922..ffec8130e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -52,6 +53,8 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
 * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -61,7 +64,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private final SinkWriter.Context context;
private String transactionPrefix;
+ private String topic;
private long lastCheckpointId = 0;
+ private boolean isExtractTopic;
+ private SeaTunnelRowType seaTunnelRowType;
private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
    private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
@@ -74,6 +80,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
Config pluginConfig,
List<KafkaSinkState> kafkaStates) {
this.context = context;
+ this.seaTunnelRowType = seaTunnelRowType;
+        Pair<Boolean, String> topicResult = isExtractTopic(pluginConfig.getString(TOPIC.key()));
+        this.isExtractTopic = topicResult.getLeft();
+        this.topic = topicResult.getRight();
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
}
@@ -102,7 +112,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
@Override
public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(extractTopic(element), element);
kafkaProducerSender.send(producerRecord);
}
@@ -159,10 +169,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
}
if (pluginConfig.hasPath(PARTITION.key())) {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+            return new DefaultSeaTunnelRowSerializer(
                 pluginConfig.getInt(PARTITION.key()), seaTunnelRowType, format, delimiter);
         } else {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+            return new DefaultSeaTunnelRowSerializer(
                 getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType, format, delimiter);
}
}
@@ -199,4 +209,32 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
return Collections.emptyList();
}
+
+    private Pair<Boolean, String> isExtractTopic(String topicConfig) {
+ String regex = "\\$\\{(.*?)\\}";
+ Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
+ Matcher matcher = pattern.matcher(topicConfig);
+ if (matcher.find()) {
+ return Pair.of(true, matcher.group(1));
+ }
+ return Pair.of(false, topicConfig);
+ }
+
+ private String extractTopic(SeaTunnelRow row) {
+ if (!isExtractTopic) {
+ return topic;
+ }
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(topic)) {
+            throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Field name { %s } is not found!", topic));
+ }
+ int topicFieldIndex = seaTunnelRowType.indexOf(topic);
+ Object topicFieldValue = row.getField(topicFieldIndex);
+ if (topicFieldValue == null) {
+            throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "The column value is empty!");
+ }
+ return topicFieldValue.toString();
+ }
}
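
The topic-extraction logic above hinges on one regex: if the configured topic matches \$\{(.*?)\}, group(1) is treated as a field name to resolve per row; otherwise the configured string is used verbatim. A JDK-only sketch of the same technique follows, assuming Map.Entry as a stand-in for the commons-lang3 Pair, with illustrative field names and row values:

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicExtractorSketch {
    // Matches ${fieldName}; group(1) captures the bare field name.
    private static final Pattern FIELD_REF = Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL);

    // Returns (isExtract, value): value is either the literal topic or the field name.
    static Map.Entry<Boolean, String> parseTopicConfig(String topicConfig) {
        Matcher matcher = FIELD_REF.matcher(topicConfig);
        if (matcher.find()) {
            return new SimpleImmutableEntry<>(true, matcher.group(1));
        }
        return new SimpleImmutableEntry<>(false, topicConfig);
    }

    public static void main(String[] args) {
        System.out.println(parseTopicConfig("my_topic")); // false=my_topic
        System.out.println(parseTopicConfig("${name}"));  // true=name

        // Routing one row: look the field up by name, use its value as the topic.
        List<String> fieldNames = List.of("name", "age", "data");
        Object[] row = {"Jack", 16, "data-example1"};
        Map.Entry<Boolean, String> parsed = parseTopicConfig("${name}");
        String topic = parsed.getKey()
                ? String.valueOf(row[fieldNames.indexOf(parsed.getValue())])
                : parsed.getValue();
        System.out.println(topic); // Jack, matching the docs example above
    }
}
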
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index d2ee1f526..59281c8c7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -51,6 +51,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 71a2452ae..a7fb5c183 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -106,8 +106,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
.untilAsserted(() -> initKafkaProducer());
log.info("Write 100 records to topic test_topic_source");
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow("test_topic_source", row), 0, 100);
}
@AfterAll
@@ -127,25 +127,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
String topicName = "test_topic";
- Map<String, String> data = new HashMap<>();
+ Map<String, String> data = getKafkaConsumerData(topicName);
ObjectMapper objectMapper = new ObjectMapper();
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
-            consumer.subscribe(Arrays.asList(topicName));
-            Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
- Long endOffset = offsets.entrySet().iterator().next().getValue();
- Long lastProcessedOffset = -1L;
-
- do {
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- if (lastProcessedOffset < record.offset()) {
-
- data.put(record.key(), record.value());
- }
- lastProcessedOffset = record.offset();
- }
- } while (lastProcessedOffset < endOffset - 1);
- }
String key = data.keySet().iterator().next();
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
@@ -159,23 +142,22 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
String topicName = "test_text_topic";
- Map<String, String> data = new HashMap<>();
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
-            consumer.subscribe(Arrays.asList(topicName));
-            Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
- Long endOffset = offsets.entrySet().iterator().next().getValue();
- Long lastProcessedOffset = -1L;
+ Map<String, String> data = getKafkaConsumerData(topicName);
+ Assertions.assertEquals(10, data.size());
+ }
- do {
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- if (lastProcessedOffset < record.offset()) {
- data.put(record.key(), record.value());
- }
- lastProcessedOffset = record.offset();
- }
- } while (lastProcessedOffset < endOffset - 1);
- }
+ @TestTemplate
+    public void testExtractTopicFunction(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/extractTopic_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+ String topicName = "test_extract_topic";
+ Map<String, String> data = getKafkaConsumerData(topicName);
+ ObjectMapper objectMapper = new ObjectMapper();
+ String key = data.keySet().iterator().next();
+ ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
+ Assertions.assertTrue(objectNode.has("c_map"));
+ Assertions.assertTrue(objectNode.has("c_string"));
Assertions.assertEquals(10, data.size());
}
@@ -192,8 +174,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
    @TestTemplate
    public void testSourceKafkaJsonToConsole(TestContainer container) throws IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow("test_topic_json", row), 0, 100);
        Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
@@ -208,8 +190,8 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
    @TestTemplate
    public void testSourceKafkaStartConfig(TestContainer container) throws IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 100, 150);
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow("test_topic_group", row), 100, 150);
testKafkaGroupOffsetsToConsole(container);
}
@@ -320,6 +302,27 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
}
);
+    private Map<String, String> getKafkaConsumerData(String topicName) {
+ Map<String, String> data = new HashMap<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.put(record.key(), record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+
interface ProducerRecordConverter {
ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
}
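
The getKafkaConsumerData helper extracted above drains a single-partition topic up to its end offset before the test asserts on the contents. A self-contained sketch of that read-to-end loop follows, assuming a reachable broker, a single partition 0, and string deserializers; in the real test, kafkaConsumerConfig() supplies these settings:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DrainTopicSketch {
    // Reads every record currently in partition 0 of topicName, keyed by record key.
    static Map<String, String> drain(String bootstrapServers, String topicName) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "drain-sketch");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Map<String, String> data = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition(topicName, 0);
            consumer.subscribe(Collections.singletonList(topicName));
            // endOffsets queries the broker, so an empty topic exits the loop immediately.
            long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
            long lastProcessedOffset = -1L;
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    data.put(record.key(), record.value());
                    lastProcessedOffset = record.offset();
                }
            } while (lastProcessedOffset < endOffset - 1);
        }
        return data;
    }
}
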
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
new file mode 100644
index 000000000..62eb47680
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, smallint>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+ Replace {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ replace_field = "c_string"
+ pattern = ".+"
+ replacement = "test_extract_topic"
+ is_regex = true
+ }
+}
+
+sink {
+ Kafka {
+ source_table_name = "fake1"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "${c_string}"
+ partition_key_fields = ["c_map","c_string"]
+ }
+}
\ No newline at end of file