This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new db40d24f99b Add support for publisher confirms, which allows a send to
fail when the destination is invalid (#9313)
db40d24f99b is described below
commit db40d24f99b150bd5ff1cea56df5877ae72654de
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Feb 9 06:28:28 2023 +0100
Add support for publisher confirms, which allows a send to fail when the
destination is invalid (#9313)
* rabbitmq confirm
* CAMEL-19008: rabbitmq - Add support for publisher confirms, which allows
a send to fail when the destination is invalid.
---
.../SpringRabbitMQEndpointConfigurer.java | 9 +++
.../SpringRabbitMQEndpointUriFactory.java | 4 +-
.../component/springrabbit/spring-rabbitmq.json | 4 +-
.../springrabbit/SpringRabbitMQEndpoint.java | 27 ++++++++-
.../springrabbit/SpringRabbitMQProducer.java | 24 +++++++-
.../integration/RabbitMQITSupport.java | 11 +++-
.../RabbitMQProducerInvalidExchangeIT.java | 70 ++++++++++++++++++++++
7 files changed, 143 insertions(+), 6 deletions(-)
diff --git
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
index 71847e230f6..9b44a1eb362 100644
---
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
+++
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
@@ -34,6 +34,9 @@ public class SpringRabbitMQEndpointConfigurer extends
PropertyConfigurerSupport
case "bridgeErrorHandler":
target.setBridgeErrorHandler(property(camelContext, boolean.class, value));
return true;
case "concurrentconsumers":
case "concurrentConsumers":
target.setConcurrentConsumers(property(camelContext, java.lang.Integer.class,
value)); return true;
+ case "confirm": target.setConfirm(property(camelContext,
java.lang.String.class, value)); return true;
+ case "confirmtimeout":
+ case "confirmTimeout": target.setConfirmTimeout(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
case "connectionfactory":
case "connectionFactory":
target.setConnectionFactory(property(camelContext,
org.springframework.amqp.rabbit.connection.ConnectionFactory.class, value));
return true;
case "deadletterexchange":
@@ -104,6 +107,9 @@ public class SpringRabbitMQEndpointConfigurer extends
PropertyConfigurerSupport
case "bridgeErrorHandler": return boolean.class;
case "concurrentconsumers":
case "concurrentConsumers": return java.lang.Integer.class;
+ case "confirm": return java.lang.String.class;
+ case "confirmtimeout":
+ case "confirmTimeout": return long.class;
case "connectionfactory":
case "connectionFactory": return
org.springframework.amqp.rabbit.connection.ConnectionFactory.class;
case "deadletterexchange":
@@ -175,6 +181,9 @@ public class SpringRabbitMQEndpointConfigurer extends
PropertyConfigurerSupport
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "concurrentconsumers":
case "concurrentConsumers": return target.getConcurrentConsumers();
+ case "confirm": return target.getConfirm();
+ case "confirmtimeout":
+ case "confirmTimeout": return target.getConfirmTimeout();
case "connectionfactory":
case "connectionFactory": return target.getConnectionFactory();
case "deadletterexchange":
diff --git
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
index 59b7ddc3aa7..d3f7ebcade4 100644
---
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
+++
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class SpringRabbitMQEndpointUriFactory extends
org.apache.camel.support.c
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(35);
+ Set<String> props = new HashSet<>(37);
props.add("acknowledgeMode");
props.add("args");
props.add("asyncConsumer");
@@ -29,6 +29,8 @@ public class SpringRabbitMQEndpointUriFactory extends
org.apache.camel.support.c
props.add("autoStartup");
props.add("bridgeErrorHandler");
props.add("concurrentConsumers");
+ props.add("confirm");
+ props.add("confirmTimeout");
props.add("connectionFactory");
props.add("deadLetterExchange");
props.add("deadLetterExchangeType");
diff --git
a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
index 1dbd0d0ebe0..b11bbef4c2f 100644
---
a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
+++
b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
@@ -85,7 +85,9 @@
"messageListenerContainerType": { "kind": "parameter", "displayName":
"Message Listener Container Type", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "enum": [ "DMLC", "SMLC" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "DMLC", "description":
"The type of the MessageListenerContainer" },
"prefetchCount": { "kind": "parameter", "displayName": "Prefetch Count",
"group": "consumer (advanced)", "label": "consumer,advanced", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "description": "Tell the broker how many
messages to send in a single request. Often this can be set quite high to
improve throughput." },
"retry": { "kind": "parameter", "displayName": "Retry", "group": "consumer
(advanced)", "label": "consumer,advanced", "required": false, "type": "object",
"javaType": "org.springframework.retry.interceptor.RetryOperationsInterceptor",
"deprecated": false, "autowired": false, "secret": false, "description":
"Custom retry configuration to use. If this is configured then the other
settings such as maximumRetryAttempts for retry are not in use." },
- "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout",
"group": "producer", "label": "producer", "required": false, "type":
"duration", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in
milliseconds to be used when waiting for a reply message when doing
request\/reply messaging. The default value is 5 seconds. A negative value
indicates an indefinite timeout." },
+ "confirm": { "kind": "parameter", "displayName": "Confirm", "group":
"producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "auto", "enabled", "disabled" ],
"deprecated": false, "autowired": false, "secret": false, "description":
"Controls whether to wait for confirms. The connection factory must be
configured for publisher confirms and this method.auto = Camel detects if the
connection factory uses confirms or not. disabled = [...]
+ "confirmTimeout": { "kind": "parameter", "displayName": "Confirm Timeout",
"group": "producer", "label": "producer", "required": false, "type":
"duration", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in
milliseconds to be used when waiting for a message sent to be confirmed by
RabbitMQ when doing send only messaging (InOnly). The default value is 5
seconds. A negative value indicates an inde [...]
+ "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout",
"group": "producer", "label": "producer", "required": false, "type":
"duration", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in
milliseconds to be used when waiting for a reply message when doing
request\/reply (InOut) messaging. The default value is 5 seconds. A negative
value indicates an indefinite timeout." },
"usePublisherConnection": { "kind": "parameter", "displayName": "Use
Publisher Connection", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description": "Use
a separate connection for publishers and consumers" },
"lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start
Producer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": false,
"description": "Whether the producer should be started lazy (on the first
message). By starting lazy you can use this to allow CamelContext and routes to
startup in situations where a producer may other [...]
"args": { "kind": "parameter", "displayName": "Args", "group": "advanced",
"label": "advanced", "required": false, "type": "object", "javaType":
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.",
"multiValue": true, "deprecated": false, "autowired": false, "secret": false,
"description": "Specify arguments for configuring the different RabbitMQ
concepts, a different prefix is required for each element: arg.consumer.
arg.exchange. arg.queue. arg.binding. arg.dlq.ex [...]
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
index 84b5b49f1a8..1a9caa5107e 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
@@ -147,9 +147,18 @@ public class SpringRabbitMQEndpoint extends
DefaultEndpoint implements AsyncEndp
+ " message brokers and you want to route message
from one system to another.")
private boolean disableReplyTo;
@UriParam(label = "producer", javaType = "java.time.Duration",
defaultValue = "5000",
- description = "Specify the timeout in milliseconds to be used
when waiting for a reply message when doing request/reply messaging."
+ description = "Specify the timeout in milliseconds to be used
when waiting for a reply message when doing request/reply (InOut) messaging."
+ " The default value is 5 seconds. A negative
value indicates an indefinite timeout.")
private long replyTimeout = 5000;
+ @UriParam(label = "producer", javaType = "java.time.Duration",
defaultValue = "5000",
+ description = "Specify the timeout in milliseconds to be used
when waiting for a message sent to be confirmed by RabbitMQ when doing send
only messaging (InOnly)."
+ + " The default value is 5 seconds. A negative
value indicates an indefinite timeout.")
+ private long confirmTimeout = 5000;
+ @UriParam(label = "producer", enums = "auto,enabled,disabled",
+ description = "Controls whether to wait for confirms. The
connection factory must be configured for publisher confirms and this method."
+ +
+ "auto = Camel detects if the connection factory
uses confirms or not. disabled = Confirms is disabled. enabled = Confirms is
enabled.")
+ private String confirm = "auto";
@UriParam(label = "producer", defaultValue = "false",
description = "Use a separate connection for publishers and
consumers")
private boolean usePublisherConnection;
@@ -353,6 +362,22 @@ public class SpringRabbitMQEndpoint extends
DefaultEndpoint implements AsyncEndp
this.replyTimeout = replyTimeout;
}
+ public long getConfirmTimeout() {
+ return confirmTimeout;
+ }
+
+ public void setConfirmTimeout(long confirmTimeout) {
+ this.confirmTimeout = confirmTimeout;
+ }
+
+ public String getConfirm() {
+ return confirm;
+ }
+
+ public void setConfirm(String confirm) {
+ this.confirm = confirm;
+ }
+
public boolean isUsePublisherConnection() {
return usePublisherConnection;
}
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 98e47629044..dbd09d74d57 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.springrabbit;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
@@ -201,8 +202,29 @@ public class SpringRabbitMQProducer extends
DefaultAsyncProducer {
msg = getEndpoint().getMessageConverter().toMessage(body, mp);
}
+ final String ex = exchangeName;
+ final String rk = routingKey;
+ boolean confirm;
+ if ("auto".equalsIgnoreCase(getEndpoint().getConfirm())) {
+ confirm =
getEndpoint().getConnectionFactory().isPublisherConfirms();
+ } else if ("enabled".equalsIgnoreCase(getEndpoint().getConfirm())) {
+ confirm = true;
+ } else {
+ confirm = false;
+ }
+ final long timeout = getEndpoint().getConfirmTimeout() <= 0 ?
Long.MAX_VALUE : getEndpoint().getConfirmTimeout();
try {
- getInOnlyTemplate().send(exchangeName, routingKey, msg);
+ Boolean sent = getInOnlyTemplate().invoke(t -> {
+ t.send(ex, rk, msg);
+ if (confirm) {
+ return t.waitForConfirms(timeout);
+ } else {
+ return true;
+ }
+ });
+ if (Boolean.FALSE == sent) {
+ exchange.setException(new TimeoutException("Message not sent
within " + timeout + " millis"));
+ }
} catch (Exception e) {
exchange.setException(e);
}
diff --git
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
index 5cc7526e1e0..ff80deec70b 100644
---
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
+++
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
@@ -36,16 +36,23 @@ public abstract class RabbitMQITSupport extends
CamelTestSupport {
protected Logger log = LoggerFactory.getLogger(getClass());
- ConnectionFactory createConnectionFactory() {
+ ConnectionFactory createConnectionFactory(boolean confirm) {
CachingConnectionFactory cf = new CachingConnectionFactory();
+ if (confirm) {
+
cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
+ }
cf.setUri(service.getAmqpUrl());
return cf;
}
+ protected boolean confirmEnabled() {
+ return false;
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
- context.getRegistry().bind("myCF", createConnectionFactory());
+ context.getRegistry().bind("myCF",
createConnectionFactory(confirmEnabled()));
SpringRabbitMQComponent rmq = context.getComponent("spring-rabbitmq",
SpringRabbitMQComponent.class);
// turn on auto declare
diff --git
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
new file mode 100644
index 00000000000..694a5af2ec1
--- /dev/null
+++
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.springrabbit.integration;
+
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+
+import static org.junit.Assert.fail;
+
+public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
+
+ @Override
+ protected boolean confirmEnabled() {
+ return true;
+ }
+
+ @Test
+ public void testProducer() {
+ ConnectionFactory cf =
context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
+
+ Queue q = new Queue("myqueue");
+ TopicExchange t = new TopicExchange("foo");
+
+ AmqpAdmin admin = new RabbitAdmin(cf);
+ admin.declareQueue(q);
+ admin.declareExchange(t);
+ admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
+
+ try {
+ template.sendBody("direct:start", "Hello World");
+ fail("Should fail");
+ } catch (Exception e) {
+ Assertions.assertInstanceOf(ShutdownSignalException.class,
e.getCause());
+ }
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("spring-rabbitmq:unknown?routingKey=foo.bar");
+ }
+ };
+ }
+}