This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4eec9be012 [Improve] rabbit mq options (#8740)
4eec9be012 is described below
commit 4eec9be012a685fa2be13590c5c967b7cd6eef3b
Author: Jarvis <[email protected]>
AuthorDate: Wed Feb 19 21:42:57 2025 +0800
[Improve] rabbit mq options (#8740)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../rabbitmq/config/RabbitmqBaseOptions.java | 135 ++++++++++
.../seatunnel/rabbitmq/config/RabbitmqConfig.java | 271 ++++-----------------
.../rabbitmq/config/RabbitmqSinkOptions.java | 35 +++
.../rabbitmq/config/RabbitmqSourceOptions.java | 63 +++++
.../seatunnel/rabbitmq/sink/RabbitmqSink.java | 59 +----
.../rabbitmq/sink/RabbitmqSinkFactory.java | 54 ++--
.../seatunnel/rabbitmq/source/RabbitmqSource.java | 62 +----
.../rabbitmq/source/RabbitmqSourceFactory.java | 74 +++---
9 files changed, 371 insertions(+), 384 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 7dce5ee5f5..533c622602 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -171,7 +171,6 @@ public class ConnectorOptionCheckTest {
Set<String> whiteList = new HashSet<>();
whiteList.add("JdbcSinkOptions");
whiteList.add("TypesenseSourceOptions");
- whiteList.add("RabbitmqSourceOptions");
whiteList.add("TypesenseSinkOptions");
whiteList.add("EmailSinkOptions");
whiteList.add("HudiSinkOptions");
@@ -188,7 +187,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("MongodbSinkOptions");
whiteList.add("IoTDBSinkOptions");
whiteList.add("EasysearchSourceOptions");
- whiteList.add("RabbitmqSinkOptions");
whiteList.add("IcebergSourceOptions");
whiteList.add("HbaseSourceOptions");
whiteList.add("PaimonSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java
new file mode 100644
index 0000000000..ff71f860e3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+
+public class RabbitmqBaseOptions extends ConnectorCommonOptions {
+
+ public static final Option<String> HOST =
+ Options.key("host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the default host to use for connections");
+
+ public static final Option<Integer> PORT =
+ Options.key("port")
+ .intType()
+ .noDefaultValue()
+ .withDescription("the default port to use for connections");
+
+ public static final Option<String> VIRTUAL_HOST =
+ Options.key("virtual_host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the virtual host to use when connecting to the broker");
+
+ public static final Option<String> QUEUE_NAME =
+ Options.key("queue_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the queue to write the message to");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the AMQP user name to use when connecting to the broker");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the password to use when connecting to the broker");
+
+ public static final Option<String> URL =
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host");
+
+ public static final Option<String> ROUTING_KEY =
+ Options.key("routing_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the routing key to publish the message to");
+
+ public static final Option<String> EXCHANGE =
+ Options.key("exchange")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the exchange to publish the message to");
+
+ public static final Option<Integer> NETWORK_RECOVERY_INTERVAL =
+ Options.key("network_recovery_interval")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "how long will automatic recovery wait before attempting to reconnect, in ms");
+
+ public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED =
+ Options.key("topology_recovery_enabled")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("if true, enables topology recovery");
+
+ public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED =
+ Options.key("AUTOMATIC_RECOVERY_ENABLED")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("if true, enables connection recovery");
+
+ public static final Option<Integer> CONNECTION_TIMEOUT =
+ Options.key("connection_timeout")
+ .intType()
+ .noDefaultValue()
+ .withDescription("connection TCP establishment timeout in milliseconds");
+
+ public static final Option<Boolean> FOR_E2E_TESTING =
+ Options.key("for_e2e_testing")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("use to recognize E2E mode");
+
+ public static final Option<Boolean> DURABLE =
+ Options.key("durable")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "true: The queue will survive a server restart."
+ + "false: The queue will be deleted on server restart.");
+
+ public static final Option<Boolean> EXCLUSIVE =
+ Options.key("exclusive")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "true: The queue is used only by the current connection and will be deleted when the connection closes."
+ + "false: The queue can be used by multiple connections.");
+
+ public static final Option<Boolean> AUTO_DELETE =
+ Options.key("auto_delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "true: The queue will be deleted automatically when the last consumer unsubscribes."
+ + "false: The queue will not be automatically deleted.");
+}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index 2872c9af81..ba67ed3c69 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -17,24 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
-import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Setter
@Getter
+@NoArgsConstructor
@AllArgsConstructor
public class RabbitmqConfig implements Serializable {
private String host;
@@ -63,241 +59,70 @@ public class RabbitmqConfig implements Serializable {
private boolean forE2ETesting = false;
private boolean usesCorrelationId = false;
- private final Map<String, Object> sinkOptionProps = new HashMap<>();
-
- public static final Option<String> HOST =
- Options.key("host")
- .stringType()
- .noDefaultValue()
- .withDescription("the default host to use for
connections");
-
- public static final Option<Integer> PORT =
- Options.key("port")
- .intType()
- .noDefaultValue()
- .withDescription("the default port to use for
connections");
-
- public static final Option<String> VIRTUAL_HOST =
- Options.key("virtual_host")
- .stringType()
- .noDefaultValue()
- .withDescription("the virtual host to use when connecting
to the broker");
-
- public static final Option<String> USERNAME =
- Options.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription("the AMQP user name to use when
connecting to the broker");
-
- public static final Option<String> PASSWORD =
- Options.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription("the password to use when connecting to
the broker");
-
- public static final Option<String> QUEUE_NAME =
- Options.key("queue_name")
- .stringType()
- .noDefaultValue()
- .withDescription("the queue to write the message to");
-
- public static final Option<String> URL =
- Options.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "convenience method for setting the fields in an
AMQP URI: host, port, username, password and virtual host");
-
- public static final Option<Integer> NETWORK_RECOVERY_INTERVAL =
- Options.key("network_recovery_interval")
- .intType()
- .noDefaultValue()
- .withDescription(
- "how long will automatic recovery wait before
attempting to reconnect, in ms");
-
- public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED =
- Options.key("AUTOMATIC_RECOVERY_ENABLED")
- .booleanType()
- .noDefaultValue()
- .withDescription("if true, enables connection recovery");
-
- public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED =
- Options.key("topology_recovery_enabled")
- .booleanType()
- .noDefaultValue()
- .withDescription("if true, enables topology recovery");
-
- public static final Option<Integer> CONNECTION_TIMEOUT =
- Options.key("connection_timeout")
- .intType()
- .noDefaultValue()
- .withDescription("connection TCP establishment timeout in
milliseconds");
-
- public static final Option<Integer> REQUESTED_CHANNEL_MAX =
- Options.key("requested_channel_max")
- .intType()
- .noDefaultValue()
- .withDescription("initially requested maximum channel
number");
-
- public static final Option<Integer> REQUESTED_FRAME_MAX =
- Options.key("requested_frame_max")
- .intType()
- .noDefaultValue()
- .withDescription("the requested maximum frame size");
-
- public static final Option<Integer> REQUESTED_HEARTBEAT =
- Options.key("requested_heartbeat")
- .intType()
- .noDefaultValue()
- .withDescription("the requested heartbeat timeout");
+ private Map<String, String> sinkOptionProps = new HashMap<>();
- public static final Option<Long> PREFETCH_COUNT =
- Options.key("prefetch_count")
- .longType()
- .noDefaultValue()
- .withDescription(
- "prefetchCount the max number of messages to
receive without acknowledgement\n");
-
- public static final Option<Integer> DELIVERY_TIMEOUT =
- Options.key("delivery_timeout")
- .intType()
- .noDefaultValue()
- .withDescription("deliveryTimeout maximum wait time");
-
- public static final Option<String> ROUTING_KEY =
- Options.key("routing_key")
- .stringType()
- .noDefaultValue()
- .withDescription("the routing key to publish the message
to");
-
- public static final Option<String> EXCHANGE =
- Options.key("exchange")
- .stringType()
- .noDefaultValue()
- .withDescription("the exchange to publish the message to");
-
- public static final Option<Boolean> FOR_E2E_TESTING =
- Options.key("for_e2e_testing")
- .booleanType()
- .noDefaultValue()
- .withDescription("use to recognize E2E mode");
-
- public static final Option<Map<String, String>> RABBITMQ_CONFIG =
- Options.key("rabbitmq.config")
- .mapType()
- .defaultValue(Collections.emptyMap())
- .withDescription(
- "In addition to the above parameters that must be
specified by the RabbitMQ client, the user can also specify multiple
non-mandatory parameters for the client, "
- + "covering [all the parameters specified
in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
-
- public static final Option<Boolean> USE_CORRELATION_ID =
- Options.key("use_correlation_id")
- .booleanType()
- .noDefaultValue()
- .withDescription(
- "Whether the messages received are supplied with a
unique"
- + "id to deduplicate messages (in case of
failed acknowledgments).");
-
- public static final Option<Boolean> DURABLE =
- Options.key("durable")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- "true: The queue will survive a server restart."
- + "false: The queue will be deleted on
server restart.");
-
- public static final Option<Boolean> EXCLUSIVE =
- Options.key("exclusive")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "true: The queue is used only by the current
connection and will be deleted when the connection closes."
- + "false: The queue can be used by
multiple connections.");
-
- public static final Option<Boolean> AUTO_DELETE =
- Options.key("auto_delete")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "true: The queue will be deleted automatically
when the last consumer unsubscribes."
- + "false: The queue will not be
automatically deleted.");
-
- private void parseSinkOptionProperties(Config pluginConfig) {
- if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key()))
{
- pluginConfig
- .getObject(RABBITMQ_CONFIG.key())
- .forEach(
- (key, value) -> {
- final String configKey = key.toLowerCase();
- this.sinkOptionProps.put(configKey,
value.unwrapped());
- });
+ public RabbitmqConfig(ReadonlyConfig config) {
+ this.host = config.get(RabbitmqBaseOptions.HOST);
+ this.port = config.get(RabbitmqBaseOptions.PORT);
+ this.queueName = config.get(RabbitmqBaseOptions.QUEUE_NAME);
+ if (config.getOptional(RabbitmqBaseOptions.USERNAME).isPresent()) {
+ this.username = config.get(RabbitmqBaseOptions.USERNAME);
}
- }
-
- public RabbitmqConfig(Config config) {
- this.host = config.getString(HOST.key());
- this.port = config.getInt(PORT.key());
- this.queueName = config.getString(QUEUE_NAME.key());
- if (config.hasPath(USERNAME.key())) {
- this.username = config.getString(USERNAME.key());
- }
- if (config.hasPath(PASSWORD.key())) {
- this.password = config.getString(PASSWORD.key());
+ if (config.getOptional(RabbitmqBaseOptions.PASSWORD).isPresent()) {
+ this.password = config.get(RabbitmqBaseOptions.PASSWORD);
}
- if (config.hasPath(VIRTUAL_HOST.key())) {
- this.virtualHost = config.getString(VIRTUAL_HOST.key());
+ if (config.getOptional(RabbitmqBaseOptions.VIRTUAL_HOST).isPresent()) {
+ this.virtualHost = config.get(RabbitmqBaseOptions.VIRTUAL_HOST);
}
- if (config.hasPath(NETWORK_RECOVERY_INTERVAL.key())) {
- this.networkRecoveryInterval =
config.getInt(NETWORK_RECOVERY_INTERVAL.key());
+ if (config.getOptional(RabbitmqBaseOptions.NETWORK_RECOVERY_INTERVAL).isPresent()) {
+ this.networkRecoveryInterval =
+ config.get(RabbitmqBaseOptions.NETWORK_RECOVERY_INTERVAL);
}
- if (config.hasPath(AUTOMATIC_RECOVERY_ENABLED.key())) {
- this.automaticRecovery =
config.getBoolean(AUTOMATIC_RECOVERY_ENABLED.key());
+ if
(config.getOptional(RabbitmqBaseOptions.AUTOMATIC_RECOVERY_ENABLED).isPresent())
{
+ this.automaticRecovery =
config.get(RabbitmqBaseOptions.AUTOMATIC_RECOVERY_ENABLED);
}
- if (config.hasPath(TOPOLOGY_RECOVERY_ENABLED.key())) {
- this.topologyRecovery =
config.getBoolean(TOPOLOGY_RECOVERY_ENABLED.key());
+ if
(config.getOptional(RabbitmqBaseOptions.TOPOLOGY_RECOVERY_ENABLED).isPresent())
{
+ this.topologyRecovery =
config.get(RabbitmqBaseOptions.TOPOLOGY_RECOVERY_ENABLED);
}
- if (config.hasPath(CONNECTION_TIMEOUT.key())) {
- this.connectionTimeout = config.getInt(CONNECTION_TIMEOUT.key());
+ if
(config.getOptional(RabbitmqBaseOptions.CONNECTION_TIMEOUT).isPresent()) {
+ this.connectionTimeout =
config.get(RabbitmqBaseOptions.CONNECTION_TIMEOUT);
}
- if (config.hasPath(REQUESTED_CHANNEL_MAX.key())) {
- this.requestedChannelMax =
config.getInt(REQUESTED_CHANNEL_MAX.key());
+ if
(config.getOptional(RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX).isPresent()) {
+ this.requestedChannelMax =
config.get(RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX);
}
- if (config.hasPath(REQUESTED_FRAME_MAX.key())) {
- this.requestedFrameMax = config.getInt(REQUESTED_FRAME_MAX.key());
+ if
(config.getOptional(RabbitmqSourceOptions.REQUESTED_FRAME_MAX).isPresent()) {
+ this.requestedFrameMax =
config.get(RabbitmqSourceOptions.REQUESTED_FRAME_MAX);
}
- if (config.hasPath(REQUESTED_HEARTBEAT.key())) {
- this.requestedHeartbeat = config.getInt(REQUESTED_HEARTBEAT.key());
+ if
(config.getOptional(RabbitmqSourceOptions.REQUESTED_HEARTBEAT).isPresent()) {
+ this.requestedHeartbeat =
config.get(RabbitmqSourceOptions.REQUESTED_HEARTBEAT);
}
- if (config.hasPath(PREFETCH_COUNT.key())) {
- this.prefetchCount = config.getInt(PREFETCH_COUNT.key());
+ if
(config.getOptional(RabbitmqSourceOptions.PREFETCH_COUNT).isPresent()) {
+ this.prefetchCount =
config.get(RabbitmqSourceOptions.PREFETCH_COUNT);
}
- if (config.hasPath(DELIVERY_TIMEOUT.key())) {
- this.deliveryTimeout = config.getInt(DELIVERY_TIMEOUT.key());
+ if
(config.getOptional(RabbitmqSourceOptions.DELIVERY_TIMEOUT).isPresent()) {
+ this.deliveryTimeout =
config.get(RabbitmqSourceOptions.DELIVERY_TIMEOUT);
}
- if (config.hasPath(ROUTING_KEY.key())) {
- this.routingKey = config.getString(ROUTING_KEY.key());
+ if (config.getOptional(RabbitmqBaseOptions.ROUTING_KEY).isPresent()) {
+ this.routingKey = config.get(RabbitmqBaseOptions.ROUTING_KEY);
}
- if (config.hasPath(EXCHANGE.key())) {
- this.exchange = config.getString(EXCHANGE.key());
+ if (config.getOptional(RabbitmqBaseOptions.EXCHANGE).isPresent()) {
+ this.exchange = config.get(RabbitmqBaseOptions.EXCHANGE);
}
- if (config.hasPath(FOR_E2E_TESTING.key())) {
- this.forE2ETesting = config.getBoolean(FOR_E2E_TESTING.key());
+ if
(config.getOptional(RabbitmqBaseOptions.FOR_E2E_TESTING).isPresent()) {
+ this.forE2ETesting =
config.get(RabbitmqBaseOptions.FOR_E2E_TESTING);
}
- if (config.hasPath(USE_CORRELATION_ID.key())) {
- this.usesCorrelationId =
config.getBoolean(USE_CORRELATION_ID.key());
+ if
(config.getOptional(RabbitmqSourceOptions.USE_CORRELATION_ID).isPresent()) {
+ this.usesCorrelationId =
config.get(RabbitmqSourceOptions.USE_CORRELATION_ID);
}
- if (config.hasPath(DURABLE.key())) {
- this.durable = config.getBoolean(DURABLE.key());
+ if (config.getOptional(RabbitmqBaseOptions.DURABLE).isPresent()) {
+ this.durable = config.get(RabbitmqBaseOptions.DURABLE);
}
- if (config.hasPath(EXCLUSIVE.key())) {
- this.exclusive = config.getBoolean(EXCLUSIVE.key());
+ if (config.getOptional(RabbitmqBaseOptions.EXCLUSIVE).isPresent()) {
+ this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE);
}
- if (config.hasPath(AUTO_DELETE.key())) {
- this.autoDelete = config.getBoolean(AUTO_DELETE.key());
+ if (config.getOptional(RabbitmqBaseOptions.AUTO_DELETE).isPresent()) {
+ this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE);
}
- parseSinkOptionProperties(config);
+ this.sinkOptionProps = config.get(RabbitmqSinkOptions.RABBITMQ_CONFIG);
}
-
- @VisibleForTesting
- public RabbitmqConfig() {}
}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java
new file mode 100644
index 0000000000..cfd602596f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class RabbitmqSinkOptions extends RabbitmqBaseOptions {
+
+ public static final Option<Map<String, String>> RABBITMQ_CONFIG =
+ Options.key("rabbitmq.config")
+ .mapType()
+ .defaultValue(Collections.emptyMap())
+ .withDescription(
+ "In addition to the above parameters that must be
specified by the RabbitMQ client, the user can also specify multiple
non-mandatory parameters for the client, "
+ + "covering [all the parameters specified
in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
+}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java
new file mode 100644
index 0000000000..dcc72f2067
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class RabbitmqSourceOptions extends RabbitmqBaseOptions {
+
+ public static final Option<Integer> REQUESTED_CHANNEL_MAX =
+ Options.key("requested_channel_max")
+ .intType()
+ .noDefaultValue()
+ .withDescription("initially requested maximum channel
number");
+
+ public static final Option<Integer> REQUESTED_FRAME_MAX =
+ Options.key("requested_frame_max")
+ .intType()
+ .noDefaultValue()
+ .withDescription("the requested maximum frame size");
+
+ public static final Option<Integer> REQUESTED_HEARTBEAT =
+ Options.key("requested_heartbeat")
+ .intType()
+ .noDefaultValue()
+ .withDescription("the requested heartbeat timeout");
+
+ public static final Option<Integer> PREFETCH_COUNT =
+ Options.key("prefetch_count")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "prefetchCount the max number of messages to
receive without acknowledgement\n");
+
+ public static final Option<Integer> DELIVERY_TIMEOUT =
+ Options.key("delivery_timeout")
+ .intType()
+ .noDefaultValue()
+ .withDescription("deliveryTimeout maximum wait time");
+
+ public static final Option<Boolean> USE_CORRELATION_ID =
+ Options.key("use_correlation_id")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription(
+ "Whether the messages received are supplied with a
unique"
+ + "id to deduplicate messages (in case of
failed acknowledgments).");
+}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
index 7d4f26272b..bfcedcc340 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -17,82 +17,39 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
-import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
-
-import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
-
-@AutoService(SeaTunnelSink.class)
public class RabbitmqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private SeaTunnelRowType seaTunnelRowType;
- private Config pluginConfig;
- private RabbitmqConfig rabbitMQConfig;
+
+ private final RabbitmqConfig rabbitMQConfig;
+ private final CatalogTable catalogTable;
@Override
public String getPluginName() {
return "RabbitMQ";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
-
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- HOST.key(),
- PORT.key(),
- VIRTUAL_HOST.key(),
- USERNAME.key(),
- PASSWORD.key(),
- QUEUE_NAME.key());
- if (!result.isSuccess()) {
- throw new RabbitmqConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- rabbitMQConfig = new RabbitmqConfig(pluginConfig);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public RabbitmqSink(RabbitmqConfig rabbitMQConfig, CatalogTable catalogTable) {
+ this.rabbitMQConfig = rabbitMQConfig;
+ this.catalogTable = catalogTable;
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType);
+ return new RabbitmqSinkWriter(rabbitMQConfig,
catalogTable.getSeaTunnelRowType());
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
index 4618d351d9..540707d0f0 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
@@ -18,26 +18,15 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
-
@AutoService(Factory.class)
public class RabbitmqSinkFactory implements TableSinkFactory {
@@ -49,17 +38,32 @@ public class RabbitmqSinkFactory implements
TableSinkFactory {
+ /**
+ * Declares the options of the RabbitMQ sink: host, port, virtual host and queue name are
+ * required, username and password are declared as a bundle, and every other listed option
+ * is optional.
+ *
+ * @return the rule built from the {@link RabbitmqSinkOptions} constants
+ */
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HOST, PORT, VIRTUAL_HOST, QUEUE_NAME)
- .bundled(USERNAME, PASSWORD)
+ .required(
+ RabbitmqSinkOptions.HOST,
+ RabbitmqSinkOptions.PORT,
+ RabbitmqSinkOptions.VIRTUAL_HOST,
+ RabbitmqSinkOptions.QUEUE_NAME)
+ .bundled(RabbitmqSinkOptions.USERNAME,
RabbitmqSinkOptions.PASSWORD)
.optional(
- URL,
- ROUTING_KEY,
- EXCHANGE,
- NETWORK_RECOVERY_INTERVAL,
- TOPOLOGY_RECOVERY_ENABLED,
- AUTOMATIC_RECOVERY_ENABLED,
- CONNECTION_TIMEOUT,
- RABBITMQ_CONFIG)
+ RabbitmqSinkOptions.URL,
+ RabbitmqSinkOptions.ROUTING_KEY,
+ RabbitmqSinkOptions.EXCHANGE,
+ RabbitmqSinkOptions.NETWORK_RECOVERY_INTERVAL,
+ RabbitmqSinkOptions.TOPOLOGY_RECOVERY_ENABLED,
+ RabbitmqSinkOptions.AUTOMATIC_RECOVERY_ENABLED,
+ RabbitmqSinkOptions.CONNECTION_TIMEOUT,
+ RabbitmqSinkOptions.FOR_E2E_TESTING,
+ RabbitmqSinkOptions.DURABLE,
+ RabbitmqSinkOptions.EXCLUSIVE,
+ RabbitmqSinkOptions.AUTO_DELETE,
+ RabbitmqSinkOptions.RABBITMQ_CONFIG)
.build();
}
+
+ /**
+ * Returns a {@link TableSink} lambda that builds a {@link RabbitmqSink} from a
+ * {@link RabbitmqConfig} wrapping the context options and from the context's catalog table.
+ * Both objects are created inside the lambda, i.e. only when the returned supplier is invoked,
+ * not when this factory method is called.
+ *
+ * @param context factory context supplying the options and the catalog table
+ * @return the lazily evaluated sink supplier
+ */
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () ->
+ new RabbitmqSink(
+ new RabbitmqConfig(context.getOptions()),
context.getCatalogTable());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
index 01146f26ac..afe108f229 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
@@ -17,12 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -30,11 +26,7 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
@@ -43,23 +35,23 @@ import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import com.google.auto.service.AutoService;
-
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+import java.util.Collections;
+import java.util.List;
-@AutoService(SeaTunnelSource.class)
public class RabbitmqSource
implements SeaTunnelSource<SeaTunnelRow, RabbitmqSplit,
RabbitmqSplitEnumeratorState>,
SupportParallelism {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private JobContext jobContext;
- private RabbitmqConfig rabbitMQConfig;
+ private final RabbitmqConfig rabbitMQConfig;
+ private final CatalogTable catalogTable;
+
+ /**
+ * Creates the source from an already-built connection config and the table it produces.
+ * The deserialization schema is a {@link JsonDeserializationSchema} built from the same
+ * catalog table, with both boolean constructor flags passed as {@code false}.
+ *
+ * @param rabbitMQConfig RabbitMQ settings stored in this source
+ * @param catalogTable table stored in this source and used to build the JSON schema
+ */
+ public RabbitmqSource(RabbitmqConfig rabbitMQConfig, CatalogTable
catalogTable) {
+ this.rabbitMQConfig = rabbitMQConfig;
+ this.catalogTable = catalogTable;
+ this.deserializationSchema = new
JsonDeserializationSchema(catalogTable, false, false);
+ }
@Override
public Boundedness getBoundedness() {
@@ -79,31 +71,8 @@ public class RabbitmqSource
}
+ /**
+ * Returns a single-element list holding the catalog table that was passed to the constructor.
+ *
+ * @return an immutable one-element list with this source's catalog table
+ */
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- config,
- HOST.key(),
- PORT.key(),
- VIRTUAL_HOST.key(),
- USERNAME.key(),
- PASSWORD.key(),
- QUEUE_NAME.key(),
- ConnectorCommonOptions.SCHEMA.key());
- if (!result.isSuccess()) {
- throw new RabbitmqConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.rabbitMQConfig = new RabbitmqConfig(config);
- setDeserialization(config);
- }
-
- @Override
- public SeaTunnelDataType getProducedType() {
- return deserializationSchema.getProducedType();
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
@@ -130,11 +99,4 @@ public class RabbitmqSource
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}
-
- private void setDeserialization(Config config) {
- // TODO: format SPI
- // only support json deserializationSchema
- CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
- this.deserializationSchema = new
JsonDeserializationSchema(catalogTable, false, false);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
index 212b194065..a45a2d11a1 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
@@ -18,31 +18,20 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSourceOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.DELIVERY_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PREFETCH_COUNT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_CHANNEL_MAX;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_FRAME_MAX;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_HEARTBEAT;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+import java.io.Serializable;
@AutoService(Factory.class)
public class RabbitmqSourceFactory implements TableSourceFactory {
@@ -54,24 +43,43 @@ public class RabbitmqSourceFactory implements
TableSourceFactory {
+ /**
+ * Declares the options of the RabbitMQ source: host, port, virtual host, queue name and
+ * schema are required, username and password are declared as a bundle, and every other
+ * listed option is optional.
+ *
+ * @return the rule built from the option constants listed below
+ */
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HOST, PORT, VIRTUAL_HOST, QUEUE_NAME,
ConnectorCommonOptions.SCHEMA)
- .bundled(USERNAME, PASSWORD)
+ .required(
+ RabbitmqSourceOptions.HOST,
+ RabbitmqSourceOptions.PORT,
+ RabbitmqSourceOptions.VIRTUAL_HOST,
+ RabbitmqSourceOptions.QUEUE_NAME,
+ RabbitmqSourceOptions.SCHEMA)
+ .bundled(RabbitmqSourceOptions.USERNAME,
RabbitmqSourceOptions.PASSWORD)
.optional(
- URL,
- ROUTING_KEY,
- EXCHANGE,
- NETWORK_RECOVERY_INTERVAL,
- TOPOLOGY_RECOVERY_ENABLED,
- AUTOMATIC_RECOVERY_ENABLED,
- CONNECTION_TIMEOUT,
- REQUESTED_CHANNEL_MAX,
- REQUESTED_FRAME_MAX,
- REQUESTED_HEARTBEAT,
- PREFETCH_COUNT,
- DELIVERY_TIMEOUT)
+ RabbitmqSourceOptions.URL,
+ RabbitmqSourceOptions.ROUTING_KEY,
+ RabbitmqSourceOptions.EXCHANGE,
+ RabbitmqSourceOptions.NETWORK_RECOVERY_INTERVAL,
+ RabbitmqSourceOptions.TOPOLOGY_RECOVERY_ENABLED,
+ RabbitmqSourceOptions.AUTOMATIC_RECOVERY_ENABLED,
+ RabbitmqSourceOptions.CONNECTION_TIMEOUT,
+ // NOTE(review): the next four options are referenced through RabbitmqSinkOptions even
+ // though this is the source factory. Presumably they are shared base options that are
+ // also reachable through RabbitmqSourceOptions - confirm, then reference them that way
+ // and drop the RabbitmqSinkOptions import from this file.
+ RabbitmqSinkOptions.FOR_E2E_TESTING,
+ RabbitmqSinkOptions.DURABLE,
+ RabbitmqSinkOptions.EXCLUSIVE,
+ RabbitmqSinkOptions.AUTO_DELETE,
+ RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX,
+ RabbitmqSourceOptions.REQUESTED_FRAME_MAX,
+ RabbitmqSourceOptions.REQUESTED_HEARTBEAT,
+ RabbitmqSourceOptions.PREFETCH_COUNT,
+ RabbitmqSourceOptions.DELIVERY_TIMEOUT)
.build();
}
+    /**
+     * Returns a {@link TableSource} lambda that builds a {@link RabbitmqSource} from a
+     * {@link RabbitmqConfig} wrapping the context options and from a catalog table derived
+     * from those same options via {@link CatalogTableUtil#buildWithConfig}. Both objects are
+     * created inside the lambda, i.e. only when the returned supplier is invoked.
+     *
+     * <p>The cast is unchecked: this method is generic in {@code T}, {@code SplitT} and
+     * {@code StateT}, while {@code RabbitmqSource} fixes its own type arguments. The warning is
+     * suppressed at method scope because a lambda body cannot be annotated on its own.
+     * NOTE(review): this presumes callers only bind the type parameters to the types
+     * {@code RabbitmqSource} actually implements - confirm against the factory discovery code.
+     *
+     * @param context factory context supplying the options
+     * @return the lazily evaluated source supplier
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new RabbitmqSource(
+                                new RabbitmqConfig(context.getOptions()),
+                                CatalogTableUtil.buildWithConfig(context.getOptions()));
+    }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return RabbitmqSource.class;