This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 629f85b23a [improve] update activemq connector config option (#8580)
629f85b23a is described below
commit 629f85b23a9828813a7531befb43cec3c46e05c1
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 13:59:49 2025 +0800
[improve] update activemq connector config option (#8580)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
.../seatunnel/activemq/client/ActivemqClient.java | 29 +++--
...ctivemqConfig.java => ActivemqSinkOptions.java} | 119 +--------------------
.../activemq/sink/ActivemqSinkFactory.java | 34 +++---
.../activemq/sink/ActivemqSinkWriter.java | 7 --
5 files changed, 26 insertions(+), 164 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 513606c183..2a814b08ef 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -191,7 +191,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("HbaseSinkOptions");
whiteList.add("MongodbSinkOptions");
whiteList.add("IoTDBSinkOptions");
- whiteList.add("ActivemqSinkOptions");
whiteList.add("EasysearchSourceOptions");
whiteList.add("RabbitmqSinkOptions");
whiteList.add("StarRocksSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java
index f4983d35db..2090af4fbf 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConn
import org.apache.activemq.ActiveMQConnectionFactory;
-import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.jms.Connection;
@@ -35,22 +34,21 @@ import javax.jms.TextMessage;
import java.nio.charset.StandardCharsets;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CONSUMER_EXPIRY_CHECK_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SESSION_ASYNC;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SYNC_SEND;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CHECK_FOR_DUPLICATE;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLIENT_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLOSE_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CONSUMER_EXPIRY_CHECK_ENABLED;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.DISPATCH_ASYNC;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.NESTED_MAP_AND_LIST_ENABLED;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.QUEUE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.URI;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
@Slf4j
-@AllArgsConstructor
public class ActivemqClient {
private final ReadonlyConfig config;
private final ActiveMQConnectionFactory connectionFactory;
@@ -102,7 +100,6 @@ public class ActivemqClient {
if (config.get(DISPATCH_ASYNC) != null) {
factory.setDispatchAsync(config.get(DISPATCH_ASYNC));
}
-
if (config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT) != null) {
factory.setWarnAboutUnstartedConnectionTimeout(
config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT));
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqConfig.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java
similarity index 56%
rename from seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqConfig.java
rename to seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java
index 6bf0045361..1d4300b004 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqConfig.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java
@@ -17,56 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.activemq.config;
-import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-@Setter
-@Getter
-@AllArgsConstructor
-public class ActivemqConfig implements Serializable {
- private String host;
- private Integer port;
- private String username;
- private String password;
- private String uri;
- private String queueName;
- private Boolean checkForDuplicate;
- private String clientID;
- private Integer closeTimeout;
- private Boolean consumerExpiryCheckEnabled;
- private Boolean copyMessageOnSend;
- private Boolean disableTimeStampsByDefault;
- private Boolean dispatchAsync;
- private Boolean nestedMapAndListEnabled;
- private Boolean useCompression;
- private Boolean alwaysSessionAsync;
- private Boolean alwaysSyncSend;
- private Integer warnAboutUnstartedConnectionTimeout;
-
- private final Map<String, Object> sinkOptionProps = new HashMap<>();
-
- public static final Option<String> HOST =
- Options.key("host")
- .stringType()
- .noDefaultValue()
- .withDescription("the default host to use for
connections");
- public static final Option<Integer> PORT =
- Options.key("port")
- .intType()
- .noDefaultValue()
- .withDescription("the default port to use for
connections");
+public class ActivemqSinkOptions implements Serializable {
public static final Option<String> USERNAME =
Options.key("username")
@@ -106,29 +62,6 @@ public class ActivemqConfig implements Serializable {
.noDefaultValue()
.withDescription("Sets the JMS clientID to use for the
connection.");
- public static final Option<Boolean> COPY_MESSAGE_ON_SEND =
- Options.key("copy_message_on_send")
- .booleanType()
- .noDefaultValue()
- .withDescription(
- "Should a JMS message be copied to a new JMS
Message object as part of the send() method in JMS. "
- + "This is enabled by default to be
compliant with the JMS specification. "
- + "For a performance boost set to false if
you do not mutate JMS messages after they are sent.");
-
- public static final Option<Boolean> DISABLE_TIMESTAMP_BY_DEFAULT =
- Options.key("disable_timeStamps_by_default")
- .booleanType()
- .noDefaultValue()
- .withDescription(
- "Sets whether or not timestamps on messages should
be disabled or not. "
- + "For a small performance boost set to
false.");
-
- public static final Option<Boolean> USE_COMPRESSION =
- Options.key("use_compression")
- .booleanType()
- .noDefaultValue()
- .withDescription("Enables the use of compression on the
message’s body.");
-
public static final Option<Boolean> ALWAYS_SESSION_ASYNC =
Options.key("always_session_async")
.booleanType()
@@ -189,54 +122,4 @@ public class ActivemqConfig implements Serializable {
.withDescription(
"Controls whether message expiration checking is
done in each "
+ "MessageConsumer prior to dispatching a
message.");
-
- public ActivemqConfig(Config config) {
- this.host = config.getString(HOST.key());
- this.port = config.getInt(PORT.key());
- this.queueName = config.getString(QUEUE_NAME.key());
- this.uri = config.getString(URI.key());
- if (config.hasPath(USERNAME.key())) {
- this.username = config.getString(USERNAME.key());
- }
- if (config.hasPath(PASSWORD.key())) {
- this.password = config.getString(PASSWORD.key());
- }
- if (config.hasPath(CHECK_FOR_DUPLICATE.key())) {
- this.checkForDuplicate =
config.getBoolean(CHECK_FOR_DUPLICATE.key());
- }
- if (config.hasPath(CLIENT_ID.key())) {
- this.clientID = config.getString(CLIENT_ID.key());
- }
- if (config.hasPath(COPY_MESSAGE_ON_SEND.key())) {
- this.copyMessageOnSend =
config.getBoolean(COPY_MESSAGE_ON_SEND.key());
- }
- if (config.hasPath(DISABLE_TIMESTAMP_BY_DEFAULT.key())) {
- this.disableTimeStampsByDefault =
config.getBoolean(DISABLE_TIMESTAMP_BY_DEFAULT.key());
- }
- if (config.hasPath(USE_COMPRESSION.key())) {
- this.useCompression = config.getBoolean(USE_COMPRESSION.key());
- }
- if (config.hasPath(ALWAYS_SESSION_ASYNC.key())) {
- this.alwaysSessionAsync =
config.getBoolean(ALWAYS_SESSION_ASYNC.key());
- }
- if (config.hasPath(ALWAYS_SYNC_SEND.key())) {
- this.alwaysSyncSend = config.getBoolean(ALWAYS_SYNC_SEND.key());
- }
- if (config.hasPath(CLOSE_TIMEOUT.key())) {
- this.closeTimeout = config.getInt(CLOSE_TIMEOUT.key());
- }
- if (config.hasPath(DISPATCH_ASYNC.key())) {
- this.dispatchAsync = config.getBoolean(DISPATCH_ASYNC.key());
- }
- if (config.hasPath(NESTED_MAP_AND_LIST_ENABLED.key())) {
- this.nestedMapAndListEnabled =
config.getBoolean(NESTED_MAP_AND_LIST_ENABLED.key());
- }
- if (config.hasPath(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT.key())) {
- this.warnAboutUnstartedConnectionTimeout =
-
config.getInt(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT.key());
- }
- }
-
- @VisibleForTesting
- public ActivemqConfig() {}
}
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
index ec40d648ae..a34ba7a703 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
@@ -25,23 +25,18 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.COPY_MESSAGE_ON_SEND;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISABLE_TIMESTAMP_BY_DEFAULT;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PORT;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USE_COMPRESSION;
-import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SESSION_ASYNC;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SYNC_SEND;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CHECK_FOR_DUPLICATE;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLIENT_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLOSE_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.DISPATCH_ASYNC;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.NESTED_MAP_AND_LIST_ENABLED;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.QUEUE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.URI;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
@AutoService(Factory.class)
public class ActivemqSinkFactory implements TableSinkFactory {
@@ -57,13 +52,8 @@ public class ActivemqSinkFactory implements TableSinkFactory {
.required(QUEUE_NAME, URI)
.bundled(USERNAME, PASSWORD)
.optional(
- HOST,
- PORT,
CLIENT_ID,
CHECK_FOR_DUPLICATE,
- COPY_MESSAGE_ON_SEND,
- DISABLE_TIMESTAMP_BY_DEFAULT,
- USE_COMPRESSION,
ALWAYS_SESSION_ASYNC,
ALWAYS_SYNC_SEND,
CLOSE_TIMEOUT,
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java
index f3395552c4..48a87e306b 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java
@@ -25,8 +25,6 @@ import org.apache.seatunnel.connectors.seatunnel.activemq.client.ActivemqClient;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import java.util.Optional;
-
public class ActivemqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
{
private ActivemqClient activeMQClient;
@@ -42,11 +40,6 @@ public class ActivemqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
activeMQClient.write(serializationSchema.serialize(element));
}
- @Override
- public Optional prepareCommit() {
- return Optional.empty();
- }
-
@Override
public void close() {
if (activeMQClient != null) {