This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b4499275 update (#3150)
2b4499275 is described below
commit 2b44992750ec6a751ccec0f8d3d76731f144c9d7
Author: TaoZex <[email protected]>
AuthorDate: Thu Oct 20 16:47:03 2022 +0800
update (#3150)
---
.../seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 9712ba331..583dc6c81 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -190,6 +190,9 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private Function<SeaTunnelRow, String> createPartitionExtractor(Config
pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
+ if (!pluginConfig.hasPath(PARTITION_KEY)){
+ return row -> null;
+ }
String partitionKey = pluginConfig.getString(PARTITION_KEY);
List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
if (!fieldNames.contains(partitionKey)) {