This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aadfe99f88 [improve] kafka connector options (#8616)
aadfe99f88 is described below

commit aadfe99f88434829c68eeeadce53b1275318c179
Author: Daeuk Choi <[email protected]>
AuthorDate: Fri Feb 7 10:23:03 2025 +0900

    [improve] kafka connector options (#8616)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 .../seatunnel/kafka/config/KafkaBaseOptions.java   | 79 +++++++++++++++++
 .../seatunnel/kafka/config/KafkaSinkOptions.java   | 64 ++++++++++++++
 .../{Config.java => KafkaSourceOptions.java}       | 98 +---------------------
 .../serialize/DefaultSeaTunnelRowSerializer.java   |  4 +-
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  3 +-
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     | 18 ++--
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 22 ++---
 .../seatunnel/kafka/source/KafkaSource.java        |  3 +-
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 36 ++++----
 .../seatunnel/kafka/source/KafkaSourceFactory.java | 37 ++++----
 .../kafka/source/KafkaSourceConfigTest.java        |  2 +-
 12 files changed, 213 insertions(+), 155 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 7c77d95e2b..1e0464952b 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -209,7 +209,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("RocketMqSinkOptions");
         whiteList.add("ClickhouseFileSinkOptions");
         whiteList.add("IcebergSinkOptions");
-        whiteList.add("KafkaSourceOptions");
         whiteList.add("AssertSinkOptions");
         whiteList.add("MaxcomputeSourceOptions");
         whiteList.add("InfluxDBSourceOptions");
@@ -230,7 +229,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("TableStoreDBSourceOptions");
         whiteList.add("AmazonDynamoDBSinkOptions");
         whiteList.add("KuduSinkOptions");
-        whiteList.add("KafkaSinkOptions");
         whiteList.add("TDengineSinkOptions");
         whiteList.add("Neo4jSourceOptions");
         whiteList.add("HttpSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseOptions.java
new file mode 100644
index 0000000000..eb37e0f5c4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseOptions.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Map;
+
+public class KafkaBaseOptions {
+
+    public static final String CONNECTOR_IDENTITY = "Kafka";
+    /** The default field delimiter is “,” */
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    public static final Option<Map<String, String>> KAFKA_CONFIG =
+            Options.key("kafka.config")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "In addition to the above parameters that must be 
specified by the Kafka producer or consumer client, "
+                                    + "the user can also specify multiple 
non-mandatory parameters for the producer or consumer client, "
+                                    + "covering all the producer parameters 
specified in the official Kafka document.");
+
+    public static final Option<String> TOPIC =
+            Options.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Kafka topic name. If there are multiple topics, 
use , to split, for example: \"tpc1,tpc2\".");
+
+    public static final Option<String> BOOTSTRAP_SERVERS =
+            Options.key("bootstrap.servers")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kafka cluster address, separated by 
\",\".");
+
+    public static final Option<MessageFormat> FORMAT =
+            Options.key("format")
+                    .enumType(MessageFormat.class)
+                    .defaultValue(MessageFormat.JSON)
+                    .withDescription(
+                            "Data format. The default format is json. Optional 
text format. The default field separator is \", \". "
+                                    + "If you customize the delimiter, add the 
\"field_delimiter\" option.");
+
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .defaultValue(DEFAULT_FIELD_DELIMITER)
+                    .withDescription("Customize the field delimiter for data 
format.");
+
+    public static final Option<String> PROTOBUF_SCHEMA =
+            Options.key("protobuf_schema")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Data serialization method protobuf metadata, used 
to parse protobuf data.");
+
+    public static final Option<String> PROTOBUF_MESSAGE_NAME =
+            Options.key("protobuf_message_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Parsing entity class names from Protobuf 
data.");
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
new file mode 100644
index 0000000000..ca25d2144b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class KafkaSinkOptions extends KafkaBaseOptions {
+
+    public static final Option<Integer> PARTITION =
+            Options.key("partition")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "We can specify the partition, all messages will 
be sent to this partition.");
+
+    public static final Option<List<String>> ASSIGN_PARTITIONS =
+            Options.key("assign_partitions")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "We can decide which partition to send based on 
the content of the message. "
+                                    + "The function of this parameter is to 
distribute information.");
+
+    public static final Option<List<String>> PARTITION_KEY_FIELDS =
+            Options.key("partition_key_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Configure which fields are used as the key of the 
kafka message.");
+
+    public static final Option<KafkaSemantics> SEMANTICS =
+            Options.key("semantics")
+                    .enumType(KafkaSemantics.class)
+                    .defaultValue(KafkaSemantics.NON)
+                    .withDescription(
+                            "Semantics that can be chosen 
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
+
+    public static final Option<String> TRANSACTION_PREFIX =
+            Options.key("transaction_prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "If semantic is specified as EXACTLY_ONCE, the 
producer will write all messages in a Kafka transaction. "
+                                    + "Kafka distinguishes different 
transactions by different transactionId. "
+                                    + "This parameter is prefix of kafka 
transactionId, make sure different job use different prefix.");
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
similarity index 53%
rename from seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
rename to seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 931eea8a70..44c7f0d23a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -23,30 +23,9 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 
-import java.util.List;
 import java.util.Map;
 
-public class Config {
-
-    public static final String CONNECTOR_IDENTITY = "Kafka";
-    /** The default field delimiter is “,” */
-    public static final String DEFAULT_FIELD_DELIMITER = ",";
-
-    public static final Option<Map<String, String>> KAFKA_CONFIG =
-            Options.key("kafka.config")
-                    .mapType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "In addition to the above parameters that must be 
specified by the Kafka producer or consumer client, "
-                                    + "the user can also specify multiple 
non-mandatory parameters for the producer or consumer client, "
-                                    + "covering all the producer parameters 
specified in the official Kafka document.");
-
-    public static final Option<String> TOPIC =
-            Options.key("topic")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Kafka topic name. If there are multiple topics, 
use , to split, for example: \"tpc1,tpc2\".");
+public class KafkaSourceOptions extends KafkaBaseOptions {
 
     public static final Option<Boolean> PATTERN =
             Options.key("pattern")
@@ -56,12 +35,6 @@ public class Config {
                             "If pattern is set to true,the regular expression 
for a pattern of topic names to read from."
                                     + " All topics in clients with names that 
match the specified regular expression will be subscribed by the consumer.");
 
-    public static final Option<String> BOOTSTRAP_SERVERS =
-            Options.key("bootstrap.servers")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Kafka cluster address, separated by 
\",\".");
-
     public static final Option<String> CONSUMER_GROUP =
             Options.key("consumer.group")
                     .stringType()
@@ -76,30 +49,13 @@ public class Config {
                     .withDescription(
                             "If true the consumer's offset will be 
periodically committed in the background.");
 
-    public static final Option<String> TRANSACTION_PREFIX =
-            Options.key("transaction_prefix")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "If semantic is specified as EXACTLY_ONCE, the 
producer will write all messages in a Kafka transaction. "
-                                    + "Kafka distinguishes different 
transactions by different transactionId. "
-                                    + "This parameter is prefix of kafka 
transactionId, make sure different job use different prefix.");
-
-    public static final Option<Config> SCHEMA =
+    public static final Option<KafkaBaseOptions> SCHEMA =
             Options.key("schema")
-                    .objectType(Config.class)
+                    .objectType(KafkaBaseOptions.class)
                     .noDefaultValue()
                     .withDescription(
                             "The structure of the data, including field names 
and field types.");
 
-    public static final Option<MessageFormat> FORMAT =
-            Options.key("format")
-                    .enumType(MessageFormat.class)
-                    .defaultValue(MessageFormat.JSON)
-                    .withDescription(
-                            "Data format. The default format is json. Optional 
text format. The default field separator is \", \". "
-                                    + "If you customize the delimiter, add the 
\"field_delimiter\" option.");
-
     public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
             Options.key("debezium_record_include_schema")
                     .booleanType()
@@ -112,34 +68,6 @@ public class Config {
                     .noDefaultValue()
                     .withDescription("Debezium record table filter.");
 
-    public static final Option<String> FIELD_DELIMITER =
-            Options.key("field_delimiter")
-                    .stringType()
-                    .defaultValue(DEFAULT_FIELD_DELIMITER)
-                    .withDescription("Customize the field delimiter for data 
format.");
-
-    public static final Option<Integer> PARTITION =
-            Options.key("partition")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "We can specify the partition, all messages will 
be sent to this partition.");
-
-    public static final Option<List<String>> ASSIGN_PARTITIONS =
-            Options.key("assign_partitions")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "We can decide which partition to send based on 
the content of the message. "
-                                    + "The function of this parameter is to 
distribute information.");
-
-    public static final Option<List<String>> PARTITION_KEY_FIELDS =
-            Options.key("partition_key_fields")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Configure which fields are used as the key of the 
kafka message.");
-
     public static final Option<StartMode> START_MODE =
             Options.key("start_mode")
                     .objectType(StartMode.class)
@@ -183,24 +111,4 @@ public class Config {
                             "The processing method of data format error. The 
default value is fail, and the optional value is (fail, skip). "
                                     + "When fail is selected, data format 
error will block and an exception will be thrown. "
                                     + "When skip is selected, data format 
error will skip this line data.");
-
-    public static final Option<KafkaSemantics> SEMANTICS =
-            Options.key("semantics")
-                    .enumType(KafkaSemantics.class)
-                    .defaultValue(KafkaSemantics.NON)
-                    .withDescription(
-                            "Semantics that can be chosen 
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
-
-    public static final Option<String> PROTOBUF_SCHEMA =
-            Options.key("protobuf_schema")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Data serialization method protobuf metadata, used 
to parse protobuf data.");
-
-    public static final Option<String> PROTOBUF_MESSAGE_NAME =
-            Options.key("protobuf_message_name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Parsing entity class names from Protobuf 
data.");
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 2f6559a169..1415db7459 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -48,8 +48,8 @@ import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_MESSAGE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions.PROTOBUF_MESSAGE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions.PROTOBUF_SCHEMA;
 
 @RequiredArgsConstructor
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 4deb30f547..919a602d32 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -82,7 +83,7 @@ public class KafkaSink
 
     @Override
     public String getPluginName() {
-        return 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+        return KafkaBaseOptions.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index ed3278602a..ae236c3634 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -36,15 +36,15 @@ public class KafkaSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
+                .required(KafkaSinkOptions.TOPIC, 
KafkaSinkOptions.BOOTSTRAP_SERVERS)
                 .optional(
-                        Config.FORMAT,
-                        Config.KAFKA_CONFIG,
-                        Config.ASSIGN_PARTITIONS,
-                        Config.TRANSACTION_PREFIX,
-                        Config.SEMANTICS,
-                        Config.PARTITION,
-                        Config.PARTITION_KEY_FIELDS)
+                        KafkaSinkOptions.FORMAT,
+                        KafkaSinkOptions.KAFKA_CONFIG,
+                        KafkaSinkOptions.ASSIGN_PARTITIONS,
+                        KafkaSinkOptions.TRANSACTION_PREFIX,
+                        KafkaSinkOptions.SEMANTICS,
+                        KafkaSinkOptions.PARTITION,
+                        KafkaSinkOptions.PARTITION_KEY_FIELDS)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6639a34a0b..b367587e17 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -43,17 +43,17 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SEMANTICS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.ASSIGN_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.DEFAULT_FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.KAFKA_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION_KEY_FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.SEMANTICS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.TOPIC;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.TRANSACTION_PREFIX;
 
 /** KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to 
Kafka. */
 public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, 
KafkaCommitInfo, KafkaSinkState> {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 271adb8e7f..d57bf03e64 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.JobMode;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
@@ -64,7 +65,7 @@ public class KafkaSource
 
     @Override
     public String getPluginName() {
-        return 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+        return KafkaBaseOptions.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index db591cfdf0..508ba7061a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -64,24 +64,24 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_TABLE_FILTER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_POLL_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_MESSAGE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_SCHEMA;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.COMMIT_ON_CHECKPOINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.CONSUMER_GROUP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KAFKA_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_POLL_TIMEOUT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PATTERN;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_MESSAGE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC;
 
 public class KafkaSourceConfig implements Serializable {
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 0b24e7e968..e1bbf1dafc 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 
 import com.google.auto.service.AutoService;
@@ -44,23 +44,30 @@ public class KafkaSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(Config.BOOTSTRAP_SERVERS)
+                .required(KafkaSourceOptions.BOOTSTRAP_SERVERS)
                 .exclusive(
-                        Config.TOPIC, TableSchemaOptions.TABLE_CONFIGS, CatalogOptions.TABLE_LIST)
+                        KafkaSourceOptions.TOPIC,
+                        TableSchemaOptions.TABLE_CONFIGS,
+                        CatalogOptions.TABLE_LIST)
                 .optional(
-                        Config.START_MODE,
-                        Config.PATTERN,
-                        Config.CONSUMER_GROUP,
-                        Config.COMMIT_ON_CHECKPOINT,
-                        Config.KAFKA_CONFIG,
-                        Config.SCHEMA,
-                        Config.FORMAT,
-                        Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
-                        Config.DEBEZIUM_RECORD_TABLE_FILTER,
-                        Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
-                .conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP)
+                        KafkaSourceOptions.START_MODE,
+                        KafkaSourceOptions.PATTERN,
+                        KafkaSourceOptions.CONSUMER_GROUP,
+                        KafkaSourceOptions.COMMIT_ON_CHECKPOINT,
+                        KafkaSourceOptions.KAFKA_CONFIG,
+                        KafkaSourceOptions.SCHEMA,
+                        KafkaSourceOptions.FORMAT,
+                        KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
+                        KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER,
+                        KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
                 .conditional(
-                        Config.START_MODE, StartMode.SPECIFIC_OFFSETS, Config.START_MODE_OFFSETS)
+                        KafkaSourceOptions.START_MODE,
+                        StartMode.TIMESTAMP,
+                        KafkaSourceOptions.START_MODE_TIMESTAMP)
+                .conditional(
+                        KafkaSourceOptions.START_MODE,
+                        StartMode.SPECIFIC_OFFSETS,
+                        KafkaSourceOptions.START_MODE_OFFSETS)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
index d7daef4c50..cc96892d24 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.DATABASE_NAME;
 import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.SCHEMA_NAME;
 import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.TABLE_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_TABLE_FILTER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
 
 public class KafkaSourceConfigTest {
 


Reply via email to