This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37578e103f [Fix][Kafka-Sink] fix kafka sink factory option rule (#6657)
37578e103f is described below

commit 37578e103fd2e0be9cc6334e1ffabff22c9f5938
Author: Jarvis <[email protected]>
AuthorDate: Fri Apr 12 19:17:24 2024 +0800

    [Fix][Kafka-Sink] fix kafka sink factory option rule (#6657)
---
 .../connectors/seatunnel/kafka/sink/KafkaSinkFactory.java | 15 ++-------------
 1 file changed, 2 insertions(+), 13 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 3fbf6bb99b..fe6965132d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -23,12 +23,9 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 
 import com.google.auto.service.AutoService;
 
-import java.util.Arrays;
-
 @AutoService(Factory.class)
 public class KafkaSinkFactory implements TableSinkFactory {
     @Override
@@ -39,17 +36,9 @@ public class KafkaSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(Config.FORMAT, Config.BOOTSTRAP_SERVERS)
-                .conditional(
-                        Config.FORMAT,
-                        Arrays.asList(
-                                MessageFormat.JSON,
-                                MessageFormat.CANAL_JSON,
-                                MessageFormat.TEXT,
-                                MessageFormat.OGG_JSON,
-                                MessageFormat.AVRO),
-                        Config.TOPIC)
+                .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
                 .optional(
+                        Config.FORMAT,
                         Config.KAFKA_CONFIG,
                         Config.ASSIGN_PARTITIONS,
                         Config.TRANSACTION_PREFIX,

Reply via email to