This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37578e103f [Fix][Kafka-Sink] fix kafka sink factory option rule (#6657)
37578e103f is described below
commit 37578e103fd2e0be9cc6334e1ffabff22c9f5938
Author: Jarvis <[email protected]>
AuthorDate: Fri Apr 12 19:17:24 2024 +0800
[Fix][Kafka-Sink] fix kafka sink factory option rule (#6657)
---
.../connectors/seatunnel/kafka/sink/KafkaSinkFactory.java | 15 ++-------------
1 file changed, 2 insertions(+), 13 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 3fbf6bb99b..fe6965132d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -23,12 +23,9 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import com.google.auto.service.AutoService;
-import java.util.Arrays;
-
@AutoService(Factory.class)
public class KafkaSinkFactory implements TableSinkFactory {
@Override
@@ -39,17 +36,9 @@ public class KafkaSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(Config.FORMAT, Config.BOOTSTRAP_SERVERS)
- .conditional(
- Config.FORMAT,
- Arrays.asList(
- MessageFormat.JSON,
- MessageFormat.CANAL_JSON,
- MessageFormat.TEXT,
- MessageFormat.OGG_JSON,
- MessageFormat.AVRO),
- Config.TOPIC)
+ .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
.optional(
+ Config.FORMAT,
Config.KAFKA_CONFIG,
Config.ASSIGN_PARTITIONS,
Config.TRANSACTION_PREFIX,