This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b4499275 update (#3150)
2b4499275 is described below

commit 2b44992750ec6a751ccec0f8d3d76731f144c9d7
Author: TaoZex <[email protected]>
AuthorDate: Thu Oct 20 16:47:03 2022 +0800

    update (#3150)
---
 .../seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java     | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 9712ba331..583dc6c81 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -190,6 +190,9 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     private Function<SeaTunnelRow, String> createPartitionExtractor(Config 
pluginConfig,
                                                                     
SeaTunnelRowType seaTunnelRowType) {
+        if (!pluginConfig.hasPath(PARTITION_KEY)){
+            return row -> null;
+        }
         String partitionKey = pluginConfig.getString(PARTITION_KEY);
         List<String> fieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
         if (!fieldNames.contains(partitionKey)) {

Reply via email to