This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 26a2160c80 [improve] pulsar options (#9180)
26a2160c80 is described below

commit 26a2160c808af5d65554dfdf42772c0fb3d58194
Author: Jarvis <[email protected]>
AuthorDate: Sat Jun 28 22:57:59 2025 +0800

    [improve] pulsar options (#9180)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   2 -
 .../apache/seatunnel/common/PropertiesUtil.java    |  88 --------
 .../seatunnel/pulsar/config/PulsarBaseOptions.java |  82 +++++++
 .../seatunnel/pulsar/config/PulsarConfigUtil.java  |   8 +-
 ...{SinkProperties.java => PulsarSinkOptions.java} |  47 +---
 ...rceProperties.java => PulsarSourceOptions.java} | 113 ++--------
 .../seatunnel/pulsar/sink/PulsarSink.java          |  16 +-
 .../seatunnel/pulsar/sink/PulsarSinkFactory.java   |  33 +--
 .../seatunnel/pulsar/sink/PulsarSinkWriter.java    |  39 ++--
 .../seatunnel/pulsar/source/PulsarSource.java      | 239 +++++++--------------
 .../pulsar/source/PulsarSourceFactory.java         |  88 ++++----
 .../enumerator/cursor/start/StartCursor.java       |   5 +-
 .../cursor/start/SubscriptionStartCursor.java      |  14 +-
 .../pulsar/source/PulsarSourceFactoryTest.java     |   4 +-
 14 files changed, 286 insertions(+), 492 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 47cbd7decd..4e5250bfb6 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -192,8 +192,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("JdbcSinkOptions");
         whiteList.add("TypesenseSourceOptions");
         whiteList.add("TypesenseSinkOptions");
-        whiteList.add("PulsarSinkOptions");
-        whiteList.add("PulsarSourceOptions");
         whiteList.add("MongodbSinkOptions");
         whiteList.add("SelectDBSinkOptions");
         whiteList.add("TablestoreSinkOptions");
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
deleted file mode 100644
index 4a3f2b6b35..0000000000
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.common;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-public final class PropertiesUtil {
-
-    private PropertiesUtil() {}
-
-    public static void setProperties(
-            Config config, Properties properties, String prefix, boolean 
keepPrefix) {
-        config.entrySet()
-                .forEach(
-                        entry -> {
-                            String key = entry.getKey();
-                            Object value = entry.getValue().unwrapped();
-                            if (key.startsWith(prefix)) {
-                                if (keepPrefix) {
-                                    properties.put(key, value);
-                                } else {
-                                    
properties.put(key.substring(prefix.length()), value);
-                                }
-                            }
-                        });
-    }
-
-    public static <E extends Enum<E>> E getEnum(
-            final Config conf, final String key, final Class<E> enumClass, 
final E defaultEnum) {
-        if (!conf.hasPath(key)) {
-            return defaultEnum;
-        }
-        final String value = conf.getString(key);
-        if (StringUtils.isBlank(value)) {
-            return defaultEnum;
-        }
-        return Enum.valueOf(enumClass, value.toUpperCase());
-    }
-
-    public static <T> void setOption(
-            Config config,
-            String optionName,
-            T defaultValue,
-            Function<String, T> getter,
-            Consumer<T> setter) {
-        T value;
-        if (config.hasPath(optionName)) {
-            value = getter.apply(optionName);
-        } else {
-            value = defaultValue;
-        }
-        if (value != null) {
-            setter.accept(value);
-        }
-    }
-
-    public static <T> void setOption(
-            Config config, String optionName, Function<String, T> getter, 
Consumer<T> setter) {
-        T value = null;
-        if (config.hasPath(optionName)) {
-            value = getter.apply(optionName);
-        }
-        if (value != null) {
-            setter.accept(value);
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarBaseOptions.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarBaseOptions.java
new file mode 100644
index 0000000000..cdeb8d52da
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarBaseOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+
+public class PulsarBaseOptions extends ConnectorCommonOptions {
+
+    public static final String IDENTIFIER = "Pulsar";
+
+    public static final Option<String> TOPIC =
+            Options.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("pulsar topic name.");
+
+    public static final Option<String> CLIENT_SERVICE_URL =
+            Options.key("client.service-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Service URL provider for Pulsar 
service");
+
+    public static final Option<String> ADMIN_SERVICE_URL =
+            Options.key("admin.service-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Pulsar service HTTP URL for the admin 
endpoint. For example, http://my-broker.example.com:8080, or 
https://my-broker.example.com:8443 for TLS.");
+
+    public static final Option<String> AUTH_PLUGIN_CLASS =
+            Options.key("auth.plugin-class")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Name of the authentication plugin");
+
+    public static final Option<String> AUTH_PARAMS =
+            Options.key("auth.params")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Parameters for the authentication plugin. For 
example, key1:val1,key2:val2");
+
+    /** The default data format is JSON */
+    public static final String DEFAULT_FORMAT = "json";
+
+    public static final String TEXT_FORMAT = "text";
+
+    /** The default field delimiter is “,” */
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .defaultValue(DEFAULT_FORMAT)
+                    .withDescription(
+                            "Data format. The default format is json. Optional 
text format. The default field separator is \", \". "
+                                    + "If you customize the delimiter, add the 
\"field_delimiter\" option.");
+
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .defaultValue(DEFAULT_FIELD_DELIMITER)
+                    .withDescription(
+                            "Customize the field delimiter for data format.The 
default field_delimiter is ',' ");
+}
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
index 1b73085ec6..667bfab4bf 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
@@ -48,12 +48,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PULSAR_CONFIG;
-
 public class PulsarConfigUtil {
 
-    public static final String IDENTIFIER = "Pulsar";
-
     private PulsarConfigUtil() {}
 
     public static PulsarAdmin createAdmin(PulsarAdminConfig config) {
@@ -172,10 +168,10 @@ public class PulsarConfigUtil {
         producerBuilder.messageRoutingMode(messageRoutingMode);
         producerBuilder.blockIfQueueFull(true);
 
-        if (pluginConfig.get(PULSAR_CONFIG) != null) {
+        if (pluginConfig.get(PulsarSinkOptions.PULSAR_CONFIG) != null) {
             Map<String, String> pulsarProperties = new HashMap<>();
             pluginConfig
-                    .get(PULSAR_CONFIG)
+                    .get(PulsarSinkOptions.PULSAR_CONFIG)
                     .forEach((key, value) -> pulsarProperties.put(key, value));
             producerBuilder.properties(pulsarProperties);
         }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSinkOptions.java
similarity index 75%
rename from 
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java
rename to 
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSinkOptions.java
index ccf59e87c3..240ec2f2ba 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSinkOptions.java
@@ -25,35 +25,19 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import java.util.List;
 import java.util.Map;
 
-public class SinkProperties {
+public class PulsarSinkOptions extends PulsarBaseOptions {
 
-    /** The default data format is JSON */
-    public static final String DEFAULT_FORMAT = "json";
-
-    public static final String TEXT_FORMAT = "text";
-
-    /** The default field delimiter is “,” */
-    public static final String DEFAULT_FIELD_DELIMITER = ",";
-
-    public static final Option<String> FORMAT =
-            Options.key("format")
-                    .stringType()
-                    .defaultValue(DEFAULT_FORMAT)
+    public static final Option<MessageRoutingMode> MESSAGE_ROUTING_MODE =
+            Options.key("message.routing.mode")
+                    .enumType(MessageRoutingMode.class)
+                    .defaultValue(MessageRoutingMode.RoundRobinPartition)
                     .withDescription(
-                            "Data format. The default format is json. Optional 
text format. The default field separator is \", \". "
-                                    + "If you customize the delimiter, add the 
\"field_delimiter\" option.");
+                            "Default routing mode for messages to partition. "
+                                    + "If you choose SinglePartition,If no key 
is provided, The partitioned producer will randomly pick one single partition 
and publish all the messages into that partition. "
+                                    + " If a key is provided on the message, 
the partitioned producer will hash the key and assign message to a particular 
partition."
+                                    + " If you choose RoundRobinPartition,If 
no key is provided, the producer will publish messages across all partitions in 
round-robin fashion to achieve maximum throughput. "
+                                    + "Please note that round-robin is not 
done per individual message but rather it's set to the same boundary of 
batching delay, to ensure batching is effective.");
 
-    public static final Option<String> FIELD_DELIMITER =
-            Options.key("field_delimiter")
-                    .stringType()
-                    .defaultValue(DEFAULT_FIELD_DELIMITER)
-                    .withDescription(
-                            "Customize the field delimiter for data format.The 
default field_delimiter is ',' ");
-    public static final Option<String> TOPIC =
-            Options.key("topic")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("sink pulsar topic name.");
     public static final Option<PulsarSemantics> SEMANTICS =
             Options.key("semantics")
                     .enumType(PulsarSemantics.class)
@@ -77,17 +61,6 @@ public class SinkProperties {
                                     + "the user can also specify multiple 
non-mandatory parameters for the producer or consumer client, "
                                     + "covering all the producer parameters 
specified in the official Pulsar document.");
 
-    public static final Option<MessageRoutingMode> MESSAGE_ROUTING_MODE =
-            Options.key("message.routing.mode")
-                    .enumType(MessageRoutingMode.class)
-                    .defaultValue(MessageRoutingMode.RoundRobinPartition)
-                    .withDescription(
-                            "Default routing mode for messages to partition. "
-                                    + "If you choose SinglePartition,If no key 
is provided, The partitioned producer will randomly pick one single partition 
and publish all the messages into that partition. "
-                                    + " If a key is provided on the message, 
the partitioned producer will hash the key and assign message to a particular 
partition."
-                                    + " If you choose RoundRobinPartition,If 
no key is provided, the producer will publish messages across all partitions in 
round-robin fashion to achieve maximum throughput. "
-                                    + "Please note that round-robin is not 
done per individual message but rather it's set to the same boundary of 
batching delay, to ensure batching is effective.");
-
     public static final Option<List<String>> PARTITION_KEY_FIELDS =
             Options.key("partition_key_fields")
                     .listType()
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSourceOptions.java
similarity index 59%
rename from 
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
rename to 
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSourceOptions.java
index ffa19a9a2c..d828bb1c70 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSourceOptions.java
@@ -17,57 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class SourceProperties {
+public class PulsarSourceOptions extends PulsarBaseOptions {
 
     private static final Long DEFAULT_TOPIC_DISCOVERY_INTERVAL = -1L;
     private static final Integer DEFAULT_POLL_TIMEOUT = 100;
     private static final Long DEFAULT_POLL_INTERVAL = 50L;
     private static final Integer DEFAULT_POLL_BATCH_SIZE = 500;
 
-    // 
--------------------------------------------------------------------------------------------
-    // The configuration for ClientConfigurationData part.
-    // 
--------------------------------------------------------------------------------------------
-
-    public static final Option<String> CLIENT_SERVICE_URL =
-            Options.key("client.service-url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Service URL provider for Pulsar 
service");
-
-    public static final Option<String> AUTH_PLUGIN_CLASS =
-            Options.key("auth.plugin-class")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Name of the authentication plugin");
-
-    public static final Option<String> AUTH_PARAMS =
-            Options.key("auth.params")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Parameters for the authentication plugin. For 
example, key1:val1,key2:val2");
-
-    // 
--------------------------------------------------------------------------------------------
-    // The configuration for ClientConfigurationData part.
-    // All the configuration listed below should have the pulsar.client prefix.
-    // 
--------------------------------------------------------------------------------------------
-
-    public static final Option<String> ADMIN_SERVICE_URL =
-            Options.key("admin.service-url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The Pulsar service HTTP URL for the admin 
endpoint. For example, http://my-broker.example.com:8080, or 
https://my-broker.example.com:8443 for TLS.");
-
-    // 
--------------------------------------------------------------------------------------------
-    // The configuration for ConsumerConfigurationData part.
-    // 
--------------------------------------------------------------------------------------------
-
     public static final Option<String> SUBSCRIPTION_NAME =
             Options.key("subscription.name")
                     .stringType()
@@ -75,30 +34,6 @@ public class SourceProperties {
                     .withDescription(
                             "Specify the subscription name for this consumer. 
This argument is required when constructing the consumer.");
 
-    // No use parameter
-    public static final String SUBSCRIPTION_TYPE = "subscription.type";
-    public static final String SUBSCRIPTION_MODE = "subscription.mode";
-
-    // 
--------------------------------------------------------------------------------------------
-    // The configuration for pulsar source part.
-    // 
--------------------------------------------------------------------------------------------
-
-    public static final Option<Long> TOPIC_DISCOVERY_INTERVAL =
-            Options.key("topic-discovery.interval")
-                    .longType()
-                    .defaultValue(DEFAULT_TOPIC_DISCOVERY_INTERVAL)
-                    .withDescription(
-                            "Default value is "
-                                    + DEFAULT_TOPIC_DISCOVERY_INTERVAL
-                                    + ". The interval (in ms) for the Pulsar 
source to discover the new topic partitions. A non-positive value disables the 
topic partition discovery. Note, This option only works if the 'topic-pattern' 
option is used.");
-
-    public static final Option<String> TOPIC =
-            Options.key("topic")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Topic name(s) to read data from when the table is 
used as source. It also supports topic list for source by separating topic by 
semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and 
\"topic\" can be specified for sources.");
-
     public static final Option<String> TOPIC_PATTERN =
             Options.key("topic-pattern")
                     .stringType()
@@ -133,16 +68,16 @@ public class SourceProperties {
                                     + DEFAULT_POLL_BATCH_SIZE
                                     + ". The maximum number of records to 
fetch to wait when polling. A longer time increases throughput but also 
latency");
 
-    public static final Option<SourceProperties.StartMode> CURSOR_STARTUP_MODE 
=
+    public static final Option<StartMode> CURSOR_STARTUP_MODE =
             Options.key("cursor.startup.mode")
-                    .enumType(SourceProperties.StartMode.class)
+                    .enumType(StartMode.class)
                     .defaultValue(StartMode.LATEST)
                     .withDescription(
                             "Startup mode for Pulsar consumer, valid values 
are 'EARLIEST', 'LATEST', 'SUBSCRIPTION', 'TIMESTAMP'.");
 
-    public static final Option<SourceProperties.StartMode> CURSOR_RESET_MODE =
+    public static final Option<CursorResetStrategy> CURSOR_RESET_MODE =
             Options.key("cursor.reset.mode")
-                    .enumType(SourceProperties.StartMode.class)
+                    .enumType(CursorResetStrategy.class)
                     .noDefaultValue()
                     .withDescription(
                             "Cursor reset strategy for Pulsar consumer valid 
values are 'EARLIEST', 'LATEST'. Note, This option only works if the 
\"cursor.startup.mode\" option used 'SUBSCRIPTION'.");
@@ -154,12 +89,9 @@ public class SourceProperties {
                     .withDescription(
                             "Start from the specified epoch timestamp (in 
milliseconds). Note, This option is required when the \"cursor.startup.mode\" 
option used 'TIMESTAMP'.");
 
-    // No use parameter
-    public static final String CURSOR_STARTUP_ID = "cursor.startup.id";
-
-    public static final Option<SourceProperties.StopMode> CURSOR_STOP_MODE =
+    public static final Option<StopMode> CURSOR_STOP_MODE =
             Options.key("cursor.stop.mode")
-                    .enumType(SourceProperties.StopMode.class)
+                    .enumType(StopMode.class)
                     .defaultValue(StopMode.NEVER)
                     .withDescription(
                             "Stop mode for Pulsar consumer, valid values are 
'NEVER', 'LATEST' and 'TIMESTAMP'. Note, When 'NEVER' is specified, it is a 
real-time job, and other mode are off-line jobs.");
@@ -170,26 +102,14 @@ public class SourceProperties {
                     .noDefaultValue()
                     .withDescription("Stop from the specified epoch timestamp 
(in milliseconds)");
 
-    public static final Option<Config> SCHEMA =
-            Options.key("schema")
-                    .objectType(Config.class)
-                    .noDefaultValue()
-                    .withDescription(
-                            "The structure of the data, including field names 
and field types.");
-
-    public static final Option<String> FORMAT =
-            Options.key("format")
-                    .stringType()
-                    .defaultValue("JSON")
+    public static final Option<Long> TOPIC_DISCOVERY_INTERVAL =
+            Options.key("topic-discovery.interval")
+                    .longType()
+                    .defaultValue(DEFAULT_TOPIC_DISCOVERY_INTERVAL)
                     .withDescription(
-                            "Data format. The default format is json. Optional 
text format. The default field separator is \", \". "
-                                    + "If you customize the delimiter, add the 
\"field_delimiter\" option.");
-
-    public static final Option<String> FIELD_DELIMITER =
-            Options.key("field_delimiter")
-                    .stringType()
-                    .defaultValue(",")
-                    .withDescription("Customize the field delimiter for data 
format.");
+                            "Default value is "
+                                    + DEFAULT_TOPIC_DISCOVERY_INTERVAL
+                                    + ". The interval (in ms) for the Pulsar 
source to discover the new topic partitions. A non-positive value disables the 
topic partition discovery. Note, This option only works if the 'topic-pattern' 
option is used.");
 
     /** Startup mode for the pulsar consumer, see {@link 
#CURSOR_STARTUP_MODE}. */
     public enum StartMode {
@@ -215,4 +135,9 @@ public class SourceProperties {
         SPECIFIC,
         NEVER
     }
+
+    public enum CursorResetStrategy {
+        LATEST,
+        EARLIEST
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
index 989e24b024..ef5aeb0c57 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
@@ -36,10 +36,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-
 /**
  * Pulsar Sink implementation by using SeaTunnel sink API. This class contains 
the method to create
  * {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
@@ -60,9 +56,11 @@ public class PulsarSink
 
         /** client config */
         PulsarClientConfig.Builder clientConfigBuilder =
-                
PulsarClientConfig.builder().serviceUrl(readonlyConfig.get(CLIENT_SERVICE_URL));
-        
clientConfigBuilder.authPluginClassName(readonlyConfig.get(AUTH_PLUGIN_CLASS));
-        clientConfigBuilder.authParams(readonlyConfig.get(AUTH_PARAMS));
+                PulsarClientConfig.builder()
+                        
.serviceUrl(readonlyConfig.get(PulsarSinkOptions.CLIENT_SERVICE_URL));
+        clientConfigBuilder.authPluginClassName(
+                readonlyConfig.get(PulsarSinkOptions.AUTH_PLUGIN_CLASS));
+        
clientConfigBuilder.authParams(readonlyConfig.get(PulsarSinkOptions.AUTH_PARAMS));
         this.clientConfig = clientConfigBuilder.build();
     }
 
@@ -97,7 +95,7 @@ public class PulsarSink
 
     @Override
     public String getPluginName() {
-        return PulsarConfigUtil.IDENTIFIER;
+        return PulsarSinkOptions.IDENTIFIER;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
index 7781ba7b94..6317c55f76 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
@@ -22,32 +22,37 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-
 @AutoService(Factory.class)
 public class PulsarSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return PulsarConfigUtil.IDENTIFIER;
+        return PulsarSinkOptions.IDENTIFIER;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL, TOPIC)
-                .optional(FORMAT, FIELD_DELIMITER, MESSAGE_ROUTING_MODE)
-                .bundled(AUTH_PLUGIN_CLASS, AUTH_PARAMS)
+                .required(
+                        PulsarSinkOptions.CLIENT_SERVICE_URL,
+                        PulsarSinkOptions.ADMIN_SERVICE_URL,
+                        PulsarSinkOptions.TOPIC)
+                .optional(
+                        PulsarSinkOptions.FORMAT,
+                        PulsarSinkOptions.FIELD_DELIMITER,
+                        PulsarSinkOptions.MESSAGE_ROUTING_MODE,
+                        PulsarSinkOptions.SEMANTICS,
+                        PulsarSinkOptions.TRANSACTION_TIMEOUT,
+                        PulsarSinkOptions.PULSAR_CONFIG,
+                        PulsarSinkOptions.PARTITION_KEY_FIELDS)
+                .conditional(
+                        PulsarSinkOptions.FORMAT,
+                        PulsarSinkOptions.TEXT_FORMAT,
+                        PulsarSinkOptions.FIELD_DELIMITER)
+                .bundled(PulsarSinkOptions.AUTH_PLUGIN_CLASS, 
PulsarSinkOptions.AUTH_PARAMS)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
index 5a5c28fb53..26f35fab99 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
@@ -55,27 +56,16 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
-
 public class PulsarSinkWriter
         implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> 
{
 
-    private Context context;
     private Producer<byte[]> producer;
     private PulsarClient pulsarClient;
     private SerializationSchema serializationSchema;
     private SerializationSchema keySerializationSchema;
     private TransactionImpl transaction;
-    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
-    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private int transactionTimeout;
+    private PulsarSemantics pulsarSemantics;
     private final AtomicLong pendingMessages;
 
     public PulsarSinkWriter(
@@ -84,13 +74,13 @@ public class PulsarSinkWriter
             SeaTunnelRowType seaTunnelRowType,
             ReadonlyConfig pluginConfig,
             List<PulsarSinkState> pulsarStates) {
-        this.context = context;
-        String topic = pluginConfig.get(TOPIC);
-        String format = pluginConfig.get(FORMAT);
-        String delimiter = pluginConfig.get(FIELD_DELIMITER);
-        Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
-        PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS);
-        MessageRoutingMode messageRoutingMode = 
pluginConfig.get(MESSAGE_ROUTING_MODE);
+        String topic = pluginConfig.get(PulsarSinkOptions.TOPIC);
+        String format = pluginConfig.get(PulsarSinkOptions.FORMAT);
+        String delimiter = pluginConfig.get(PulsarSinkOptions.FIELD_DELIMITER);
+        this.transactionTimeout = 
pluginConfig.get(PulsarSinkOptions.TRANSACTION_TIMEOUT);
+        this.pulsarSemantics = pluginConfig.get(PulsarSinkOptions.SEMANTICS);
+        MessageRoutingMode messageRoutingMode =
+                pluginConfig.get(PulsarSinkOptions.MESSAGE_ROUTING_MODE);
         this.serializationSchema = createSerializationSchema(seaTunnelRowType, 
format, delimiter);
         List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, 
seaTunnelRowType);
         this.keySerializationSchema =
@@ -201,9 +191,9 @@ public class PulsarSinkWriter
 
     private SerializationSchema createSerializationSchema(
             SeaTunnelRowType rowType, String format, String delimiter) {
-        if (DEFAULT_FORMAT.equals(format)) {
+        if (PulsarSinkOptions.DEFAULT_FORMAT.equals(format)) {
             return new JsonSerializationSchema(rowType);
-        } else if (TEXT_FORMAT.equals(format)) {
+        } else if (PulsarSinkOptions.TEXT_FORMAT.equals(format)) {
             return TextSerializationSchema.builder()
                     .seaTunnelRowType(rowType)
                     .delimiter(delimiter)
@@ -244,8 +234,9 @@ public class PulsarSinkWriter
 
     private List<String> getPartitionKeyFields(
             ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
-            List<String> partitionKeyFields = 
pluginConfig.get(PARTITION_KEY_FIELDS);
+        if 
(pluginConfig.getOptional(PulsarSinkOptions.PARTITION_KEY_FIELDS).isPresent()) {
+            List<String> partitionKeyFields =
+                    pluginConfig.get(PulsarSinkOptions.PARTITION_KEY_FIELDS);
             List<String> rowTypeFieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
             for (String partitionKeyField : partitionKeyFields) {
                 if (!rowTypeFieldNames.contains(partitionKeyField)) {
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index b998d85762..b82ab2f55a 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -17,10 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -28,23 +26,16 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
@@ -59,41 +50,20 @@ import 
org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
 
-import com.google.auto.service.AutoService;
-
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.regex.Pattern;
 
-import static org.apache.seatunnel.common.PropertiesUtil.getEnum;
-import static org.apache.seatunnel.common.PropertiesUtil.setOption;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SCHEMA;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
+import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions.CURSOR_STARTUP_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions.CURSOR_STOP_MODE;
 
-@AutoService(SeaTunnelSource.class)
 public class PulsarSource
         implements SeaTunnelSource<SeaTunnelRow, PulsarPartitionSplit, 
PulsarSplitEnumeratorState>,
                 SupportParallelism {
 
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
-
-    private SeaTunnelRowType typeInfo;
+    private CatalogTable catalogTable;
 
     private PulsarAdminConfig adminConfig;
     private PulsarClientConfig clientConfig;
@@ -107,80 +77,36 @@ public class PulsarSource
     protected long pollInterval;
     protected int batchSize;
 
-    @Override
-    public String getPluginName() {
-        return PulsarConfigUtil.IDENTIFIER;
-    }
-
-    @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        SUBSCRIPTION_NAME.key(),
-                        CLIENT_SERVICE_URL.key(),
-                        ADMIN_SERVICE_URL.key());
-        if (!result.isSuccess()) {
-            throw new PulsarConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-
+    public PulsarSource(ReadonlyConfig config, CatalogTable catalogTable) {
+        this.catalogTable = catalogTable;
         // admin config
         PulsarAdminConfig.Builder adminConfigBuilder =
-                
PulsarAdminConfig.builder().adminUrl(config.getString(ADMIN_SERVICE_URL.key()));
-        setOption(
-                config,
-                AUTH_PLUGIN_CLASS.key(),
-                config::getString,
-                adminConfigBuilder::authPluginClassName);
-        setOption(config, AUTH_PARAMS.key(), config::getString, 
adminConfigBuilder::authParams);
+                PulsarAdminConfig.builder()
+                        
.adminUrl(config.get(PulsarSourceOptions.ADMIN_SERVICE_URL));
+        
adminConfigBuilder.authPluginClassName(config.get(PulsarSourceOptions.AUTH_PLUGIN_CLASS));
+        
adminConfigBuilder.authParams(config.get(PulsarSourceOptions.AUTH_PARAMS));
         this.adminConfig = adminConfigBuilder.build();
 
         // client config
         PulsarClientConfig.Builder clientConfigBuilder =
-                
PulsarClientConfig.builder().serviceUrl(config.getString(CLIENT_SERVICE_URL.key()));
-        setOption(
-                config,
-                AUTH_PLUGIN_CLASS.key(),
-                config::getString,
-                clientConfigBuilder::authPluginClassName);
-        setOption(config, AUTH_PARAMS.key(), config::getString, 
clientConfigBuilder::authParams);
+                PulsarClientConfig.builder()
+                        
.serviceUrl(config.get(PulsarSourceOptions.CLIENT_SERVICE_URL));
+        
clientConfigBuilder.authPluginClassName(config.get(PulsarSourceOptions.AUTH_PLUGIN_CLASS));
+        
clientConfigBuilder.authParams(config.get(PulsarSourceOptions.AUTH_PARAMS));
         this.clientConfig = clientConfigBuilder.build();
 
         // consumer config
         PulsarConsumerConfig.Builder consumerConfigBuilder =
                 PulsarConsumerConfig.builder()
-                        
.subscriptionName(config.getString(SUBSCRIPTION_NAME.key()));
+                        
.subscriptionName(config.get(PulsarSourceOptions.SUBSCRIPTION_NAME));
         this.consumerConfig = consumerConfigBuilder.build();
 
         // source properties
-        setOption(
-                config,
-                TOPIC_DISCOVERY_INTERVAL.key(),
-                TOPIC_DISCOVERY_INTERVAL.defaultValue(),
-                config::getLong,
-                v -> this.partitionDiscoveryIntervalMs = v);
-        setOption(
-                config,
-                POLL_TIMEOUT.key(),
-                POLL_TIMEOUT.defaultValue(),
-                config::getInt,
-                v -> this.pollTimeout = v);
-        setOption(
-                config,
-                POLL_INTERVAL.key(),
-                POLL_INTERVAL.defaultValue(),
-                config::getLong,
-                v -> this.pollInterval = v);
-        setOption(
-                config,
-                POLL_BATCH_SIZE.key(),
-                POLL_BATCH_SIZE.defaultValue(),
-                config::getInt,
-                v -> this.batchSize = v);
+        this.partitionDiscoveryIntervalMs =
+                config.get(PulsarSourceOptions.TOPIC_DISCOVERY_INTERVAL);
+        this.pollTimeout = config.get(PulsarSourceOptions.POLL_TIMEOUT);
+        this.pollInterval = config.get(PulsarSourceOptions.POLL_INTERVAL);
+        this.batchSize = config.get(PulsarSourceOptions.POLL_BATCH_SIZE);
 
         setStartCursor(config);
         setStopCursor(config);
@@ -196,13 +122,13 @@ public class PulsarSource
         }
     }
 
-    private void setStartCursor(Config config) {
-        StartMode startMode =
-                getEnum(
-                        config,
-                        CURSOR_STARTUP_MODE.key(),
-                        StartMode.class,
-                        CURSOR_STARTUP_MODE.defaultValue());
+    @Override
+    public String getPluginName() {
+        return PulsarSourceOptions.IDENTIFIER;
+    }
+
+    private void setStartCursor(ReadonlyConfig config) {
+        PulsarSourceOptions.StartMode startMode = 
config.get(CURSOR_STARTUP_MODE);
         switch (startMode) {
             case EARLIEST:
                 this.startCursor = StartCursor.earliest();
@@ -211,27 +137,22 @@ public class PulsarSource
                 this.startCursor = StartCursor.latest();
                 break;
             case SUBSCRIPTION:
-                SubscriptionStartCursor.CursorResetStrategy resetStrategy =
-                        getEnum(
-                                config,
-                                CURSOR_RESET_MODE.key(),
-                                
SubscriptionStartCursor.CursorResetStrategy.class,
-                                
SubscriptionStartCursor.CursorResetStrategy.LATEST);
+                PulsarSourceOptions.CursorResetStrategy resetStrategy =
+                        config.get(PulsarSourceOptions.CURSOR_RESET_MODE);
                 this.startCursor = StartCursor.subscription(resetStrategy);
                 break;
             case TIMESTAMP:
-                if 
(StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP.key()))) {
+                if 
(!config.getOptional(PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP).isPresent()) 
{
                     throw new PulsarConnectorException(
                             SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
                             String.format(
                                     "The '%s' property is required when the 
'%s' is 'timestamp'.",
-                                    CURSOR_STARTUP_TIMESTAMP.key(), 
CURSOR_STARTUP_MODE.key()));
+                                    
PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP.key(),
+                                    CURSOR_STARTUP_MODE.key()));
                 }
-                setOption(
-                        config,
-                        CURSOR_STARTUP_TIMESTAMP.key(),
-                        config::getLong,
-                        timestamp -> this.startCursor = 
StartCursor.timestamp(timestamp));
+                this.startCursor =
+                        StartCursor.timestamp(
+                                
config.get(PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP));
                 break;
             default:
                 throw new PulsarConnectorException(
@@ -240,13 +161,8 @@ public class PulsarSource
         }
     }
 
-    private void setStopCursor(Config config) {
-        SourceProperties.StopMode stopMode =
-                getEnum(
-                        config,
-                        CURSOR_STOP_MODE.key(),
-                        SourceProperties.StopMode.class,
-                        CURSOR_STOP_MODE.defaultValue());
+    private void setStopCursor(ReadonlyConfig config) {
+        PulsarSourceOptions.StopMode stopMode = config.get(CURSOR_STOP_MODE);
         switch (stopMode) {
             case LATEST:
                 this.stopCursor = StopCursor.latest();
@@ -255,18 +171,16 @@ public class PulsarSource
                 this.stopCursor = StopCursor.never();
                 break;
             case TIMESTAMP:
-                if 
(StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP.key()))) {
+                if 
(!config.getOptional(PulsarSourceOptions.CURSOR_STOP_TIMESTAMP).isPresent()) {
                     throw new PulsarConnectorException(
                             SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
                             String.format(
                                     "The '%s' property is required when the 
'%s' is 'timestamp'.",
-                                    CURSOR_STOP_TIMESTAMP.key(), 
CURSOR_STOP_MODE.key()));
+                                    
PulsarSourceOptions.CURSOR_STOP_TIMESTAMP.key(),
+                                    CURSOR_STOP_MODE.key()));
                 }
-                setOption(
-                        config,
-                        CURSOR_STARTUP_TIMESTAMP.key(),
-                        config::getLong,
-                        timestamp -> this.stopCursor = 
StopCursor.timestamp(timestamp));
+                this.stopCursor =
+                        
StopCursor.timestamp(config.get(PulsarSourceOptions.CURSOR_STOP_TIMESTAMP));
                 break;
             default:
                 throw new PulsarConnectorException(
@@ -275,16 +189,16 @@ public class PulsarSource
         }
     }
 
-    private void setPartitionDiscoverer(Config config) {
-        if (config.hasPath(TOPIC.key())) {
-            String topic = config.getString(TOPIC.key());
+    private void setPartitionDiscoverer(ReadonlyConfig config) {
+        if (config.getOptional(PulsarSourceOptions.TOPIC).isPresent()) {
+            String topic = config.get(PulsarSourceOptions.TOPIC);
             if (StringUtils.isNotBlank(topic)) {
                 this.partitionDiscoverer =
                         new 
TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
             }
         }
-        if (config.hasPath(TOPIC_PATTERN.key())) {
-            String topicPattern = config.getString(TOPIC_PATTERN.key());
+        if (config.getOptional(PulsarSourceOptions.TOPIC_PATTERN).isPresent()) 
{
+            String topicPattern = 
config.get(PulsarSourceOptions.TOPIC_PATTERN);
             if (StringUtils.isNotBlank(topicPattern)) {
                 this.partitionDiscoverer =
                         new 
TopicPatternDiscoverer(Pattern.compile(topicPattern));
@@ -295,37 +209,30 @@ public class PulsarSource
                     SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
                     String.format(
                             "The properties '%s' or '%s' is required.",
-                            TOPIC.key(), TOPIC_PATTERN.key()));
+                            PulsarSourceOptions.TOPIC.key(),
+                            PulsarSourceOptions.TOPIC_PATTERN.key()));
         }
     }
 
-    private void setDeserialization(Config config) {
-        if (config.hasPath(SCHEMA.key())) {
-            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(config);
-            typeInfo = catalogTable.getSeaTunnelRowType();
-            String format = FORMAT.defaultValue();
-            if (config.hasPath(FORMAT.key())) {
-                format = config.getString(FORMAT.key());
-            }
-            switch (format.toUpperCase()) {
-                case "JSON":
-                    deserializationSchema = new 
JsonDeserializationSchema(false, false, typeInfo);
-                    break;
-                case "CANAL_JSON":
-                    deserializationSchema =
-                            new PulsarCanalDecorator(
-                                    
CanalJsonDeserializationSchema.builder(catalogTable)
-                                            .setIgnoreParseErrors(true)
-                                            .build());
-                    break;
-                default:
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "Unsupported format: " + format);
-            }
-        } else {
-            typeInfo = CatalogTableUtil.buildSimpleTextSchema();
-            this.deserializationSchema = new JsonDeserializationSchema(false, 
false, typeInfo);
+    private void setDeserialization(ReadonlyConfig config) {
+        String format = config.get(PulsarSourceOptions.FORMAT);
+        switch (format.toUpperCase()) {
+            case "JSON":
+                this.deserializationSchema =
+                        new JsonDeserializationSchema(
+                                false, false, 
catalogTable.getSeaTunnelRowType());
+                break;
+            case "CANAL_JSON":
+                this.deserializationSchema =
+                        new PulsarCanalDecorator(
+                                
CanalJsonDeserializationSchema.builder(catalogTable)
+                                        .setIgnoreParseErrors(true)
+                                        .build());
+                break;
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported format: " + format);
         }
     }
 
@@ -337,8 +244,8 @@ public class PulsarSource
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
index 876da0314b..3c746a024b 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
@@ -18,65 +18,61 @@
 package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
+import java.io.Serializable;
 
 @AutoService(Factory.class)
 public class PulsarSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return PulsarConfigUtil.IDENTIFIER;
+        return PulsarSourceOptions.IDENTIFIER;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(SUBSCRIPTION_NAME, CLIENT_SERVICE_URL, 
ADMIN_SERVICE_URL)
+                .required(
+                        PulsarSourceOptions.SUBSCRIPTION_NAME,
+                        PulsarSourceOptions.CLIENT_SERVICE_URL,
+                        PulsarSourceOptions.ADMIN_SERVICE_URL)
                 .optional(
-                        CURSOR_STARTUP_MODE,
-                        CURSOR_STOP_MODE,
-                        TOPIC_DISCOVERY_INTERVAL,
-                        POLL_TIMEOUT,
-                        POLL_INTERVAL,
-                        POLL_BATCH_SIZE,
-                        ConnectorCommonOptions.SCHEMA)
-                .exclusive(TOPIC, TOPIC_PATTERN)
+                        PulsarSourceOptions.CURSOR_STARTUP_MODE,
+                        PulsarSourceOptions.CURSOR_STOP_MODE,
+                        PulsarSourceOptions.TOPIC_DISCOVERY_INTERVAL,
+                        PulsarSourceOptions.POLL_TIMEOUT,
+                        PulsarSourceOptions.POLL_INTERVAL,
+                        PulsarSourceOptions.POLL_BATCH_SIZE,
+                        PulsarSourceOptions.FORMAT,
+                        PulsarSourceOptions.SCHEMA)
+                .exclusive(PulsarSourceOptions.TOPIC, 
PulsarSourceOptions.TOPIC_PATTERN)
                 .conditional(
-                        CURSOR_STARTUP_MODE,
-                        SourceProperties.StartMode.TIMESTAMP,
-                        CURSOR_STARTUP_TIMESTAMP)
+                        PulsarSourceOptions.FORMAT,
+                        PulsarSourceOptions.TEXT_FORMAT,
+                        PulsarSourceOptions.FIELD_DELIMITER)
                 .conditional(
-                        CURSOR_STARTUP_MODE,
-                        SourceProperties.StartMode.SUBSCRIPTION,
-                        CURSOR_RESET_MODE)
+                        PulsarSourceOptions.CURSOR_STARTUP_MODE,
+                        PulsarSourceOptions.StartMode.TIMESTAMP,
+                        PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP)
                 .conditional(
-                        CURSOR_STOP_MODE,
-                        SourceProperties.StopMode.TIMESTAMP,
-                        CURSOR_STOP_TIMESTAMP)
-                .bundled(AUTH_PLUGIN_CLASS, AUTH_PARAMS)
+                        PulsarSourceOptions.CURSOR_STARTUP_MODE,
+                        PulsarSourceOptions.StartMode.SUBSCRIPTION,
+                        PulsarSourceOptions.CURSOR_RESET_MODE)
+                .conditional(
+                        PulsarSourceOptions.CURSOR_STOP_MODE,
+                        PulsarSourceOptions.StopMode.TIMESTAMP,
+                        PulsarSourceOptions.CURSOR_STOP_TIMESTAMP)
+                .bundled(PulsarSourceOptions.AUTH_PLUGIN_CLASS, 
PulsarSourceOptions.AUTH_PARAMS)
                 .build();
     }
 
@@ -84,4 +80,18 @@ public class PulsarSourceFactory implements 
TableSourceFactory {
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return PulsarSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        CatalogTable catalogTable;
+        if 
(context.getOptions().getOptional(PulsarSourceOptions.SCHEMA).isPresent()) {
+            catalogTable = 
CatalogTableUtil.buildWithConfig(context.getOptions());
+        } else {
+            catalogTable = CatalogTableUtil.buildSimpleTextTable();
+        }
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new PulsarSource(context.getOptions(), catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
index 06136a3716..6586beab8e 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
@@ -18,6 +18,8 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -53,8 +55,7 @@ public interface StartCursor extends Serializable {
         return new SubscriptionStartCursor();
     }
 
-    static StartCursor subscription(
-            SubscriptionStartCursor.CursorResetStrategy cursorResetStrategy) {
+    static StartCursor subscription(PulsarSourceOptions.CursorResetStrategy 
cursorResetStrategy) {
         return new SubscriptionStartCursor(cursorResetStrategy);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
index 00ccf31967..8c59140858 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
@@ -18,6 +18,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
@@ -31,13 +32,13 @@ import org.apache.pulsar.client.api.PulsarClientException;
 public class SubscriptionStartCursor implements StartCursor {
     private static final long serialVersionUID = 1L;
 
-    private final CursorResetStrategy cursorResetStrategy;
+    private final PulsarSourceOptions.CursorResetStrategy cursorResetStrategy;
 
     public SubscriptionStartCursor() {
-        this.cursorResetStrategy = CursorResetStrategy.LATEST;
+        this.cursorResetStrategy = 
PulsarSourceOptions.CursorResetStrategy.LATEST;
     }
 
-    public SubscriptionStartCursor(CursorResetStrategy cursorResetStrategy) {
+    public SubscriptionStartCursor(PulsarSourceOptions.CursorResetStrategy 
cursorResetStrategy) {
         this.cursorResetStrategy = cursorResetStrategy;
     }
 
@@ -55,7 +56,7 @@ public class SubscriptionStartCursor implements StartCursor {
                     .createSubscription(
                             partition.getFullTopicName(),
                             subscription,
-                            CursorResetStrategy.EARLIEST == cursorResetStrategy
+                            PulsarSourceOptions.CursorResetStrategy.EARLIEST 
== cursorResetStrategy
                                     ? MessageId.earliest
                                     : MessageId.latest);
         } catch (PulsarAdminException e) {
@@ -68,9 +69,4 @@ public class SubscriptionStartCursor implements StartCursor {
     public void seekPosition(Consumer<?> consumer) throws 
PulsarClientException {
         // nothing
     }
-
-    public enum CursorResetStrategy {
-        LATEST,
-        EARLIEST
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
index 9e2fac0c85..6a0773298e 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -29,7 +29,7 @@ public class PulsarSourceFactoryTest {
     void factoryIdentifier() {
         PulsarSourceFactory pulsarSourceFactory = new PulsarSourceFactory();
         Assertions.assertEquals(
-                PulsarConfigUtil.IDENTIFIER, 
pulsarSourceFactory.factoryIdentifier());
+                PulsarSourceOptions.IDENTIFIER, 
pulsarSourceFactory.factoryIdentifier());
     }
 
     @Test

Reply via email to