This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5f9492e62a [Fix][connector-rabbitmq] Set default value for durable, 
exclusive and auto-delete (#9631)
5f9492e62a is described below

commit 5f9492e62a72d0f6e4a03b33c7e1da6f5057d33e
Author: ZHANG YINGHONG <[email protected]>
AuthorDate: Wed Jul 30 10:03:02 2025 +0800

    [Fix][connector-rabbitmq] Set default value for durable, exclusive and 
auto-delete (#9631)
---
 .dlc.json                                          |  1 +
 docs/en/connector-v2/sink/Rabbitmq.md              | 19 +++++++
 docs/en/connector-v2/source/Rabbitmq.md            | 22 +++++++-
 .../seatunnel/rabbitmq/config/RabbitmqConfig.java  | 12 +---
 .../e2e/connector/rabbitmq/RabbitmqIT.java         | 63 +++++++++++----------
 .../rabbitmq-to-rabbitmq-using-default-config.conf | 66 ++++++++++++++++++++++
 6 files changed, 143 insertions(+), 40 deletions(-)

diff --git a/.dlc.json b/.dlc.json
index 42e7bd5edf..35a04e1d21 100644
--- a/.dlc.json
+++ b/.dlc.json
@@ -39,3 +39,4 @@
     403
   ]
 }
+
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md 
b/docs/en/connector-v2/sink/Rabbitmq.md
index 7e7fb5ef59..b71ab415ed 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -30,6 +30,9 @@ Used to write data to Rabbitmq.
 | connection_timeout         | int     | no       | -             |
 | rabbitmq.config            | map     | no       | -             |
 | common-options             |         | no       | -             |
+| durable                    | boolean | no       | true          |
+| exclusive                  | boolean | no       | false         |
+| auto_delete                | boolean | no       | false         |
 
 ### host [string]
 
@@ -108,6 +111,22 @@ In addition to the above parameters that must be specified 
by the RabbitMQ clien
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details
 
+### durable
+
+- true: The queue will survive on server restart.
+- false: The queue will be deleted on server restart.
+
+### exclusive
+
+- true: The queue is used only by the current connection and will be deleted 
when the connection closes.
+- false: The queue can be used by multiple connections.
+
+### auto-delete
+
+- true: The queue will be deleted automatically when the last consumer 
unsubscribes.
+- false: The queue will not be automatically deleted.
+
+
 ## Example
 
 simple:
diff --git a/docs/en/connector-v2/source/Rabbitmq.md 
b/docs/en/connector-v2/source/Rabbitmq.md
index bce43b430a..068daee7cd 100644
--- a/docs/en/connector-v2/source/Rabbitmq.md
+++ b/docs/en/connector-v2/source/Rabbitmq.md
@@ -25,8 +25,8 @@ The source must be non-parallel (parallelism set to 1) in 
order to achieve exact
 
 ## Options
 
-|            name            |  type   | required | default value |
-|----------------------------|---------|----------|---------------|
+| name                       | type    | required | default value |
+| -------------------------- | ------- | -------- | ------------- |
 | host                       | string  | yes      | -             |
 | port                       | int     | yes      | -             |
 | virtual_host               | string  | yes      | -             |
@@ -47,6 +47,9 @@ The source must be non-parallel (parallelism set to 1) in 
order to achieve exact
 | prefetch_count             | int     | no       | -             |
 | delivery_timeout           | long    | no       | -             |
 | common-options             |         | no       | -             |
+| durable                    | boolean | no       | true          |
+| exclusive                  | boolean | no       | false         |
+| auto_delete                | boolean | no       | false         |
 
 ### host [string]
 
@@ -132,6 +135,21 @@ deliveryTimeout maximum wait time, in milliseconds, for 
the next message deliver
 
 Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details
 
+### durable
+
+- true: The queue will survive on server restart.
+- false: The queue will be deleted on server restart.
+
+### exclusive
+
+- true: The queue is used only by the current connection and will be deleted 
when the connection closes.
+- false: The queue can be used by multiple connections.
+
+### auto-delete
+
+- true: The queue will be deleted automatically when the last consumer 
unsubscribes.
+- false: The queue will not be automatically deleted.
+
 ## Example
 
 simple:
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index ba67ed3c69..8ede2a6b91 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -114,15 +114,9 @@ public class RabbitmqConfig implements Serializable {
         if 
(config.getOptional(RabbitmqSourceOptions.USE_CORRELATION_ID).isPresent()) {
             this.usesCorrelationId = 
config.get(RabbitmqSourceOptions.USE_CORRELATION_ID);
         }
-        if (config.getOptional(RabbitmqBaseOptions.DURABLE).isPresent()) {
-            this.durable = config.get(RabbitmqBaseOptions.DURABLE);
-        }
-        if (config.getOptional(RabbitmqBaseOptions.EXCLUSIVE).isPresent()) {
-            this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE);
-        }
-        if (config.getOptional(RabbitmqBaseOptions.AUTO_DELETE).isPresent()) {
-            this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE);
-        }
+        this.durable = config.get(RabbitmqBaseOptions.DURABLE);
+        this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE);
+        this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE);
         this.sinkOptionProps = config.get(RabbitmqSinkOptions.RABBITMQ_CONFIG);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
index a846949d85..246bb05d76 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
@@ -71,8 +71,6 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
     private static final String IMAGE = "rabbitmq:3-management";
     private static final String HOST = "rabbitmq-e2e";
     private static final int PORT = 5672;
-    private static final String QUEUE_NAME = "test";
-    private static final String SINK_QUEUE_NAME = "test1";
     private static final String USERNAME = "guest";
     private static final String PASSWORD = "guest";
     private static final Boolean DURABLE = true;
@@ -86,7 +84,6 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
 
     private GenericContainer<?> rabbitmqContainer;
     Connection connection;
-    RabbitmqClient rabbitmqClient;
 
     @BeforeAll
     @Override
@@ -102,10 +99,10 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
                                         
.withStartupTimeout(Duration.ofMinutes(2)));
         Startables.deepStart(Stream.of(rabbitmqContainer)).join();
         log.info("rabbitmq container started");
-        this.initRabbitMQ();
     }
 
-    private void initSourceData() throws IOException, InterruptedException {
+    private void initSourceData(RabbitmqClient rabbitmqClient)
+            throws IOException, InterruptedException {
         List<SeaTunnelRow> rows = TEST_DATASET.getValue();
         for (int i = 0; i < rows.size(); i++) {
             rabbitmqClient.write(
@@ -179,31 +176,12 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
         return Pair.of(rowType, rows);
     }
 
-    private void initRabbitMQ() {
+    private RabbitmqClient getRabbitmqClient(String queueName) {
         try {
             RabbitmqConfig config = new RabbitmqConfig();
             config.setHost(rabbitmqContainer.getHost());
             config.setPort(rabbitmqContainer.getFirstMappedPort());
-            config.setQueueName(QUEUE_NAME);
-            config.setVirtualHost("/");
-            config.setUsername(USERNAME);
-            config.setPassword(PASSWORD);
-            config.setDurable(DURABLE);
-            config.setExclusive(EXCLUSIVE);
-            config.setAutoDelete(AUTO_DELETE);
-            rabbitmqClient = new RabbitmqClient(config);
-        } catch (Exception e) {
-            throw new RuntimeException("init Rabbitmq error", e);
-        }
-    }
-
-    private RabbitmqClient initSinkRabbitMQ() {
-
-        try {
-            RabbitmqConfig config = new RabbitmqConfig();
-            config.setHost(rabbitmqContainer.getHost());
-            config.setPort(rabbitmqContainer.getFirstMappedPort());
-            config.setQueueName(SINK_QUEUE_NAME);
+            config.setQueueName(queueName);
             config.setVirtualHost("/");
             config.setUsername(USERNAME);
             config.setPassword(PASSWORD);
@@ -227,16 +205,19 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
 
     @TestTemplate
     public void testRabbitMQ(TestContainer container) throws Exception {
+        final String sourceQueueName = "test";
+        final String sinkQueueName = "test1";
+        RabbitmqClient sourceClient = this.getRabbitmqClient(sourceQueueName);
         // send data to source queue before executeJob start in every 
testContainer
-        initSourceData();
+        initSourceData(sourceClient);
 
         // init consumer client before executeJob start in every testContainer
-        RabbitmqClient sinkRabbitmqClient = initSinkRabbitMQ();
+        RabbitmqClient sinkRabbitmqClient = getRabbitmqClient(sinkQueueName);
 
         Set<String> resultSet = new HashSet<>();
         Handover handover = new Handover<>();
         DefaultConsumer consumer = 
sinkRabbitmqClient.getQueueingConsumer(handover);
-        sinkRabbitmqClient.getChannel().basicConsume(SINK_QUEUE_NAME, true, 
consumer);
+        sinkRabbitmqClient.getChannel().basicConsume(sinkQueueName, true, 
consumer);
         // assert execute Job code
         Container.ExecResult execResult = 
container.executeJob("/rabbitmq-to-rabbitmq.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
@@ -263,4 +244,28 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
                                         JSON_SERIALIZATION_SCHEMA.serialize(
                                                 
TEST_DATASET.getValue().get(1)))));
     }
+
+    @TestTemplate
+    public void testRabbitMQUSingDefaultConfig(TestContainer container) throws 
Exception {
+        final String sourceQueueName = "test2_0";
+        final String sinkQueueName = "test2_1";
+        RabbitmqClient sourceClient = this.getRabbitmqClient(sourceQueueName);
+        // send data to source queue before executeJob start in every 
testContainer
+        initSourceData(sourceClient);
+
+        // init consumer client before executeJob start in every testContainer
+        RabbitmqClient sinkRabbitmqClient = getRabbitmqClient(sinkQueueName);
+
+        Handover handover = new Handover<>();
+        DefaultConsumer consumer = 
sinkRabbitmqClient.getQueueingConsumer(handover);
+        sinkRabbitmqClient.getChannel().basicConsume(sinkQueueName, true, 
consumer);
+        // assert execute Job code
+        Container.ExecResult execResult = null;
+        try {
+            execResult = 
container.executeJob("/rabbitmq-to-rabbitmq-using-default-config.conf");
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq-using-default-config.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq-using-default-config.conf
new file mode 100644
index 0000000000..d91a6d5e6d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq-using-default-config.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  RabbitMQ {
+    host = "rabbitmq-e2e"
+    port = 5672
+    virtual_host = "/"
+    username = "guest"
+    password = "guest"
+    queue_name = "test2_0"
+    for_e2e_testing = true
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  RabbitMQ {
+    host = "rabbitmq-e2e"
+    port = 5672
+    virtual_host = "/"
+    username = "guest"
+    password = "guest"
+    queue_name = "test2_1"
+  }
+}
\ No newline at end of file

Reply via email to