This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 26a2160c80 [improve] pulsar options (#9180)
26a2160c80 is described below
commit 26a2160c808af5d65554dfdf42772c0fb3d58194
Author: Jarvis <[email protected]>
AuthorDate: Sat Jun 28 22:57:59 2025 +0800
[improve] pulsar options (#9180)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../apache/seatunnel/common/PropertiesUtil.java | 88 --------
.../seatunnel/pulsar/config/PulsarBaseOptions.java | 82 +++++++
.../seatunnel/pulsar/config/PulsarConfigUtil.java | 8 +-
...{SinkProperties.java => PulsarSinkOptions.java} | 47 +---
...rceProperties.java => PulsarSourceOptions.java} | 113 ++--------
.../seatunnel/pulsar/sink/PulsarSink.java | 16 +-
.../seatunnel/pulsar/sink/PulsarSinkFactory.java | 33 +--
.../seatunnel/pulsar/sink/PulsarSinkWriter.java | 39 ++--
.../seatunnel/pulsar/source/PulsarSource.java | 239 +++++++--------------
.../pulsar/source/PulsarSourceFactory.java | 88 ++++----
.../enumerator/cursor/start/StartCursor.java | 5 +-
.../cursor/start/SubscriptionStartCursor.java | 14 +-
.../pulsar/source/PulsarSourceFactoryTest.java | 4 +-
14 files changed, 286 insertions(+), 492 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 47cbd7decd..4e5250bfb6 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -192,8 +192,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("JdbcSinkOptions");
whiteList.add("TypesenseSourceOptions");
whiteList.add("TypesenseSinkOptions");
- whiteList.add("PulsarSinkOptions");
- whiteList.add("PulsarSourceOptions");
whiteList.add("MongodbSinkOptions");
whiteList.add("SelectDBSinkOptions");
whiteList.add("TablestoreSinkOptions");
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
deleted file mode 100644
index 4a3f2b6b35..0000000000
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.common;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-public final class PropertiesUtil {
-
- private PropertiesUtil() {}
-
- public static void setProperties(
- Config config, Properties properties, String prefix, boolean keepPrefix) {
- config.entrySet()
- .forEach(
- entry -> {
- String key = entry.getKey();
- Object value = entry.getValue().unwrapped();
- if (key.startsWith(prefix)) {
- if (keepPrefix) {
- properties.put(key, value);
- } else {
- properties.put(key.substring(prefix.length()), value);
- }
- }
- });
- }
-
- public static <E extends Enum<E>> E getEnum(
- final Config conf, final String key, final Class<E> enumClass, final E defaultEnum) {
- if (!conf.hasPath(key)) {
- return defaultEnum;
- }
- final String value = conf.getString(key);
- if (StringUtils.isBlank(value)) {
- return defaultEnum;
- }
- return Enum.valueOf(enumClass, value.toUpperCase());
- }
-
- public static <T> void setOption(
- Config config,
- String optionName,
- T defaultValue,
- Function<String, T> getter,
- Consumer<T> setter) {
- T value;
- if (config.hasPath(optionName)) {
- value = getter.apply(optionName);
- } else {
- value = defaultValue;
- }
- if (value != null) {
- setter.accept(value);
- }
- }
-
- public static <T> void setOption(
- Config config, String optionName, Function<String, T> getter, Consumer<T> setter) {
- T value = null;
- if (config.hasPath(optionName)) {
- value = getter.apply(optionName);
- }
- if (value != null) {
- setter.accept(value);
- }
- }
-}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarBaseOptions.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarBaseOptions.java
new file mode 100644
index 0000000000..cdeb8d52da
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarBaseOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+
+public class PulsarBaseOptions extends ConnectorCommonOptions {
+
+ public static final String IDENTIFIER = "Pulsar";
+
+ public static final Option<String> TOPIC =
+ Options.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("pulsar topic name.");
+
+ public static final Option<String> CLIENT_SERVICE_URL =
+ Options.key("client.service-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Service URL provider for Pulsar service");
+
+ public static final Option<String> ADMIN_SERVICE_URL =
+ Options.key("admin.service-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.");
+
+ public static final Option<String> AUTH_PLUGIN_CLASS =
+ Options.key("auth.plugin-class")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Name of the authentication plugin");
+
+ public static final Option<String> AUTH_PARAMS =
+ Options.key("auth.params")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Parameters for the authentication plugin. For example, key1:val1,key2:val2");
+
+ /** The default data format is JSON */
+ public static final String DEFAULT_FORMAT = "json";
+
+ public static final String TEXT_FORMAT = "text";
+
+ /** The default field delimiter is “,” */
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+ public static final Option<String> FORMAT =
+ Options.key("format")
+ .stringType()
+ .defaultValue(DEFAULT_FORMAT)
+ .withDescription(
+ "Data format. The default format is json. Optional text format. The default field separator is \", \". "
+ + "If you customize the delimiter, add the \"field_delimiter\" option.");
+
+ public static final Option<String> FIELD_DELIMITER =
+ Options.key("field_delimiter")
+ .stringType()
+ .defaultValue(DEFAULT_FIELD_DELIMITER)
+ .withDescription(
+ "Customize the field delimiter for data format.The default field_delimiter is ',' ");
+}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
index 1b73085ec6..667bfab4bf 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
@@ -48,12 +48,8 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PULSAR_CONFIG;
-
public class PulsarConfigUtil {
- public static final String IDENTIFIER = "Pulsar";
-
private PulsarConfigUtil() {}
public static PulsarAdmin createAdmin(PulsarAdminConfig config) {
@@ -172,10 +168,10 @@ public class PulsarConfigUtil {
producerBuilder.messageRoutingMode(messageRoutingMode);
producerBuilder.blockIfQueueFull(true);
- if (pluginConfig.get(PULSAR_CONFIG) != null) {
+ if (pluginConfig.get(PulsarSinkOptions.PULSAR_CONFIG) != null) {
Map<String, String> pulsarProperties = new HashMap<>();
pluginConfig
- .get(PULSAR_CONFIG)
+ .get(PulsarSinkOptions.PULSAR_CONFIG)
.forEach((key, value) -> pulsarProperties.put(key, value));
producerBuilder.properties(pulsarProperties);
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSinkOptions.java
similarity index 75%
rename from seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java
rename to seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSinkOptions.java
index ccf59e87c3..240ec2f2ba 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSinkOptions.java
@@ -25,35 +25,19 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import java.util.List;
import java.util.Map;
-public class SinkProperties {
+public class PulsarSinkOptions extends PulsarBaseOptions {
- /** The default data format is JSON */
- public static final String DEFAULT_FORMAT = "json";
-
- public static final String TEXT_FORMAT = "text";
-
- /** The default field delimiter is “,” */
- public static final String DEFAULT_FIELD_DELIMITER = ",";
-
- public static final Option<String> FORMAT =
- Options.key("format")
- .stringType()
- .defaultValue(DEFAULT_FORMAT)
+ public static final Option<MessageRoutingMode> MESSAGE_ROUTING_MODE =
+ Options.key("message.routing.mode")
+ .enumType(MessageRoutingMode.class)
+ .defaultValue(MessageRoutingMode.RoundRobinPartition)
.withDescription(
- "Data format. The default format is json. Optional
text format. The default field separator is \", \". "
- + "If you customize the delimiter, add the
\"field_delimiter\" option.");
+ "Default routing mode for messages to partition. "
+ + "If you choose SinglePartition,If no key
is provided, The partitioned producer will randomly pick one single partition
and publish all the messages into that partition. "
+ + " If a key is provided on the message,
the partitioned producer will hash the key and assign message to a particular
partition."
+ + " If you choose RoundRobinPartition,If
no key is provided, the producer will publish messages across all partitions in
round-robin fashion to achieve maximum throughput. "
+ + "Please note that round-robin is not
done per individual message but rather it's set to the same boundary of
batching delay, to ensure batching is effective.");
- public static final Option<String> FIELD_DELIMITER =
- Options.key("field_delimiter")
- .stringType()
- .defaultValue(DEFAULT_FIELD_DELIMITER)
- .withDescription(
- "Customize the field delimiter for data format.The
default field_delimiter is ',' ");
- public static final Option<String> TOPIC =
- Options.key("topic")
- .stringType()
- .noDefaultValue()
- .withDescription("sink pulsar topic name.");
public static final Option<PulsarSemantics> SEMANTICS =
Options.key("semantics")
.enumType(PulsarSemantics.class)
@@ -77,17 +61,6 @@ public class SinkProperties {
+ "the user can also specify multiple
non-mandatory parameters for the producer or consumer client, "
+ "covering all the producer parameters
specified in the official Pulsar document.");
- public static final Option<MessageRoutingMode> MESSAGE_ROUTING_MODE =
- Options.key("message.routing.mode")
- .enumType(MessageRoutingMode.class)
- .defaultValue(MessageRoutingMode.RoundRobinPartition)
- .withDescription(
- "Default routing mode for messages to partition. "
- + "If you choose SinglePartition,If no key
is provided, The partitioned producer will randomly pick one single partition
and publish all the messages into that partition. "
- + " If a key is provided on the message,
the partitioned producer will hash the key and assign message to a particular
partition."
- + " If you choose RoundRobinPartition,If
no key is provided, the producer will publish messages across all partitions in
round-robin fashion to achieve maximum throughput. "
- + "Please note that round-robin is not
done per individual message but rather it's set to the same boundary of
batching delay, to ensure batching is effective.");
-
public static final Option<List<String>> PARTITION_KEY_FIELDS =
Options.key("partition_key_fields")
.listType()
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSourceOptions.java
similarity index 59%
rename from seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
rename to seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSourceOptions.java
index ffa19a9a2c..d828bb1c70 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSourceOptions.java
@@ -17,57 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class SourceProperties {
+public class PulsarSourceOptions extends PulsarBaseOptions {
private static final Long DEFAULT_TOPIC_DISCOVERY_INTERVAL = -1L;
private static final Integer DEFAULT_POLL_TIMEOUT = 100;
private static final Long DEFAULT_POLL_INTERVAL = 50L;
private static final Integer DEFAULT_POLL_BATCH_SIZE = 500;
- //
--------------------------------------------------------------------------------------------
- // The configuration for ClientConfigurationData part.
- //
--------------------------------------------------------------------------------------------
-
- public static final Option<String> CLIENT_SERVICE_URL =
- Options.key("client.service-url")
- .stringType()
- .noDefaultValue()
- .withDescription("Service URL provider for Pulsar
service");
-
- public static final Option<String> AUTH_PLUGIN_CLASS =
- Options.key("auth.plugin-class")
- .stringType()
- .noDefaultValue()
- .withDescription("Name of the authentication plugin");
-
- public static final Option<String> AUTH_PARAMS =
- Options.key("auth.params")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Parameters for the authentication plugin. For
example, key1:val1,key2:val2");
-
- //
--------------------------------------------------------------------------------------------
- // The configuration for ClientConfigurationData part.
- // All the configuration listed below should have the pulsar.client prefix.
- //
--------------------------------------------------------------------------------------------
-
- public static final Option<String> ADMIN_SERVICE_URL =
- Options.key("admin.service-url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The Pulsar service HTTP URL for the admin
endpoint. For example, http://my-broker.example.com:8080, or
https://my-broker.example.com:8443 for TLS.");
-
- //
--------------------------------------------------------------------------------------------
- // The configuration for ConsumerConfigurationData part.
- //
--------------------------------------------------------------------------------------------
-
public static final Option<String> SUBSCRIPTION_NAME =
Options.key("subscription.name")
.stringType()
@@ -75,30 +34,6 @@ public class SourceProperties {
.withDescription(
"Specify the subscription name for this consumer.
This argument is required when constructing the consumer.");
- // No use parameter
- public static final String SUBSCRIPTION_TYPE = "subscription.type";
- public static final String SUBSCRIPTION_MODE = "subscription.mode";
-
- //
--------------------------------------------------------------------------------------------
- // The configuration for pulsar source part.
- //
--------------------------------------------------------------------------------------------
-
- public static final Option<Long> TOPIC_DISCOVERY_INTERVAL =
- Options.key("topic-discovery.interval")
- .longType()
- .defaultValue(DEFAULT_TOPIC_DISCOVERY_INTERVAL)
- .withDescription(
- "Default value is "
- + DEFAULT_TOPIC_DISCOVERY_INTERVAL
- + ". The interval (in ms) for the Pulsar
source to discover the new topic partitions. A non-positive value disables the
topic partition discovery. Note, This option only works if the 'topic-pattern'
option is used.");
-
- public static final Option<String> TOPIC =
- Options.key("topic")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Topic name(s) to read data from when the table is
used as source. It also supports topic list for source by separating topic by
semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and
\"topic\" can be specified for sources.");
-
public static final Option<String> TOPIC_PATTERN =
Options.key("topic-pattern")
.stringType()
@@ -133,16 +68,16 @@ public class SourceProperties {
+ DEFAULT_POLL_BATCH_SIZE
+ ". The maximum number of records to
fetch to wait when polling. A longer time increases throughput but also
latency");
- public static final Option<SourceProperties.StartMode> CURSOR_STARTUP_MODE =
+ public static final Option<StartMode> CURSOR_STARTUP_MODE =
Options.key("cursor.startup.mode")
- .enumType(SourceProperties.StartMode.class)
+ .enumType(StartMode.class)
.defaultValue(StartMode.LATEST)
.withDescription(
"Startup mode for Pulsar consumer, valid values
are 'EARLIEST', 'LATEST', 'SUBSCRIPTION', 'TIMESTAMP'.");
- public static final Option<SourceProperties.StartMode> CURSOR_RESET_MODE =
+ public static final Option<CursorResetStrategy> CURSOR_RESET_MODE =
Options.key("cursor.reset.mode")
- .enumType(SourceProperties.StartMode.class)
+ .enumType(CursorResetStrategy.class)
.noDefaultValue()
.withDescription(
"Cursor reset strategy for Pulsar consumer valid
values are 'EARLIEST', 'LATEST'. Note, This option only works if the
\"cursor.startup.mode\" option used 'SUBSCRIPTION'.");
@@ -154,12 +89,9 @@ public class SourceProperties {
.withDescription(
"Start from the specified epoch timestamp (in
milliseconds). Note, This option is required when the \"cursor.startup.mode\"
option used 'TIMESTAMP'.");
- // No use parameter
- public static final String CURSOR_STARTUP_ID = "cursor.startup.id";
-
- public static final Option<SourceProperties.StopMode> CURSOR_STOP_MODE =
+ public static final Option<StopMode> CURSOR_STOP_MODE =
Options.key("cursor.stop.mode")
- .enumType(SourceProperties.StopMode.class)
+ .enumType(StopMode.class)
.defaultValue(StopMode.NEVER)
.withDescription(
"Stop mode for Pulsar consumer, valid values are
'NEVER', 'LATEST' and 'TIMESTAMP'. Note, When 'NEVER' is specified, it is a
real-time job, and other mode are off-line jobs.");
@@ -170,26 +102,14 @@ public class SourceProperties {
.noDefaultValue()
.withDescription("Stop from the specified epoch timestamp
(in milliseconds)");
- public static final Option<Config> SCHEMA =
- Options.key("schema")
- .objectType(Config.class)
- .noDefaultValue()
- .withDescription(
- "The structure of the data, including field names
and field types.");
-
- public static final Option<String> FORMAT =
- Options.key("format")
- .stringType()
- .defaultValue("JSON")
+ public static final Option<Long> TOPIC_DISCOVERY_INTERVAL =
+ Options.key("topic-discovery.interval")
+ .longType()
+ .defaultValue(DEFAULT_TOPIC_DISCOVERY_INTERVAL)
.withDescription(
- "Data format. The default format is json. Optional
text format. The default field separator is \", \". "
- + "If you customize the delimiter, add the
\"field_delimiter\" option.");
-
- public static final Option<String> FIELD_DELIMITER =
- Options.key("field_delimiter")
- .stringType()
- .defaultValue(",")
- .withDescription("Customize the field delimiter for data
format.");
+ "Default value is "
+ + DEFAULT_TOPIC_DISCOVERY_INTERVAL
+ + ". The interval (in ms) for the Pulsar
source to discover the new topic partitions. A non-positive value disables the
topic partition discovery. Note, This option only works if the 'topic-pattern'
option is used.");
/** Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}. */
public enum StartMode {
@@ -215,4 +135,9 @@ public class SourceProperties {
SPECIFIC,
NEVER
}
+
+ public enum CursorResetStrategy {
+ LATEST,
+ EARLIEST
+ }
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
index 989e24b024..ef5aeb0c57 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
@@ -36,10 +36,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-
/**
* Pulsar Sink implementation by using SeaTunnel sink API. This class contains the method to create
* {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
@@ -60,9 +56,11 @@ public class PulsarSink
/** client config */
PulsarClientConfig.Builder clientConfigBuilder =
-
PulsarClientConfig.builder().serviceUrl(readonlyConfig.get(CLIENT_SERVICE_URL));
-
clientConfigBuilder.authPluginClassName(readonlyConfig.get(AUTH_PLUGIN_CLASS));
- clientConfigBuilder.authParams(readonlyConfig.get(AUTH_PARAMS));
+ PulsarClientConfig.builder()
+
.serviceUrl(readonlyConfig.get(PulsarSinkOptions.CLIENT_SERVICE_URL));
+ clientConfigBuilder.authPluginClassName(
+ readonlyConfig.get(PulsarSinkOptions.AUTH_PLUGIN_CLASS));
+
clientConfigBuilder.authParams(readonlyConfig.get(PulsarSinkOptions.AUTH_PARAMS));
this.clientConfig = clientConfigBuilder.build();
}
@@ -97,7 +95,7 @@ public class PulsarSink
@Override
public String getPluginName() {
- return PulsarConfigUtil.IDENTIFIER;
+ return PulsarSinkOptions.IDENTIFIER;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
index 7781ba7b94..6317c55f76 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
@@ -22,32 +22,37 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-
@AutoService(Factory.class)
public class PulsarSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return PulsarConfigUtil.IDENTIFIER;
+ return PulsarSinkOptions.IDENTIFIER;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL, TOPIC)
- .optional(FORMAT, FIELD_DELIMITER, MESSAGE_ROUTING_MODE)
- .bundled(AUTH_PLUGIN_CLASS, AUTH_PARAMS)
+ .required(
+ PulsarSinkOptions.CLIENT_SERVICE_URL,
+ PulsarSinkOptions.ADMIN_SERVICE_URL,
+ PulsarSinkOptions.TOPIC)
+ .optional(
+ PulsarSinkOptions.FORMAT,
+ PulsarSinkOptions.FIELD_DELIMITER,
+ PulsarSinkOptions.MESSAGE_ROUTING_MODE,
+ PulsarSinkOptions.SEMANTICS,
+ PulsarSinkOptions.TRANSACTION_TIMEOUT,
+ PulsarSinkOptions.PULSAR_CONFIG,
+ PulsarSinkOptions.PARTITION_KEY_FIELDS)
+ .conditional(
+ PulsarSinkOptions.FORMAT,
+ PulsarSinkOptions.TEXT_FORMAT,
+ PulsarSinkOptions.FIELD_DELIMITER)
+ .bundled(PulsarSinkOptions.AUTH_PLUGIN_CLASS, PulsarSinkOptions.AUTH_PARAMS)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
index 5a5c28fb53..26f35fab99 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
@@ -55,27 +56,16 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
-
public class PulsarSinkWriter
implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState>
{
- private Context context;
private Producer<byte[]> producer;
private PulsarClient pulsarClient;
private SerializationSchema serializationSchema;
private SerializationSchema keySerializationSchema;
private TransactionImpl transaction;
- private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
- private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+ private int transactionTimeout;
+ private PulsarSemantics pulsarSemantics;
private final AtomicLong pendingMessages;
public PulsarSinkWriter(
@@ -84,13 +74,13 @@ public class PulsarSinkWriter
SeaTunnelRowType seaTunnelRowType,
ReadonlyConfig pluginConfig,
List<PulsarSinkState> pulsarStates) {
- this.context = context;
- String topic = pluginConfig.get(TOPIC);
- String format = pluginConfig.get(FORMAT);
- String delimiter = pluginConfig.get(FIELD_DELIMITER);
- Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
- PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS);
- MessageRoutingMode messageRoutingMode =
pluginConfig.get(MESSAGE_ROUTING_MODE);
+ String topic = pluginConfig.get(PulsarSinkOptions.TOPIC);
+ String format = pluginConfig.get(PulsarSinkOptions.FORMAT);
+ String delimiter = pluginConfig.get(PulsarSinkOptions.FIELD_DELIMITER);
+ this.transactionTimeout =
pluginConfig.get(PulsarSinkOptions.TRANSACTION_TIMEOUT);
+ this.pulsarSemantics = pluginConfig.get(PulsarSinkOptions.SEMANTICS);
+ MessageRoutingMode messageRoutingMode =
+ pluginConfig.get(PulsarSinkOptions.MESSAGE_ROUTING_MODE);
this.serializationSchema = createSerializationSchema(seaTunnelRowType,
format, delimiter);
List<String> partitionKeyList = getPartitionKeyFields(pluginConfig,
seaTunnelRowType);
this.keySerializationSchema =
@@ -201,9 +191,9 @@ public class PulsarSinkWriter
private SerializationSchema createSerializationSchema(
SeaTunnelRowType rowType, String format, String delimiter) {
- if (DEFAULT_FORMAT.equals(format)) {
+ if (PulsarSinkOptions.DEFAULT_FORMAT.equals(format)) {
return new JsonSerializationSchema(rowType);
- } else if (TEXT_FORMAT.equals(format)) {
+ } else if (PulsarSinkOptions.TEXT_FORMAT.equals(format)) {
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(delimiter)
@@ -244,8 +234,9 @@ public class PulsarSinkWriter
private List<String> getPartitionKeyFields(
ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
- List<String> partitionKeyFields =
pluginConfig.get(PARTITION_KEY_FIELDS);
+ if
(pluginConfig.getOptional(PulsarSinkOptions.PARTITION_KEY_FIELDS).isPresent()) {
+ List<String> partitionKeyFields =
+ pluginConfig.get(PulsarSinkOptions.PARTITION_KEY_FIELDS);
List<String> rowTypeFieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
for (String partitionKeyField : partitionKeyFields) {
if (!rowTypeFieldNames.contains(partitionKeyField)) {
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index b998d85762..b82ab2f55a 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -17,10 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -28,23 +26,16 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
@@ -59,41 +50,20 @@ import
org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
-import com.google.auto.service.AutoService;
-
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.regex.Pattern;
-import static org.apache.seatunnel.common.PropertiesUtil.getEnum;
-import static org.apache.seatunnel.common.PropertiesUtil.setOption;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SCHEMA;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
+import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions.CURSOR_STARTUP_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions.CURSOR_STOP_MODE;
-@AutoService(SeaTunnelSource.class)
public class PulsarSource
implements SeaTunnelSource<SeaTunnelRow, PulsarPartitionSplit,
PulsarSplitEnumeratorState>,
SupportParallelism {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
-
- private SeaTunnelRowType typeInfo;
+ private CatalogTable catalogTable;
private PulsarAdminConfig adminConfig;
private PulsarClientConfig clientConfig;
@@ -107,80 +77,36 @@ public class PulsarSource
protected long pollInterval;
protected int batchSize;
- @Override
- public String getPluginName() {
- return PulsarConfigUtil.IDENTIFIER;
- }
-
- @Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- config,
- SUBSCRIPTION_NAME.key(),
- CLIENT_SERVICE_URL.key(),
- ADMIN_SERVICE_URL.key());
- if (!result.isSuccess()) {
- throw new PulsarConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
-
+ public PulsarSource(ReadonlyConfig config, CatalogTable catalogTable) {
+ this.catalogTable = catalogTable;
// admin config
PulsarAdminConfig.Builder adminConfigBuilder =
-
PulsarAdminConfig.builder().adminUrl(config.getString(ADMIN_SERVICE_URL.key()));
- setOption(
- config,
- AUTH_PLUGIN_CLASS.key(),
- config::getString,
- adminConfigBuilder::authPluginClassName);
- setOption(config, AUTH_PARAMS.key(), config::getString,
adminConfigBuilder::authParams);
+ PulsarAdminConfig.builder()
+
.adminUrl(config.get(PulsarSourceOptions.ADMIN_SERVICE_URL));
+
adminConfigBuilder.authPluginClassName(config.get(PulsarSourceOptions.AUTH_PLUGIN_CLASS));
+
adminConfigBuilder.authParams(config.get(PulsarSourceOptions.AUTH_PARAMS));
this.adminConfig = adminConfigBuilder.build();
// client config
PulsarClientConfig.Builder clientConfigBuilder =
-
PulsarClientConfig.builder().serviceUrl(config.getString(CLIENT_SERVICE_URL.key()));
- setOption(
- config,
- AUTH_PLUGIN_CLASS.key(),
- config::getString,
- clientConfigBuilder::authPluginClassName);
- setOption(config, AUTH_PARAMS.key(), config::getString,
clientConfigBuilder::authParams);
+ PulsarClientConfig.builder()
+
.serviceUrl(config.get(PulsarSourceOptions.CLIENT_SERVICE_URL));
+
clientConfigBuilder.authPluginClassName(config.get(PulsarSourceOptions.AUTH_PLUGIN_CLASS));
+
clientConfigBuilder.authParams(config.get(PulsarSourceOptions.AUTH_PARAMS));
this.clientConfig = clientConfigBuilder.build();
// consumer config
PulsarConsumerConfig.Builder consumerConfigBuilder =
PulsarConsumerConfig.builder()
-
.subscriptionName(config.getString(SUBSCRIPTION_NAME.key()));
+
.subscriptionName(config.get(PulsarSourceOptions.SUBSCRIPTION_NAME));
this.consumerConfig = consumerConfigBuilder.build();
// source properties
- setOption(
- config,
- TOPIC_DISCOVERY_INTERVAL.key(),
- TOPIC_DISCOVERY_INTERVAL.defaultValue(),
- config::getLong,
- v -> this.partitionDiscoveryIntervalMs = v);
- setOption(
- config,
- POLL_TIMEOUT.key(),
- POLL_TIMEOUT.defaultValue(),
- config::getInt,
- v -> this.pollTimeout = v);
- setOption(
- config,
- POLL_INTERVAL.key(),
- POLL_INTERVAL.defaultValue(),
- config::getLong,
- v -> this.pollInterval = v);
- setOption(
- config,
- POLL_BATCH_SIZE.key(),
- POLL_BATCH_SIZE.defaultValue(),
- config::getInt,
- v -> this.batchSize = v);
+ this.partitionDiscoveryIntervalMs =
+ config.get(PulsarSourceOptions.TOPIC_DISCOVERY_INTERVAL);
+ this.pollTimeout = config.get(PulsarSourceOptions.POLL_TIMEOUT);
+ this.pollInterval = config.get(PulsarSourceOptions.POLL_INTERVAL);
+ this.batchSize = config.get(PulsarSourceOptions.POLL_BATCH_SIZE);
setStartCursor(config);
setStopCursor(config);
@@ -196,13 +122,13 @@ public class PulsarSource
}
}
- private void setStartCursor(Config config) {
- StartMode startMode =
- getEnum(
- config,
- CURSOR_STARTUP_MODE.key(),
- StartMode.class,
- CURSOR_STARTUP_MODE.defaultValue());
+ @Override
+ public String getPluginName() {
+ return PulsarSourceOptions.IDENTIFIER;
+ }
+
+ private void setStartCursor(ReadonlyConfig config) {
+ PulsarSourceOptions.StartMode startMode =
config.get(CURSOR_STARTUP_MODE);
switch (startMode) {
case EARLIEST:
this.startCursor = StartCursor.earliest();
@@ -211,27 +137,22 @@ public class PulsarSource
this.startCursor = StartCursor.latest();
break;
case SUBSCRIPTION:
- SubscriptionStartCursor.CursorResetStrategy resetStrategy =
- getEnum(
- config,
- CURSOR_RESET_MODE.key(),
-
SubscriptionStartCursor.CursorResetStrategy.class,
-
SubscriptionStartCursor.CursorResetStrategy.LATEST);
+ PulsarSourceOptions.CursorResetStrategy resetStrategy =
+ config.get(PulsarSourceOptions.CURSOR_RESET_MODE);
this.startCursor = StartCursor.subscription(resetStrategy);
break;
case TIMESTAMP:
- if
(StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP.key()))) {
+ if
(!config.getOptional(PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP).isPresent())
{
throw new PulsarConnectorException(
SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format(
"The '%s' property is required when the
'%s' is 'timestamp'.",
- CURSOR_STARTUP_TIMESTAMP.key(),
CURSOR_STARTUP_MODE.key()));
+
PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP.key(),
+ CURSOR_STARTUP_MODE.key()));
}
- setOption(
- config,
- CURSOR_STARTUP_TIMESTAMP.key(),
- config::getLong,
- timestamp -> this.startCursor =
StartCursor.timestamp(timestamp));
+ this.startCursor =
+ StartCursor.timestamp(
+
config.get(PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP));
break;
default:
throw new PulsarConnectorException(
@@ -240,13 +161,8 @@ public class PulsarSource
}
}
- private void setStopCursor(Config config) {
- SourceProperties.StopMode stopMode =
- getEnum(
- config,
- CURSOR_STOP_MODE.key(),
- SourceProperties.StopMode.class,
- CURSOR_STOP_MODE.defaultValue());
+ private void setStopCursor(ReadonlyConfig config) {
+ PulsarSourceOptions.StopMode stopMode = config.get(CURSOR_STOP_MODE);
switch (stopMode) {
case LATEST:
this.stopCursor = StopCursor.latest();
@@ -255,18 +171,16 @@ public class PulsarSource
this.stopCursor = StopCursor.never();
break;
case TIMESTAMP:
- if
(StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP.key()))) {
+ if
(!config.getOptional(PulsarSourceOptions.CURSOR_STOP_TIMESTAMP).isPresent()) {
throw new PulsarConnectorException(
SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format(
"The '%s' property is required when the
'%s' is 'timestamp'.",
- CURSOR_STOP_TIMESTAMP.key(),
CURSOR_STOP_MODE.key()));
+
PulsarSourceOptions.CURSOR_STOP_TIMESTAMP.key(),
+ CURSOR_STOP_MODE.key()));
}
- setOption(
- config,
- CURSOR_STARTUP_TIMESTAMP.key(),
- config::getLong,
- timestamp -> this.stopCursor =
StopCursor.timestamp(timestamp));
+ this.stopCursor =
+
StopCursor.timestamp(config.get(PulsarSourceOptions.CURSOR_STOP_TIMESTAMP));
break;
default:
throw new PulsarConnectorException(
@@ -275,16 +189,16 @@ public class PulsarSource
}
}
- private void setPartitionDiscoverer(Config config) {
- if (config.hasPath(TOPIC.key())) {
- String topic = config.getString(TOPIC.key());
+ private void setPartitionDiscoverer(ReadonlyConfig config) {
+ if (config.getOptional(PulsarSourceOptions.TOPIC).isPresent()) {
+ String topic = config.get(PulsarSourceOptions.TOPIC);
if (StringUtils.isNotBlank(topic)) {
this.partitionDiscoverer =
new
TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
}
}
- if (config.hasPath(TOPIC_PATTERN.key())) {
- String topicPattern = config.getString(TOPIC_PATTERN.key());
+ if (config.getOptional(PulsarSourceOptions.TOPIC_PATTERN).isPresent())
{
+ String topicPattern =
config.get(PulsarSourceOptions.TOPIC_PATTERN);
if (StringUtils.isNotBlank(topicPattern)) {
this.partitionDiscoverer =
new
TopicPatternDiscoverer(Pattern.compile(topicPattern));
@@ -295,37 +209,30 @@ public class PulsarSource
SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format(
"The properties '%s' or '%s' is required.",
- TOPIC.key(), TOPIC_PATTERN.key()));
+ PulsarSourceOptions.TOPIC.key(),
+ PulsarSourceOptions.TOPIC_PATTERN.key()));
}
}
- private void setDeserialization(Config config) {
- if (config.hasPath(SCHEMA.key())) {
- CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(config);
- typeInfo = catalogTable.getSeaTunnelRowType();
- String format = FORMAT.defaultValue();
- if (config.hasPath(FORMAT.key())) {
- format = config.getString(FORMAT.key());
- }
- switch (format.toUpperCase()) {
- case "JSON":
- deserializationSchema = new
JsonDeserializationSchema(false, false, typeInfo);
- break;
- case "CANAL_JSON":
- deserializationSchema =
- new PulsarCanalDecorator(
-
CanalJsonDeserializationSchema.builder(catalogTable)
- .setIgnoreParseErrors(true)
- .build());
- break;
- default:
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unsupported format: " + format);
- }
- } else {
- typeInfo = CatalogTableUtil.buildSimpleTextSchema();
- this.deserializationSchema = new JsonDeserializationSchema(false,
false, typeInfo);
+ private void setDeserialization(ReadonlyConfig config) {
+ String format = config.get(PulsarSourceOptions.FORMAT);
+ switch (format.toUpperCase()) {
+ case "JSON":
+ this.deserializationSchema =
+ new JsonDeserializationSchema(
+ false, false,
catalogTable.getSeaTunnelRowType());
+ break;
+ case "CANAL_JSON":
+ this.deserializationSchema =
+ new PulsarCanalDecorator(
+
CanalJsonDeserializationSchema.builder(catalogTable)
+ .setIgnoreParseErrors(true)
+ .build());
+ break;
+ default:
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unsupported format: " + format);
}
}
@@ -337,8 +244,8 @@ public class PulsarSource
}
@Override
- public SeaTunnelRowType getProducedType() {
- return this.typeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
index 876da0314b..3c746a024b 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactory.java
@@ -18,65 +18,61 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
+import java.io.Serializable;
@AutoService(Factory.class)
public class PulsarSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return PulsarConfigUtil.IDENTIFIER;
+ return PulsarSourceOptions.IDENTIFIER;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(SUBSCRIPTION_NAME, CLIENT_SERVICE_URL,
ADMIN_SERVICE_URL)
+ .required(
+ PulsarSourceOptions.SUBSCRIPTION_NAME,
+ PulsarSourceOptions.CLIENT_SERVICE_URL,
+ PulsarSourceOptions.ADMIN_SERVICE_URL)
.optional(
- CURSOR_STARTUP_MODE,
- CURSOR_STOP_MODE,
- TOPIC_DISCOVERY_INTERVAL,
- POLL_TIMEOUT,
- POLL_INTERVAL,
- POLL_BATCH_SIZE,
- ConnectorCommonOptions.SCHEMA)
- .exclusive(TOPIC, TOPIC_PATTERN)
+ PulsarSourceOptions.CURSOR_STARTUP_MODE,
+ PulsarSourceOptions.CURSOR_STOP_MODE,
+ PulsarSourceOptions.TOPIC_DISCOVERY_INTERVAL,
+ PulsarSourceOptions.POLL_TIMEOUT,
+ PulsarSourceOptions.POLL_INTERVAL,
+ PulsarSourceOptions.POLL_BATCH_SIZE,
+ PulsarSourceOptions.FORMAT,
+ PulsarSourceOptions.SCHEMA)
+ .exclusive(PulsarSourceOptions.TOPIC,
PulsarSourceOptions.TOPIC_PATTERN)
.conditional(
- CURSOR_STARTUP_MODE,
- SourceProperties.StartMode.TIMESTAMP,
- CURSOR_STARTUP_TIMESTAMP)
+ PulsarSourceOptions.FORMAT,
+ PulsarSourceOptions.TEXT_FORMAT,
+ PulsarSourceOptions.FIELD_DELIMITER)
.conditional(
- CURSOR_STARTUP_MODE,
- SourceProperties.StartMode.SUBSCRIPTION,
- CURSOR_RESET_MODE)
+ PulsarSourceOptions.CURSOR_STARTUP_MODE,
+ PulsarSourceOptions.StartMode.TIMESTAMP,
+ PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP)
.conditional(
- CURSOR_STOP_MODE,
- SourceProperties.StopMode.TIMESTAMP,
- CURSOR_STOP_TIMESTAMP)
- .bundled(AUTH_PLUGIN_CLASS, AUTH_PARAMS)
+ PulsarSourceOptions.CURSOR_STARTUP_MODE,
+ PulsarSourceOptions.StartMode.SUBSCRIPTION,
+ PulsarSourceOptions.CURSOR_RESET_MODE)
+ .conditional(
+ PulsarSourceOptions.CURSOR_STOP_MODE,
+ PulsarSourceOptions.StopMode.TIMESTAMP,
+ PulsarSourceOptions.CURSOR_STOP_TIMESTAMP)
+ .bundled(PulsarSourceOptions.AUTH_PLUGIN_CLASS,
PulsarSourceOptions.AUTH_PARAMS)
.build();
}
@@ -84,4 +80,18 @@ public class PulsarSourceFactory implements
TableSourceFactory {
public Class<? extends SeaTunnelSource> getSourceClass() {
return PulsarSource.class;
}
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ CatalogTable catalogTable;
+ if
(context.getOptions().getOptional(PulsarSourceOptions.SCHEMA).isPresent()) {
+ catalogTable =
CatalogTableUtil.buildWithConfig(context.getOptions());
+ } else {
+ catalogTable = CatalogTableUtil.buildSimpleTextTable();
+ }
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new PulsarSource(context.getOptions(), catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
index 06136a3716..6586beab8e 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
@@ -18,6 +18,8 @@
package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -53,8 +55,7 @@ public interface StartCursor extends Serializable {
return new SubscriptionStartCursor();
}
- static StartCursor subscription(
- SubscriptionStartCursor.CursorResetStrategy cursorResetStrategy) {
+ static StartCursor subscription(PulsarSourceOptions.CursorResetStrategy
cursorResetStrategy) {
return new SubscriptionStartCursor(cursorResetStrategy);
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
index 00ccf31967..8c59140858 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
@@ -18,6 +18,7 @@
package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
@@ -31,13 +32,13 @@ import org.apache.pulsar.client.api.PulsarClientException;
public class SubscriptionStartCursor implements StartCursor {
private static final long serialVersionUID = 1L;
- private final CursorResetStrategy cursorResetStrategy;
+ private final PulsarSourceOptions.CursorResetStrategy cursorResetStrategy;
public SubscriptionStartCursor() {
- this.cursorResetStrategy = CursorResetStrategy.LATEST;
+ this.cursorResetStrategy =
PulsarSourceOptions.CursorResetStrategy.LATEST;
}
- public SubscriptionStartCursor(CursorResetStrategy cursorResetStrategy) {
+ public SubscriptionStartCursor(PulsarSourceOptions.CursorResetStrategy
cursorResetStrategy) {
this.cursorResetStrategy = cursorResetStrategy;
}
@@ -55,7 +56,7 @@ public class SubscriptionStartCursor implements StartCursor {
.createSubscription(
partition.getFullTopicName(),
subscription,
- CursorResetStrategy.EARLIEST == cursorResetStrategy
+ PulsarSourceOptions.CursorResetStrategy.EARLIEST
== cursorResetStrategy
? MessageId.earliest
: MessageId.latest);
} catch (PulsarAdminException e) {
@@ -68,9 +69,4 @@ public class SubscriptionStartCursor implements StartCursor {
public void seekPosition(Consumer<?> consumer) throws
PulsarClientException {
// nothing
}
-
- public enum CursorResetStrategy {
- LATEST,
- EARLIEST
- }
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
index 9e2fac0c85..6a0773298e 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSourceFactoryTest.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -29,7 +29,7 @@ public class PulsarSourceFactoryTest {
void factoryIdentifier() {
PulsarSourceFactory pulsarSourceFactory = new PulsarSourceFactory();
Assertions.assertEquals(
- PulsarConfigUtil.IDENTIFIER,
pulsarSourceFactory.factoryIdentifier());
+ PulsarSourceOptions.IDENTIFIER,
pulsarSourceFactory.factoryIdentifier());
}
@Test