This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aabfc8eb78 [Feature][Rabbitmq] Allow configuration of queue durability
and deletion policy (#7365)
aabfc8eb78 is described below
commit aabfc8eb78dd33ae479e9ea82bac8371cf78092d
Author: Logic <[email protected]>
AuthorDate: Thu Aug 29 21:05:25 2024 +0800
[Feature][Rabbitmq] Allow configuration of queue durability and deletion
policy (#7365)
---
docs/en/connector-v2/sink/Rabbitmq.md | 39 ++++++++++++++++++++++
.../seatunnel/rabbitmq/client/RabbitmqClient.java | 11 ++++--
.../seatunnel/rabbitmq/config/RabbitmqConfig.java | 36 ++++++++++++++++++++
.../e2e/connector/rabbitmq/RabbitmqIT.java | 9 +++++
.../src/test/resources/rabbitmq-to-rabbitmq.conf | 6 ++++
5 files changed, 98 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md
b/docs/en/connector-v2/sink/Rabbitmq.md
index 489287249e..c7963525fb 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -57,6 +57,21 @@ convenience method for setting the fields in an AMQP URI:
host, port, username,
the queue to write the message to
+### durable [boolean]
+
+- true: The queue will survive a server restart.
+- false: The queue will be deleted on server restart.
+
+### exclusive [boolean]
+
+- true: The queue is used only by the current connection and will be deleted when the connection closes.
+- false: The queue can be used by multiple connections.
+
+### auto_delete [boolean]
+
+- true: The queue will be deleted automatically when the last consumer unsubscribes.
+- false: The queue will not be automatically deleted.
+
### schema [Config]
#### fields [Config]
@@ -112,6 +127,30 @@ sink {
}
```
+### Example 2
+
+Declare the queue with explicit `durable`, `exclusive`, and `auto_delete` settings:
+
+```hocon
+sink {
+ RabbitMQ {
+ host = "rabbitmq-e2e"
+ port = 5672
+ virtual_host = "/"
+ username = "guest"
+ password = "guest"
+ queue_name = "test1"
+ durable = "true"
+ exclusive = "false"
+ auto_delete = "false"
+ rabbitmq.config = {
+ requested-heartbeat = 10
+ connection-timeout = 10
+ }
+ }
+}
+```
+
## Changelog
### next version
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
index 82ae2728d6..3f5c862cad 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
@@ -189,11 +189,16 @@ public class RabbitmqClient {
protected void setupQueue() throws IOException {
if (config.getQueueName() != null) {
- declareQueueDefaults(channel, config.getQueueName());
+ declareQueueDefaults(channel, config);
}
}
- private void declareQueueDefaults(Channel channel, String queueName)
throws IOException {
- channel.queueDeclare(queueName, true, false, false, null);
+ private void declareQueueDefaults(Channel channel, RabbitmqConfig config)
throws IOException {
+ channel.queueDeclare(
+ config.getQueueName(),
+ config.getDurable(),
+ config.getExclusive(),
+ config.getAutoDelete(),
+ null);
}
}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index e8e2ce55c3..8475817457 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -53,6 +53,9 @@ public class RabbitmqConfig implements Serializable {
private Integer prefetchCount;
private long deliveryTimeout;
private String queueName;
+ private Boolean durable;
+ private Boolean exclusive;
+ private Boolean autoDelete;
private String routingKey;
private boolean logFailuresOnly = false;
private String exchange = "";
@@ -195,6 +198,30 @@ public class RabbitmqConfig implements Serializable {
"Whether the messages received are supplied with a
unique"
+ "id to deduplicate messages (in case of
failed acknowledgments).");
+ public static final Option<Boolean> DURABLE =
+ Options.key("durable")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "true: The queue will survive a server restart."
+ + "false: The queue will be deleted on
server restart.");
+
+ public static final Option<Boolean> EXCLUSIVE =
+ Options.key("exclusive")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "true: The queue is used only by the current
connection and will be deleted when the connection closes."
+ + "false: The queue can be used by
multiple connections.");
+
+ public static final Option<Boolean> AUTO_DELETE =
+ Options.key("auto_delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "true: The queue will be deleted automatically
when the last consumer unsubscribes."
+ + "false: The queue will not be
automatically deleted.");
+
private void parseSinkOptionProperties(Config pluginConfig) {
if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key()))
{
pluginConfig
@@ -259,6 +286,15 @@ public class RabbitmqConfig implements Serializable {
if (config.hasPath(USE_CORRELATION_ID.key())) {
this.usesCorrelationId =
config.getBoolean(USE_CORRELATION_ID.key());
}
+ if (config.hasPath(DURABLE.key())) {
+ this.durable = config.getBoolean(DURABLE.key());
+ }
+ if (config.hasPath(EXCLUSIVE.key())) {
+ this.exclusive = config.getBoolean(EXCLUSIVE.key());
+ }
+ if (config.hasPath(AUTO_DELETE.key())) {
+ this.autoDelete = config.getBoolean(AUTO_DELETE.key());
+ }
parseSinkOptionProperties(config);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
index 7052aa9bef..a846949d85 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
@@ -75,6 +75,9 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
private static final String SINK_QUEUE_NAME = "test1";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
+ private static final Boolean DURABLE = true;
+ private static final Boolean EXCLUSIVE = false;
+ private static final Boolean AUTO_DELETE = false;
private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>>
TEST_DATASET =
generateTestDataSet();
@@ -185,6 +188,9 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
config.setVirtualHost("/");
config.setUsername(USERNAME);
config.setPassword(PASSWORD);
+ config.setDurable(DURABLE);
+ config.setExclusive(EXCLUSIVE);
+ config.setAutoDelete(AUTO_DELETE);
rabbitmqClient = new RabbitmqClient(config);
} catch (Exception e) {
throw new RuntimeException("init Rabbitmq error", e);
@@ -201,6 +207,9 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
config.setVirtualHost("/");
config.setUsername(USERNAME);
config.setPassword(PASSWORD);
+ config.setDurable(DURABLE);
+ config.setExclusive(EXCLUSIVE);
+ config.setAutoDelete(AUTO_DELETE);
return new RabbitmqClient(config);
} catch (Exception e) {
throw new RuntimeException("init Rabbitmq error", e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
index b3a834bdc2..61267a3adc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
@@ -28,6 +28,9 @@ source {
username = "guest"
password = "guest"
queue_name = "test"
+ durable = "true"
+ exclusive = "false"
+ auto_delete = "false"
for_e2e_testing = true
schema = {
fields {
@@ -61,6 +64,9 @@ sink {
virtual_host = "/"
username = "guest"
password = "guest"
+ durable = "true"
+ exclusive = "false"
+ auto_delete = "false"
queue_name = "test1"
}
}
\ No newline at end of file