This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new db40d24f99b Add support for publisher confirms which allows to fail if 
sending to invalid destination (#9313)
db40d24f99b is described below

commit db40d24f99b150bd5ff1cea56df5877ae72654de
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Feb 9 06:28:28 2023 +0100

    Add support for publisher confirms which allows to fail if sending to 
invalid destination (#9313)
    
    * rabbitmq confirm
    
    * CAMEL-19008: rabbitmq - Add support for publisher confirms which allows 
to fail if sending to invalid destination.
---
 .../SpringRabbitMQEndpointConfigurer.java          |  9 +++
 .../SpringRabbitMQEndpointUriFactory.java          |  4 +-
 .../component/springrabbit/spring-rabbitmq.json    |  4 +-
 .../springrabbit/SpringRabbitMQEndpoint.java       | 27 ++++++++-
 .../springrabbit/SpringRabbitMQProducer.java       | 24 +++++++-
 .../integration/RabbitMQITSupport.java             | 11 +++-
 .../RabbitMQProducerInvalidExchangeIT.java         | 70 ++++++++++++++++++++++
 7 files changed, 143 insertions(+), 6 deletions(-)

diff --git 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
index 71847e230f6..9b44a1eb362 100644
--- 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
+++ 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
@@ -34,6 +34,9 @@ public class SpringRabbitMQEndpointConfigurer extends 
PropertyConfigurerSupport
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
         case "concurrentconsumers":
         case "concurrentConsumers": 
target.setConcurrentConsumers(property(camelContext, java.lang.Integer.class, 
value)); return true;
+        case "confirm": target.setConfirm(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "confirmtimeout":
+        case "confirmTimeout": target.setConfirmTimeout(property(camelContext, 
java.time.Duration.class, value).toMillis()); return true;
         case "connectionfactory":
         case "connectionFactory": 
target.setConnectionFactory(property(camelContext, 
org.springframework.amqp.rabbit.connection.ConnectionFactory.class, value)); 
return true;
         case "deadletterexchange":
@@ -104,6 +107,9 @@ public class SpringRabbitMQEndpointConfigurer extends 
PropertyConfigurerSupport
         case "bridgeErrorHandler": return boolean.class;
         case "concurrentconsumers":
         case "concurrentConsumers": return java.lang.Integer.class;
+        case "confirm": return java.lang.String.class;
+        case "confirmtimeout":
+        case "confirmTimeout": return long.class;
         case "connectionfactory":
         case "connectionFactory": return 
org.springframework.amqp.rabbit.connection.ConnectionFactory.class;
         case "deadletterexchange":
@@ -175,6 +181,9 @@ public class SpringRabbitMQEndpointConfigurer extends 
PropertyConfigurerSupport
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "concurrentconsumers":
         case "concurrentConsumers": return target.getConcurrentConsumers();
+        case "confirm": return target.getConfirm();
+        case "confirmtimeout":
+        case "confirmTimeout": return target.getConfirmTimeout();
         case "connectionfactory":
         case "connectionFactory": return target.getConnectionFactory();
         case "deadletterexchange":
diff --git 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
index 59b7ddc3aa7..d3f7ebcade4 100644
--- 
a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
+++ 
b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class SpringRabbitMQEndpointUriFactory extends 
org.apache.camel.support.c
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(35);
+        Set<String> props = new HashSet<>(37);
         props.add("acknowledgeMode");
         props.add("args");
         props.add("asyncConsumer");
@@ -29,6 +29,8 @@ public class SpringRabbitMQEndpointUriFactory extends 
org.apache.camel.support.c
         props.add("autoStartup");
         props.add("bridgeErrorHandler");
         props.add("concurrentConsumers");
+        props.add("confirm");
+        props.add("confirmTimeout");
         props.add("connectionFactory");
         props.add("deadLetterExchange");
         props.add("deadLetterExchangeType");
diff --git 
a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
 
b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
index 1dbd0d0ebe0..b11bbef4c2f 100644
--- 
a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
+++ 
b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
@@ -85,7 +85,9 @@
     "messageListenerContainerType": { "kind": "parameter", "displayName": 
"Message Listener Container Type", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "string", "javaType": 
"java.lang.String", "enum": [ "DMLC", "SMLC" ], "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "DMLC", "description": 
"The type of the MessageListenerContainer" },
     "prefetchCount": { "kind": "parameter", "displayName": "Prefetch Count", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "description": "Tell the broker how many 
messages to send in a single request. Often this can be set quite high to 
improve throughput." },
     "retry": { "kind": "parameter", "displayName": "Retry", "group": "consumer 
(advanced)", "label": "consumer,advanced", "required": false, "type": "object", 
"javaType": "org.springframework.retry.interceptor.RetryOperationsInterceptor", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Custom retry configuration to use. If this is configured then the other 
settings such as maximumRetryAttempts for retry are not in use." },
-    "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", 
"group": "producer", "label": "producer", "required": false, "type": 
"duration", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in 
milliseconds to be used when waiting for a reply message when doing 
request\/reply messaging. The default value is 5 seconds. A negative value 
indicates an indefinite timeout." },
+    "confirm": { "kind": "parameter", "displayName": "Confirm", "group": 
"producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "auto", "enabled", "disabled" ], 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Controls whether to wait for confirms. The connection factory must be 
configured for publisher confirms and this method.auto = Camel detects if the 
connection factory uses confirms or not. disabled =  [...]
+    "confirmTimeout": { "kind": "parameter", "displayName": "Confirm Timeout", 
"group": "producer", "label": "producer", "required": false, "type": 
"duration", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in 
milliseconds to be used when waiting for a message sent to be confirmed by 
RabbitMQ when doing send only messaging (InOnly). The default value is 5 
seconds. A negative value indicates an inde [...]
+    "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", 
"group": "producer", "label": "producer", "required": false, "type": 
"duration", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "5000", "description": "Specify the timeout in 
milliseconds to be used when waiting for a reply message when doing 
request\/reply (InOut) messaging. The default value is 5 seconds. A negative 
value indicates an indefinite timeout." },
     "usePublisherConnection": { "kind": "parameter", "displayName": "Use 
Publisher Connection", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": "Use 
a separate connection for publishers and consumers" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start 
Producer", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Whether the producer should be started lazy (on the first 
message). By starting lazy you can use this to allow CamelContext and routes to 
startup in situations where a producer may other [...]
     "args": { "kind": "parameter", "displayName": "Args", "group": "advanced", 
"label": "advanced", "required": false, "type": "object", "javaType": 
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.", 
"multiValue": true, "deprecated": false, "autowired": false, "secret": false, 
"description": "Specify arguments for configuring the different RabbitMQ 
concepts, a different prefix is required for each element: arg.consumer. 
arg.exchange. arg.queue. arg.binding. arg.dlq.ex [...]
diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
index 84b5b49f1a8..1a9caa5107e 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
@@ -147,9 +147,18 @@ public class SpringRabbitMQEndpoint extends 
DefaultEndpoint implements AsyncEndp
                             + " message brokers and you want to route message 
from one system to another.")
     private boolean disableReplyTo;
     @UriParam(label = "producer", javaType = "java.time.Duration", 
defaultValue = "5000",
-              description = "Specify the timeout in milliseconds to be used 
when waiting for a reply message when doing request/reply messaging."
+              description = "Specify the timeout in milliseconds to be used 
when waiting for a reply message when doing request/reply (InOut) messaging."
                             + " The default value is 5 seconds. A negative 
value indicates an indefinite timeout.")
     private long replyTimeout = 5000;
+    @UriParam(label = "producer", javaType = "java.time.Duration", 
defaultValue = "5000",
+              description = "Specify the timeout in milliseconds to be used 
when waiting for a message sent to be confirmed by RabbitMQ when doing send 
only messaging (InOnly)."
+                            + " The default value is 5 seconds. A negative 
value indicates an indefinite timeout.")
+    private long confirmTimeout = 5000;
+    @UriParam(label = "producer", enums = "auto,enabled,disabled",
+              description = "Controls whether to wait for confirms. The 
connection factory must be configured for publisher confirms and this method."
+                            +
+                            "auto = Camel detects if the connection factory 
uses confirms or not. disabled = Confirms is disabled. enabled = Confirms is 
enabled.")
+    private String confirm = "auto";
     @UriParam(label = "producer", defaultValue = "false",
               description = "Use a separate connection for publishers and 
consumers")
     private boolean usePublisherConnection;
@@ -353,6 +362,22 @@ public class SpringRabbitMQEndpoint extends 
DefaultEndpoint implements AsyncEndp
         this.replyTimeout = replyTimeout;
     }
 
+    public long getConfirmTimeout() {
+        return confirmTimeout;
+    }
+
+    public void setConfirmTimeout(long confirmTimeout) {
+        this.confirmTimeout = confirmTimeout;
+    }
+
+    public String getConfirm() {
+        return confirm;
+    }
+
+    public void setConfirm(String confirm) {
+        this.confirm = confirm;
+    }
+
     public boolean isUsePublisherConnection() {
         return usePublisherConnection;
     }
diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 98e47629044..dbd09d74d57 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.springrabbit;
 
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
@@ -201,8 +202,29 @@ public class SpringRabbitMQProducer extends 
DefaultAsyncProducer {
             msg = getEndpoint().getMessageConverter().toMessage(body, mp);
         }
 
+        final String ex = exchangeName;
+        final String rk = routingKey;
+        boolean confirm;
+        if ("auto".equalsIgnoreCase(getEndpoint().getConfirm())) {
+            confirm = 
getEndpoint().getConnectionFactory().isPublisherConfirms();
+        } else if ("enabled".equalsIgnoreCase(getEndpoint().getConfirm())) {
+            confirm = true;
+        } else {
+            confirm = false;
+        }
+        final long timeout = getEndpoint().getConfirmTimeout() <= 0 ? 
Long.MAX_VALUE : getEndpoint().getConfirmTimeout();
         try {
-            getInOnlyTemplate().send(exchangeName, routingKey, msg);
+            Boolean sent = getInOnlyTemplate().invoke(t -> {
+                t.send(ex, rk, msg);
+                if (confirm) {
+                    return t.waitForConfirms(timeout);
+                } else {
+                    return true;
+                }
+            });
+            if (Boolean.FALSE == sent) {
+                exchange.setException(new TimeoutException("Message not sent 
within " + timeout + " millis"));
+            }
         } catch (Exception e) {
             exchange.setException(e);
         }
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
index 5cc7526e1e0..ff80deec70b 100644
--- 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
@@ -36,16 +36,23 @@ public abstract class RabbitMQITSupport extends 
CamelTestSupport {
 
     protected Logger log = LoggerFactory.getLogger(getClass());
 
-    ConnectionFactory createConnectionFactory() {
+    ConnectionFactory createConnectionFactory(boolean confirm) {
         CachingConnectionFactory cf = new CachingConnectionFactory();
+        if (confirm) {
+            
cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
+        }
         cf.setUri(service.getAmqpUrl());
         return cf;
     }
 
+    protected boolean confirmEnabled() {
+        return false;
+    }
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        context.getRegistry().bind("myCF", createConnectionFactory());
+        context.getRegistry().bind("myCF", 
createConnectionFactory(confirmEnabled()));
 
         SpringRabbitMQComponent rmq = context.getComponent("spring-rabbitmq", 
SpringRabbitMQComponent.class);
         // turn on auto declare
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
new file mode 100644
index 00000000000..694a5af2ec1
--- /dev/null
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.springrabbit.integration;
+
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+
+import static org.junit.Assert.fail;
+
+public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
+
+    @Override
+    protected boolean confirmEnabled() {
+        return true;
+    }
+
+    @Test
+    public void testProducer() {
+        ConnectionFactory cf = 
context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
+
+        Queue q = new Queue("myqueue");
+        TopicExchange t = new TopicExchange("foo");
+
+        AmqpAdmin admin = new RabbitAdmin(cf);
+        admin.declareQueue(q);
+        admin.declareExchange(t);
+        admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should fail");
+        } catch (Exception e) {
+            Assertions.assertInstanceOf(ShutdownSignalException.class, 
e.getCause());
+        }
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .to("spring-rabbitmq:unknown?routingKey=foo.bar");
+            }
+        };
+    }
+}

Reply via email to