This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aadfe99f88 [improve] kafka connector options (#8616)
aadfe99f88 is described below
commit aadfe99f88434829c68eeeadce53b1275318c179
Author: Daeuk Choi <[email protected]>
AuthorDate: Fri Feb 7 10:23:03 2025 +0900
[improve] kafka connector options (#8616)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../seatunnel/kafka/config/KafkaBaseOptions.java | 79 +++++++++++++++++
.../seatunnel/kafka/config/KafkaSinkOptions.java | 64 ++++++++++++++
.../{Config.java => KafkaSourceOptions.java} | 98 +---------------------
.../serialize/DefaultSeaTunnelRowSerializer.java | 4 +-
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 3 +-
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 18 ++--
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 22 ++---
.../seatunnel/kafka/source/KafkaSource.java | 3 +-
.../seatunnel/kafka/source/KafkaSourceConfig.java | 36 ++++----
.../seatunnel/kafka/source/KafkaSourceFactory.java | 37 ++++----
.../kafka/source/KafkaSourceConfigTest.java | 2 +-
12 files changed, 213 insertions(+), 155 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 7c77d95e2b..1e0464952b 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -209,7 +209,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("RocketMqSinkOptions");
whiteList.add("ClickhouseFileSinkOptions");
whiteList.add("IcebergSinkOptions");
- whiteList.add("KafkaSourceOptions");
whiteList.add("AssertSinkOptions");
whiteList.add("MaxcomputeSourceOptions");
whiteList.add("InfluxDBSourceOptions");
@@ -230,7 +229,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("TableStoreDBSourceOptions");
whiteList.add("AmazonDynamoDBSinkOptions");
whiteList.add("KuduSinkOptions");
- whiteList.add("KafkaSinkOptions");
whiteList.add("TDengineSinkOptions");
whiteList.add("Neo4jSourceOptions");
whiteList.add("HttpSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseOptions.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseOptions.java
new file mode 100644
index 0000000000..eb37e0f5c4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseOptions.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Map;
+
+public class KafkaBaseOptions {
+
+ public static final String CONNECTOR_IDENTITY = "Kafka";
+ /** The default field delimiter is “,” */
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+ public static final Option<Map<String, String>> KAFKA_CONFIG =
+ Options.key("kafka.config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "In addition to the above parameters that must be
specified by the Kafka producer or consumer client, "
+ + "the user can also specify multiple
non-mandatory parameters for the producer or consumer client, "
+ + "covering all the producer parameters
specified in the official Kafka document.");
+
+ public static final Option<String> TOPIC =
+ Options.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Kafka topic name. If there are multiple topics,
use , to split, for example: \"tpc1,tpc2\".");
+
+ public static final Option<String> BOOTSTRAP_SERVERS =
+ Options.key("bootstrap.servers")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kafka cluster address, separated by
\",\".");
+
+ public static final Option<MessageFormat> FORMAT =
+ Options.key("format")
+ .enumType(MessageFormat.class)
+ .defaultValue(MessageFormat.JSON)
+ .withDescription(
+ "Data format. The default format is json. Optional
text format. The default field separator is \", \". "
+ + "If you customize the delimiter, add the
\"field_delimiter\" option.");
+
+ public static final Option<String> FIELD_DELIMITER =
+ Options.key("field_delimiter")
+ .stringType()
+ .defaultValue(DEFAULT_FIELD_DELIMITER)
+ .withDescription("Customize the field delimiter for data
format.");
+
+ public static final Option<String> PROTOBUF_SCHEMA =
+ Options.key("protobuf_schema")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Data serialization method protobuf metadata, used
to parse protobuf data.");
+
+ public static final Option<String> PROTOBUF_MESSAGE_NAME =
+ Options.key("protobuf_message_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Parsing entity class names from Protobuf
data.");
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
new file mode 100644
index 0000000000..ca25d2144b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class KafkaSinkOptions extends KafkaBaseOptions {
+
+ public static final Option<Integer> PARTITION =
+ Options.key("partition")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "We can specify the partition, all messages will
be sent to this partition.");
+
+ public static final Option<List<String>> ASSIGN_PARTITIONS =
+ Options.key("assign_partitions")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "We can decide which partition to send based on
the content of the message. "
+ + "The function of this parameter is to
distribute information.");
+
+ public static final Option<List<String>> PARTITION_KEY_FIELDS =
+ Options.key("partition_key_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "Configure which fields are used as the key of the
kafka message.");
+
+ public static final Option<KafkaSemantics> SEMANTICS =
+ Options.key("semantics")
+ .enumType(KafkaSemantics.class)
+ .defaultValue(KafkaSemantics.NON)
+ .withDescription(
+ "Semantics that can be chosen
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
+
+ public static final Option<String> TRANSACTION_PREFIX =
+ Options.key("transaction_prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "If semantic is specified as EXACTLY_ONCE, the
producer will write all messages in a Kafka transaction. "
+ + "Kafka distinguishes different
transactions by different transactionId. "
+ + "This parameter is prefix of kafka
transactionId, make sure different job use different prefix.");
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
similarity index 53%
rename from
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
rename to
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 931eea8a70..44c7f0d23a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -23,30 +23,9 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import java.util.List;
import java.util.Map;
-public class Config {
-
- public static final String CONNECTOR_IDENTITY = "Kafka";
- /** The default field delimiter is “,” */
- public static final String DEFAULT_FIELD_DELIMITER = ",";
-
- public static final Option<Map<String, String>> KAFKA_CONFIG =
- Options.key("kafka.config")
- .mapType()
- .noDefaultValue()
- .withDescription(
- "In addition to the above parameters that must be
specified by the Kafka producer or consumer client, "
- + "the user can also specify multiple
non-mandatory parameters for the producer or consumer client, "
- + "covering all the producer parameters
specified in the official Kafka document.");
-
- public static final Option<String> TOPIC =
- Options.key("topic")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Kafka topic name. If there are multiple topics,
use , to split, for example: \"tpc1,tpc2\".");
+public class KafkaSourceOptions extends KafkaBaseOptions {
public static final Option<Boolean> PATTERN =
Options.key("pattern")
@@ -56,12 +35,6 @@ public class Config {
                        "If pattern is set to true, the regular expression
for a pattern of topic names to read from."
+ " All topics in clients with names that
match the specified regular expression will be subscribed by the consumer.");
- public static final Option<String> BOOTSTRAP_SERVERS =
- Options.key("bootstrap.servers")
- .stringType()
- .noDefaultValue()
- .withDescription("Kafka cluster address, separated by
\",\".");
-
public static final Option<String> CONSUMER_GROUP =
Options.key("consumer.group")
.stringType()
@@ -76,30 +49,13 @@ public class Config {
.withDescription(
"If true the consumer's offset will be
periodically committed in the background.");
- public static final Option<String> TRANSACTION_PREFIX =
- Options.key("transaction_prefix")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "If semantic is specified as EXACTLY_ONCE, the
producer will write all messages in a Kafka transaction. "
- + "Kafka distinguishes different
transactions by different transactionId. "
- + "This parameter is prefix of kafka
transactionId, make sure different job use different prefix.");
-
- public static final Option<Config> SCHEMA =
+ public static final Option<KafkaBaseOptions> SCHEMA =
Options.key("schema")
- .objectType(Config.class)
+ .objectType(KafkaBaseOptions.class)
.noDefaultValue()
.withDescription(
"The structure of the data, including field names
and field types.");
- public static final Option<MessageFormat> FORMAT =
- Options.key("format")
- .enumType(MessageFormat.class)
- .defaultValue(MessageFormat.JSON)
- .withDescription(
- "Data format. The default format is json. Optional
text format. The default field separator is \", \". "
- + "If you customize the delimiter, add the
\"field_delimiter\" option.");
-
public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
Options.key("debezium_record_include_schema")
.booleanType()
@@ -112,34 +68,6 @@ public class Config {
.noDefaultValue()
.withDescription("Debezium record table filter.");
- public static final Option<String> FIELD_DELIMITER =
- Options.key("field_delimiter")
- .stringType()
- .defaultValue(DEFAULT_FIELD_DELIMITER)
- .withDescription("Customize the field delimiter for data
format.");
-
- public static final Option<Integer> PARTITION =
- Options.key("partition")
- .intType()
- .noDefaultValue()
- .withDescription(
- "We can specify the partition, all messages will
be sent to this partition.");
-
- public static final Option<List<String>> ASSIGN_PARTITIONS =
- Options.key("assign_partitions")
- .listType()
- .noDefaultValue()
- .withDescription(
- "We can decide which partition to send based on
the content of the message. "
- + "The function of this parameter is to
distribute information.");
-
- public static final Option<List<String>> PARTITION_KEY_FIELDS =
- Options.key("partition_key_fields")
- .listType()
- .noDefaultValue()
- .withDescription(
- "Configure which fields are used as the key of the
kafka message.");
-
public static final Option<StartMode> START_MODE =
Options.key("start_mode")
.objectType(StartMode.class)
@@ -183,24 +111,4 @@ public class Config {
"The processing method of data format error. The
default value is fail, and the optional value is (fail, skip). "
+ "When fail is selected, data format
error will block and an exception will be thrown. "
+ "When skip is selected, data format
error will skip this line data.");
-
- public static final Option<KafkaSemantics> SEMANTICS =
- Options.key("semantics")
- .enumType(KafkaSemantics.class)
- .defaultValue(KafkaSemantics.NON)
- .withDescription(
- "Semantics that can be chosen
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
-
- public static final Option<String> PROTOBUF_SCHEMA =
- Options.key("protobuf_schema")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Data serialization method protobuf metadata, used
to parse protobuf data.");
-
- public static final Option<String> PROTOBUF_MESSAGE_NAME =
- Options.key("protobuf_message_name")
- .stringType()
- .noDefaultValue()
- .withDescription("Parsing entity class names from Protobuf
data.");
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 2f6559a169..1415db7459 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -48,8 +48,8 @@ import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_MESSAGE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions.PROTOBUF_MESSAGE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions.PROTOBUF_SCHEMA;
@RequiredArgsConstructor
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 4deb30f547..919a602d32 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -82,7 +83,7 @@ public class KafkaSink
@Override
public String getPluginName() {
- return
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+ return KafkaBaseOptions.CONNECTOR_IDENTITY;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index ed3278602a..ae236c3634 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions;
import com.google.auto.service.AutoService;
@@ -36,15 +36,15 @@ public class KafkaSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
+ .required(KafkaSinkOptions.TOPIC,
KafkaSinkOptions.BOOTSTRAP_SERVERS)
.optional(
- Config.FORMAT,
- Config.KAFKA_CONFIG,
- Config.ASSIGN_PARTITIONS,
- Config.TRANSACTION_PREFIX,
- Config.SEMANTICS,
- Config.PARTITION,
- Config.PARTITION_KEY_FIELDS)
+ KafkaSinkOptions.FORMAT,
+ KafkaSinkOptions.KAFKA_CONFIG,
+ KafkaSinkOptions.ASSIGN_PARTITIONS,
+ KafkaSinkOptions.TRANSACTION_PREFIX,
+ KafkaSinkOptions.SEMANTICS,
+ KafkaSinkOptions.PARTITION,
+ KafkaSinkOptions.PARTITION_KEY_FIELDS)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6639a34a0b..b367587e17 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -43,17 +43,17 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Random;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SEMANTICS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.ASSIGN_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.DEFAULT_FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.KAFKA_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION_KEY_FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.SEMANTICS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.TOPIC;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.TRANSACTION_PREFIX;
/** KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to
Kafka. */
public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow,
KafkaCommitInfo, KafkaSinkState> {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 271adb8e7f..d57bf03e64 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -64,7 +65,7 @@ public class KafkaSource
@Override
public String getPluginName() {
- return
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+ return KafkaBaseOptions.CONNECTOR_IDENTITY;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index db591cfdf0..508ba7061a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -64,24 +64,24 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_TABLE_FILTER;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_POLL_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_MESSAGE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_SCHEMA;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.COMMIT_ON_CHECKPOINT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.CONSUMER_GROUP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KAFKA_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_POLL_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PATTERN;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_MESSAGE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC;
public class KafkaSourceConfig implements Serializable {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 0b24e7e968..e1bbf1dafc 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import com.google.auto.service.AutoService;
@@ -44,23 +44,30 @@ public class KafkaSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(Config.BOOTSTRAP_SERVERS)
+ .required(KafkaSourceOptions.BOOTSTRAP_SERVERS)
.exclusive(
- Config.TOPIC, TableSchemaOptions.TABLE_CONFIGS,
CatalogOptions.TABLE_LIST)
+ KafkaSourceOptions.TOPIC,
+ TableSchemaOptions.TABLE_CONFIGS,
+ CatalogOptions.TABLE_LIST)
.optional(
- Config.START_MODE,
- Config.PATTERN,
- Config.CONSUMER_GROUP,
- Config.COMMIT_ON_CHECKPOINT,
- Config.KAFKA_CONFIG,
- Config.SCHEMA,
- Config.FORMAT,
- Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
- Config.DEBEZIUM_RECORD_TABLE_FILTER,
- Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
- .conditional(Config.START_MODE, StartMode.TIMESTAMP,
Config.START_MODE_TIMESTAMP)
+ KafkaSourceOptions.START_MODE,
+ KafkaSourceOptions.PATTERN,
+ KafkaSourceOptions.CONSUMER_GROUP,
+ KafkaSourceOptions.COMMIT_ON_CHECKPOINT,
+ KafkaSourceOptions.KAFKA_CONFIG,
+ KafkaSourceOptions.SCHEMA,
+ KafkaSourceOptions.FORMAT,
+ KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
+ KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER,
+
KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(
- Config.START_MODE, StartMode.SPECIFIC_OFFSETS,
Config.START_MODE_OFFSETS)
+ KafkaSourceOptions.START_MODE,
+ StartMode.TIMESTAMP,
+ KafkaSourceOptions.START_MODE_TIMESTAMP)
+ .conditional(
+ KafkaSourceOptions.START_MODE,
+ StartMode.SPECIFIC_OFFSETS,
+ KafkaSourceOptions.START_MODE_OFFSETS)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
index d7daef4c50..cc96892d24 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
@@ -32,7 +32,7 @@ import java.util.Map;
import static
org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.DATABASE_NAME;
import static
org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.SCHEMA_NAME;
import static
org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.TABLE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_TABLE_FILTER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
public class KafkaSourceConfigTest {