This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rabbitmq
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 85152d3c460961df311cb77932428efde3a74d38
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Feb 8 09:32:12 2023 +0100

    CAMEL-19008: rabbitmq - Add support for publisher confirms, which allows the 
producer to fail when sending to an invalid destination.
---
 .../SpringRabbitMQEndpointConfigurer.java          |  9 +++++++
 .../SpringRabbitMQEndpointUriFactory.java          |  4 +++-
 .../component/springrabbit/spring-rabbitmq.json    |  4 +++-
 .../springrabbit/SpringRabbitMQEndpoint.java       | 28 ++++++++++++++++++++--
 .../springrabbit/SpringRabbitMQProducer.java       | 25 +++++++++++--------
 .../integration/RabbitMQITSupport.java             | 12 +++++++---
 .../RabbitMQProducerInvalidExchangeIT.java         | 19 ++++++++-------
 7 files changed, 76 insertions(+), 25 deletions(-)

diff --git 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
index 71847e230f6..9b44a1eb362 100644
--- 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
+++ 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
@@ -34,6 +34,9 @@ public class SpringRabbitMQEndpointConfigurer extends 
PropertyConfigurerSupport
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
         case "concurrentconsumers":
         case "concurrentConsumers": 
target.setConcurrentConsumers(property(camelContext, java.lang.Integer.class, 
value)); return true;
+        case "confirm": target.setConfirm(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "confirmtimeout":
+        case "confirmTimeout": target.setConfirmTimeout(property(camelContext, 
java.time.Duration.class, value).toMillis()); return true;
         case "connectionfactory":
         case "connectionFactory": 
target.setConnectionFactory(property(camelContext, 
org.springframework.amqp.rabbit.connection.ConnectionFactory.class, value)); 
return true;
         case "deadletterexchange":
@@ -104,6 +107,9 @@ public class SpringRabbitMQEndpointConfigurer extends 
PropertyConfigurerSupport
         case "bridgeErrorHandler": return boolean.class;
         case "concurrentconsumers":
         case "concurrentConsumers": return java.lang.Integer.class;
+        case "confirm": return java.lang.String.class;
+        case "confirmtimeout":
+        case "confirmTimeout": return long.class;
         case "connectionfactory":
         case "connectionFactory": return 
org.springframework.amqp.rabbit.connection.ConnectionFactory.class;
         case "deadletterexchange":
@@ -175,6 +181,9 @@ public class SpringRabbitMQEndpointConfigurer extends 
PropertyConfigurerSupport
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "concurrentconsumers":
         case "concurrentConsumers": return target.getConcurrentConsumers();
+        case "confirm": return target.getConfirm();
+        case "confirmtimeout":
+        case "confirmTimeout": return target.getConfirmTimeout();
         case "connectionfactory":
         case "connectionFactory": return target.getConnectionFactory();
         case "deadletterexchange":
diff --git 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
index 59b7ddc3aa7..d3f7ebcade4 100644
--- 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
+++ 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class SpringRabbitMQEndpointUriFactory extends 
org.apache.camel.support.c
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(35);
+        Set<String> props = new HashSet<>(37);
         props.add("acknowledgeMode");
         props.add("args");
         props.add("asyncConsumer");
@@ -29,6 +29,8 @@ public class SpringRabbitMQEndpointUriFactory extends 
org.apache.camel.support.c
         props.add("autoStartup");
         props.add("bridgeErrorHandler");
         props.add("concurrentConsumers");
+        props.add("confirm");
+        props.add("confirmTimeout");
         props.add("connectionFactory");
         props.add("deadLetterExchange");
         props.add("deadLetterExchangeType");
diff --git 
a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
 
b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
index 1dbd0d0ebe0..b11bbef4c2f 100644
--- 
a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
+++ 
b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
@@ -85,7 +85,9 @@
     "messageListenerContainerType": { "kind": "parameter", "displayName": 
"Message Listener Container Type", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "string", "javaType": 
"java.lang.String", "enum": [ "DMLC", "SMLC" ], "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "DMLC", "description": 
"The type of the MessageListenerContainer" },
     "prefetchCount": { "kind": "parameter", "displayName": "Prefetch Count", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "description": "Tell the broker how many 
messages to send in a single request. Often this can be set quite high to 
improve throughput." },
     "retry": { "kind": "parameter", "displayName": "Retry", "group": "consumer 
(advanced)", "label": "consumer,advanced", "required": false, "type": "object", 
"javaType": "org.springframework.retry.interceptor.RetryOperationsInterceptor", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Custom retry configuration to use. If this is configured then the other 
settings such as maximumRetryAttempts for retry are not in use." },
-    "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", 
"group": "producer", "label": "producer", "required": false, "type": 
"duration", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in 
milliseconds to be used when waiting for a reply message when doing 
request\/reply messaging. The default value is 5 seconds. A negative value 
indicates an indefinite timeout." },
+    "confirm": { "kind": "parameter", "displayName": "Confirm", "group": 
"producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "auto", "enabled", "disabled" ], 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Controls whether to wait for confirms. The connection factory must be 
configured for publisher confirms and this method.auto = Camel detects if the 
connection factory uses confirms or not. disabled =  [...]
+    "confirmTimeout": { "kind": "parameter", "displayName": "Confirm Timeout", 
"group": "producer", "label": "producer", "required": false, "type": 
"duration", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in 
milliseconds to be used when waiting for a message sent to be confirmed by 
RabbitMQ when doing send only messaging (InOnly). The default value is 5 
seconds. A negative value indicates an inde [...]
+    "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", 
"group": "producer", "label": "producer", "required": false, "type": 
"duration", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in 
milliseconds to be used when waiting for a reply message when doing 
request\/reply (InOut) messaging. The default value is 5 seconds. A negative 
value indicates an indefinite timeout." },
     "usePublisherConnection": { "kind": "parameter", "displayName": "Use 
Publisher Connection", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": "Use 
a separate connection for publishers and consumers" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start 
Producer", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Whether the producer should be started lazy (on the first 
message). By starting lazy you can use this to allow CamelContext and routes to 
startup in situations where a producer may other [...]
     "args": { "kind": "parameter", "displayName": "Args", "group": "advanced", 
"label": "advanced", "required": false, "type": "object", "javaType": 
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.", 
"multiValue": true, "deprecated": false, "autowired": false, "secret": false, 
"description": "Specify arguments for configuring the different RabbitMQ 
concepts, a different prefix is required for each element: arg.consumer. 
arg.exchange. arg.queue. arg.binding. arg.dlq.ex [...]
diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
index 47082029075..1a9caa5107e 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
@@ -43,7 +43,6 @@ import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.core.QueueBuilder;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
 import org.springframework.amqp.support.converter.MessageConverter;
@@ -148,9 +147,18 @@ public class SpringRabbitMQEndpoint extends 
DefaultEndpoint implements AsyncEndp
                             + " message brokers and you want to route message 
from one system to another.")
     private boolean disableReplyTo;
     @UriParam(label = "producer", javaType = "java.time.Duration", 
defaultValue = "5000",
-              description = "Specify the timeout in milliseconds to be used 
when waiting for a reply message when doing request/reply messaging."
+              description = "Specify the timeout in milliseconds to be used 
when waiting for a reply message when doing request/reply (InOut) messaging."
                             + " The default value is 5 seconds. A negative 
value indicates an indefinite timeout.")
     private long replyTimeout = 5000;
+    @UriParam(label = "producer", javaType = "java.time.Duration", 
defaultValue = "5000",
+              description = "Specify the timeout in milliseconds to be used 
when waiting for a message sent to be confirmed by RabbitMQ when doing send 
only messaging (InOnly)."
+                            + " The default value is 5 seconds. A negative 
value indicates an indefinite timeout.")
+    private long confirmTimeout = 5000;
+    @UriParam(label = "producer", enums = "auto,enabled,disabled",
+              description = "Controls whether to wait for confirms. The 
connection factory must be configured for publisher confirms and this method."
+                            +
+                            "auto = Camel detects if the connection factory 
uses confirms or not. disabled = Confirms is disabled. enabled = Confirms is 
enabled.")
+    private String confirm = "auto";
     @UriParam(label = "producer", defaultValue = "false",
               description = "Use a separate connection for publishers and 
consumers")
     private boolean usePublisherConnection;
@@ -354,6 +362,22 @@ public class SpringRabbitMQEndpoint extends 
DefaultEndpoint implements AsyncEndp
         this.replyTimeout = replyTimeout;
     }
 
+    public long getConfirmTimeout() {
+        return confirmTimeout;
+    }
+
+    public void setConfirmTimeout(long confirmTimeout) {
+        this.confirmTimeout = confirmTimeout;
+    }
+
+    public String getConfirm() {
+        return confirm;
+    }
+
+    public void setConfirm(String confirm) {
+        this.confirm = confirm;
+    }
+
     public boolean isUsePublisherConnection() {
         return usePublisherConnection;
     }
diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 0c1690df20e..dbd09d74d57 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -32,7 +32,6 @@ import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.RabbitMessageFuture;
 import org.springframework.amqp.rabbit.connection.Connection;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.connection.RabbitUtils;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@@ -205,20 +204,26 @@ public class SpringRabbitMQProducer extends 
DefaultAsyncProducer {
 
         final String ex = exchangeName;
         final String rk = routingKey;
+        boolean confirm;
+        if ("auto".equalsIgnoreCase(getEndpoint().getConfirm())) {
+            confirm = 
getEndpoint().getConnectionFactory().isPublisherConfirms();
+        } else if ("enabled".equalsIgnoreCase(getEndpoint().getConfirm())) {
+            confirm = true;
+        } else {
+            confirm = false;
+        }
+        final long timeout = getEndpoint().getConfirmTimeout() <= 0 ? 
Long.MAX_VALUE : getEndpoint().getConfirmTimeout();
         try {
-            getInOnlyTemplate().setConfirmCallback(new 
RabbitTemplate.ConfirmCallback() {
-                @Override
-                public void confirm(CorrelationData correlationData, boolean 
ack, String cause) {
-
-                }
-            });
-
             Boolean sent = getInOnlyTemplate().invoke(t -> {
                 t.send(ex, rk, msg);
-                return t.waitForConfirms(5000);
+                if (confirm) {
+                    return t.waitForConfirms(timeout);
+                } else {
+                    return true;
+                }
             });
             if (Boolean.FALSE == sent) {
-                exchange.setException(new TimeoutException("Message not sent 
within " + 5000 + " millis"));
+                exchange.setException(new TimeoutException("Message not sent 
within " + timeout + " millis"));
             }
         } catch (Exception e) {
             exchange.setException(e);
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
index c7225057048..ff80deec70b 100644
--- 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
@@ -36,17 +36,23 @@ public abstract class RabbitMQITSupport extends 
CamelTestSupport {
 
     protected Logger log = LoggerFactory.getLogger(getClass());
 
-    ConnectionFactory createConnectionFactory() {
+    ConnectionFactory createConnectionFactory(boolean confirm) {
         CachingConnectionFactory cf = new CachingConnectionFactory();
-        
cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
+        if (confirm) {
+            
cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
+        }
         cf.setUri(service.getAmqpUrl());
         return cf;
     }
 
+    protected boolean confirmEnabled() {
+        return false;
+    }
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        context.getRegistry().bind("myCF", createConnectionFactory());
+        context.getRegistry().bind("myCF", 
createConnectionFactory(confirmEnabled()));
 
         SpringRabbitMQComponent rmq = context.getComponent("spring-rabbitmq", 
SpringRabbitMQComponent.class);
         // turn on auto declare
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
index 5d1fc01bef8..694a5af2ec1 100644
--- 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
@@ -16,10 +16,9 @@
  */
 package org.apache.camel.component.springrabbit.integration;
 
-import org.apache.camel.CamelContext;
+import com.rabbitmq.client.ShutdownSignalException;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.springrabbit.SpringRabbitMQComponent;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.amqp.core.AmqpAdmin;
@@ -29,14 +28,13 @@ import org.springframework.amqp.core.TopicExchange;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 
+import static org.junit.Assert.fail;
+
 public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
 
     @Override
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext camelContext = super.createCamelContext();
-        SpringRabbitMQComponent rmq = 
camelContext.getComponent("spring-rabbitmq", SpringRabbitMQComponent.class);
-        rmq.setAllowNullBody(true);
-        return camelContext;
+    protected boolean confirmEnabled() {
+        return true;
     }
 
     @Test
@@ -51,7 +49,12 @@ public class RabbitMQProducerInvalidExchangeIT extends 
RabbitMQITSupport {
         admin.declareExchange(t);
         admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
 
-        Assertions.assertDoesNotThrow(() -> template.sendBody("direct:start", 
null));
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should fail");
+        } catch (Exception e) {
+            Assertions.assertInstanceOf(ShutdownSignalException.class, 
e.getCause());
+        }
     }
 
     @Override

Reply via email to