hailin0 commented on code in PR #3147:
URL:
https://github.com/apache/incubator-seatunnel/pull/3147#discussion_r1000534502
##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -31,6 +31,12 @@ By default, we will use 2pc to guarantee the message is sent
to kafka exactly on
Kafka Topic.
+Currently two formats are supported:
+
+1. Fill in the name of the topic
+
+2. Use value of a field from upstream data as topic,the format is
${field_topic}, where topic is the name of one of the columns of the upstream
data.
Review Comment:
```suggestion
2. Use the value of a field from upstream data as the topic. The format is
${your field name}, where the topic is the value of one of the columns of the
upstream data.
```
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -86,6 +95,8 @@ public KafkaSinkWriter(
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.pluginConfig = pluginConfig;
+ this.topic = extractTopic(pluginConfig.getString(TOPIC));
+ this.topicExtractor = createTopicExtractor(this.topic,
seaTunnelRowType);
Review Comment:
```suggestion
this.topicExtractor =
createTopicExtractor(pluginConfig.getString(TOPIC), seaTunnelRowType);
```
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -54,10 +56,13 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
private final SinkWriter.Context context;
private final Config pluginConfig;
private final Function<SeaTunnelRow, String> partitionExtractor;
+ private final Function<SeaTunnelRow, String> topicExtractor;
private String transactionPrefix;
private long lastCheckpointId = 0;
private int partition;
+ private String topic;
+ private boolean isExtractTopic = false;
Review Comment:
Remove the `topic` and `isExtractTopic` fields; `topicExtractor` can resolve the topic on its own.
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
}
}
+ private Function<SeaTunnelRow, String> createTopicExtractor(String
topicField, SeaTunnelRowType seaTunnelRowType) {
+ if (!isExtractTopic) {
+ return row -> null;
+ }
Review Comment:
```suggestion
private Function<SeaTunnelRow, String> createTopicExtractor(String
topicConfig, SeaTunnelRowType seaTunnelRowType) {
String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
Matcher matcher = pattern.matcher(topicConfig);
if (!matcher.find()) {
return row -> topicConfig;
}
String topicField = matcher.group(1);
```
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
}
}
+ private Function<SeaTunnelRow, String> createTopicExtractor(String
topicField, SeaTunnelRowType seaTunnelRowType) {
+ if (!isExtractTopic) {
+ return row -> null;
+ }
+ List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
+ if (!fieldNames.contains(topicField)) {
+ throw new IllegalArgumentException("Field name is not found!");
Review Comment:
```suggestion
throw new IllegalArgumentException("Field name{" + topicField +
"} is not found!");
```
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
}
}
+ private Function<SeaTunnelRow, String> createTopicExtractor(String
topicField, SeaTunnelRowType seaTunnelRowType) {
+ if (!isExtractTopic) {
+ return row -> null;
+ }
+ List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
+ if (!fieldNames.contains(topicField)) {
+ throw new IllegalArgumentException("Field name is not found!");
+ }
+ int topicFieldIndex = seaTunnelRowType.indexOf(topicField);
+ return row -> {
+ Object topicFieldValue = row.getField(topicFieldIndex);
+ if (topicFieldValue == null) {
+ throw new IllegalArgumentException("The column value is
empty!");
Review Comment:
```suggestion
throw new IllegalArgumentException("The field{" + topicField
+ "} value is empty!");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]