This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 629f85b23a [improve] update activemq connector config option (#8580)
629f85b23a is described below

commit 629f85b23a9828813a7531befb43cec3c46e05c1
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 13:59:49 2025 +0800

    [improve] update activemq connector config option (#8580)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   1 -
 .../seatunnel/activemq/client/ActivemqClient.java  |  29 +++--
 ...ctivemqConfig.java => ActivemqSinkOptions.java} | 119 +--------------------
 .../activemq/sink/ActivemqSinkFactory.java         |  34 +++---
 .../activemq/sink/ActivemqSinkWriter.java          |   7 --
 5 files changed, 26 insertions(+), 164 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 513606c183..2a814b08ef 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -191,7 +191,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("HbaseSinkOptions");
         whiteList.add("MongodbSinkOptions");
         whiteList.add("IoTDBSinkOptions");
-        whiteList.add("ActivemqSinkOptions");
         whiteList.add("EasysearchSourceOptions");
         whiteList.add("RabbitmqSinkOptions");
         whiteList.add("StarRocksSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java
index f4983d35db..2090af4fbf 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConn
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import javax.jms.Connection;
@@ -35,22 +34,21 @@ import javax.jms.TextMessage;
 
 import java.nio.charset.StandardCharsets;
 
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CONSUMER_EXPIRY_CHECK_ENABLED;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SESSION_ASYNC;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SYNC_SEND;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CHECK_FOR_DUPLICATE;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLIENT_ID;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLOSE_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CONSUMER_EXPIRY_CHECK_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.DISPATCH_ASYNC;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.NESTED_MAP_AND_LIST_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.QUEUE_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.URI;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
 
 @Slf4j
-@AllArgsConstructor
 public class ActivemqClient {
     private final ReadonlyConfig config;
     private final ActiveMQConnectionFactory connectionFactory;
@@ -102,7 +100,6 @@ public class ActivemqClient {
         if (config.get(DISPATCH_ASYNC) != null) {
             factory.setDispatchAsync(config.get(DISPATCH_ASYNC));
         }
-
         if (config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT) != null) {
             factory.setWarnAboutUnstartedConnectionTimeout(
                     config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT));
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqConfig.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java
similarity index 56%
rename from seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqConfig.java
rename to seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java
index 6bf0045361..1d4300b004 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqConfig.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java
@@ -17,56 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.activemq.config;
 
-import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-@Setter
-@Getter
-@AllArgsConstructor
-public class ActivemqConfig implements Serializable {
-    private String host;
-    private Integer port;
-    private String username;
-    private String password;
-    private String uri;
-    private String queueName;
-    private Boolean checkForDuplicate;
-    private String clientID;
-    private Integer closeTimeout;
-    private Boolean consumerExpiryCheckEnabled;
-    private Boolean copyMessageOnSend;
-    private Boolean disableTimeStampsByDefault;
-    private Boolean dispatchAsync;
-    private Boolean nestedMapAndListEnabled;
-    private Boolean useCompression;
-    private Boolean alwaysSessionAsync;
-    private Boolean alwaysSyncSend;
-    private Integer warnAboutUnstartedConnectionTimeout;
-
-    private final Map<String, Object> sinkOptionProps = new HashMap<>();
-
-    public static final Option<String> HOST =
-            Options.key("host")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the default host to use for connections");
 
-    public static final Option<Integer> PORT =
-            Options.key("port")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("the default port to use for connections");
+public class ActivemqSinkOptions implements Serializable {
 
     public static final Option<String> USERNAME =
             Options.key("username")
@@ -106,29 +62,6 @@ public class ActivemqConfig implements Serializable {
                     .noDefaultValue()
                     .withDescription("Sets the JMS clientID to use for the connection.");
 
-    public static final Option<Boolean> COPY_MESSAGE_ON_SEND =
-            Options.key("copy_message_on_send")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. "
-                                    + "This is enabled by default to be compliant with the JMS specification. "
-                                    + "For a performance boost set to false if you do not mutate JMS messages after they are sent.");
-
-    public static final Option<Boolean> DISABLE_TIMESTAMP_BY_DEFAULT =
-            Options.key("disable_timeStamps_by_default")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Sets whether or not timestamps on messages should be disabled or not. "
-                                    + "For a small performance boost set to false.");
-
-    public static final Option<Boolean> USE_COMPRESSION =
-            Options.key("use_compression")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription("Enables the use of compression on the message’s body.");
-
     public static final Option<Boolean> ALWAYS_SESSION_ASYNC =
             Options.key("always_session_async")
                     .booleanType()
@@ -189,54 +122,4 @@ public class ActivemqConfig implements Serializable {
                     .withDescription(
                             "Controls whether message expiration checking is done in each "
                                     + "MessageConsumer prior to dispatching a message.");
-
-    public ActivemqConfig(Config config) {
-        this.host = config.getString(HOST.key());
-        this.port = config.getInt(PORT.key());
-        this.queueName = config.getString(QUEUE_NAME.key());
-        this.uri = config.getString(URI.key());
-        if (config.hasPath(USERNAME.key())) {
-            this.username = config.getString(USERNAME.key());
-        }
-        if (config.hasPath(PASSWORD.key())) {
-            this.password = config.getString(PASSWORD.key());
-        }
-        if (config.hasPath(CHECK_FOR_DUPLICATE.key())) {
-            this.checkForDuplicate = config.getBoolean(CHECK_FOR_DUPLICATE.key());
-        }
-        if (config.hasPath(CLIENT_ID.key())) {
-            this.clientID = config.getString(CLIENT_ID.key());
-        }
-        if (config.hasPath(COPY_MESSAGE_ON_SEND.key())) {
-            this.copyMessageOnSend = config.getBoolean(COPY_MESSAGE_ON_SEND.key());
-        }
-        if (config.hasPath(DISABLE_TIMESTAMP_BY_DEFAULT.key())) {
-            this.disableTimeStampsByDefault = config.getBoolean(DISABLE_TIMESTAMP_BY_DEFAULT.key());
-        }
-        if (config.hasPath(USE_COMPRESSION.key())) {
-            this.useCompression = config.getBoolean(USE_COMPRESSION.key());
-        }
-        if (config.hasPath(ALWAYS_SESSION_ASYNC.key())) {
-            this.alwaysSessionAsync = config.getBoolean(ALWAYS_SESSION_ASYNC.key());
-        }
-        if (config.hasPath(ALWAYS_SYNC_SEND.key())) {
-            this.alwaysSyncSend = config.getBoolean(ALWAYS_SYNC_SEND.key());
-        }
-        if (config.hasPath(CLOSE_TIMEOUT.key())) {
-            this.closeTimeout = config.getInt(CLOSE_TIMEOUT.key());
-        }
-        if (config.hasPath(DISPATCH_ASYNC.key())) {
-            this.dispatchAsync = config.getBoolean(DISPATCH_ASYNC.key());
-        }
-        if (config.hasPath(NESTED_MAP_AND_LIST_ENABLED.key())) {
-            this.nestedMapAndListEnabled = config.getBoolean(NESTED_MAP_AND_LIST_ENABLED.key());
-        }
-        if (config.hasPath(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT.key())) {
-            this.warnAboutUnstartedConnectionTimeout =
-                    config.getInt(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT.key());
-        }
-    }
-
-    @VisibleForTesting
-    public ActivemqConfig() {}
 }
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
index ec40d648ae..a34ba7a703 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
@@ -25,23 +25,18 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.COPY_MESSAGE_ON_SEND;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISABLE_TIMESTAMP_BY_DEFAULT;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.HOST;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PORT;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USE_COMPRESSION;
-import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SESSION_ASYNC;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SYNC_SEND;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CHECK_FOR_DUPLICATE;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLIENT_ID;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLOSE_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.DISPATCH_ASYNC;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.NESTED_MAP_AND_LIST_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.QUEUE_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.URI;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
 
 @AutoService(Factory.class)
 public class ActivemqSinkFactory implements TableSinkFactory {
@@ -57,13 +52,8 @@ public class ActivemqSinkFactory implements TableSinkFactory {
                 .required(QUEUE_NAME, URI)
                 .bundled(USERNAME, PASSWORD)
                 .optional(
-                        HOST,
-                        PORT,
                         CLIENT_ID,
                         CHECK_FOR_DUPLICATE,
-                        COPY_MESSAGE_ON_SEND,
-                        DISABLE_TIMESTAMP_BY_DEFAULT,
-                        USE_COMPRESSION,
                         ALWAYS_SESSION_ASYNC,
                         ALWAYS_SYNC_SEND,
                         CLOSE_TIMEOUT,
diff --git a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java
index f3395552c4..48a87e306b 100644
--- a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkWriter.java
@@ -25,8 +25,6 @@ import org.apache.seatunnel.connectors.seatunnel.activemq.client.ActivemqClient;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
-import java.util.Optional;
-
 public class ActivemqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
     private ActivemqClient activeMQClient;
 
@@ -42,11 +40,6 @@ public class ActivemqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
         activeMQClient.write(serializationSchema.serialize(element));
     }
 
-    @Override
-    public Optional prepareCommit() {
-        return Optional.empty();
-    }
-
     @Override
     public void close() {
         if (activeMQClient != null) {

Reply via email to