This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new f77a9c8  CAMEL-16707: rabbitmq - connection leak and a new option (#5661) (#5668)
f77a9c8 is described below

commit f77a9c88c1a9d1df91a77b0606934cced9ca9e45
Author: rastislavpapp <[email protected]>
AuthorDate: Mon Jun 14 20:59:16 2021 +0200

    CAMEL-16707: rabbitmq - connection leak and a new option (#5661) (#5668)
    
    * CAMEL-16707: fix rabbitmq connection leak
    
    * CAMEL-16707: add rabbitmq consumer option to disable recovery from exception during declaration of exchanges/queues
---
 .../camel/catalog/docs/rabbitmq-component.adoc     |   6 +-
 components/camel-rabbitmq/pom.xml                  |   5 +
 .../rabbitmq/RabbitMQComponentConfigurer.java      |   6 ++
 .../rabbitmq/RabbitMQEndpointConfigurer.java       |   6 ++
 .../rabbitmq/RabbitMQEndpointUriFactory.java       |   3 +-
 .../apache/camel/component/rabbitmq/rabbitmq.json  |   2 +
 .../src/main/docs/rabbitmq-component.adoc          |   6 +-
 .../camel/component/rabbitmq/RabbitConsumer.java   |  18 +++-
 .../component/rabbitmq/RabbitMQComponent.java      |  18 ++++
 .../camel/component/rabbitmq/RabbitMQConsumer.java |   1 -
 .../camel/component/rabbitmq/RabbitMQEndpoint.java |  16 ++++
 .../RabbitMQRecoverFromDeclareExceptionIT.java     | 102 +++++++++++++++++++++
 .../dsl/RabbitmqComponentBuilderFactory.java       |  22 +++++
 .../dsl/RabbitMQEndpointBuilderFactory.java        |  43 +++++++++
 .../modules/ROOT/pages/rabbitmq-component.adoc     |   6 +-
 15 files changed, 251 insertions(+), 9 deletions(-)
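
The RabbitConsumer change in this commit closes the channel when declaring the exchange or queue fails, and then either rethrows the IOException (recoverFromDeclareException=true, so the consumer can retry) or wraps it in an unchecked exception that stops startup. The following is only a rough standalone sketch of that control flow. SketchChannel, Declarer, FakeChannel and declareOrFail are hypothetical stand-ins invented for illustration, not the RabbitMQ client or Camel classes, and IllegalStateException stands in for RuntimeCamelException.

```java
import java.io.IOException;

public class DeclareRecoverySketch {

    /** Hypothetical stand-in for the client channel; only what the sketch needs. */
    interface SketchChannel {
        boolean isOpen();

        void close() throws Exception;
    }

    /** Hypothetical stand-in for the endpoint's declare step. */
    interface Declarer {
        void declare(SketchChannel channel) throws IOException;
    }

    /**
     * Runs the declare step. On failure the channel is closed first so it does
     * not leak, then the failure is reported as recoverable (the original
     * IOException) or unrecoverable (an unchecked wrapper), depending on the flag.
     */
    static SketchChannel declareOrFail(SketchChannel channel, Declarer declarer,
                                       boolean recoverFromDeclareException) throws IOException {
        try {
            declarer.declare(channel);
        } catch (IOException e) {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception innerEx) {
                    // Keep the close failure attached to the original cause.
                    e.addSuppressed(innerEx);
                }
            }
            if (recoverFromDeclareException) {
                throw e;
            }
            // Assumption: IllegalStateException replaces RuntimeCamelException here.
            throw new IllegalStateException(
                    "Unrecoverable error when attempting to declare exchange or queue", e);
        }
        return channel;
    }

    /** Minimal in-memory channel that records whether it was closed. */
    static final class FakeChannel implements SketchChannel {
        private boolean open = true;

        @Override
        public boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** Simulates a failing declaration and reports how the failure surfaced. */
    public static String outcome(boolean recoverFromDeclareException) {
        FakeChannel channel = new FakeChannel();
        try {
            declareOrFail(channel, ch -> {
                throw new IOException("simulated declare failure");
            }, recoverFromDeclareException);
            return "declared";
        } catch (IOException e) {
            return "recoverable, channelOpen=" + channel.isOpen();
        } catch (IllegalStateException e) {
            return "unrecoverable, channelOpen=" + channel.isOpen();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(true));
        System.out.println(outcome(false));
    }
}
```

In both cases the fake channel ends up closed; only the exception type that reaches the caller differs.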

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
index f43d384..915471a 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
@@ -57,7 +57,7 @@ determines the exchange the queue will be bound to.
 == Options
 
 // component options: START
-The RabbitMQ component supports 55 options, which are listed below.
+The RabbitMQ component supports 56 options, which are listed below.
 
 
 
@@ -88,6 +88,7 @@ The RabbitMQ component supports 55 options, which are listed 
below.
 | *prefetchEnabled* (consumer) | Enables the quality of service on the 
RabbitMQConsumer side. You need to specify the option of prefetchSize, 
prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the 
entire channel rather than each consumer You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in 
octets) that the server will deliver, 0 if unlimited. You need to specify the 
option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception 
during declaration of exchanges or queues is recoverable or not. If the option 
is false, camel will throw an exception when starting the consumer, which will 
interrupt application startup (e.g. in the case when the exchange / queue is 
already declared in RabbitMQ and has incompatible configuration). If set to 
true, the consumer will try to reconnect periodically. | false | boolean
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with 
a fixed number of threads. This setting allows you to set that number of 
threads. | 10 | int
 | *additionalHeaders* (producer) | Map of additional headers. These headers 
will be set only when the 'allowCustomHeaders' is set to true |  | Map
 | *additionalProperties* (producer) | Map of additional properties. These are 
standard RabbitMQ properties as defined in 
com.rabbitmq.client.AMQP.BasicProperties The map keys should be from 
org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be 
ignored. When the message already contains these headers they will be given 
precedence over these properties. |  | Map
@@ -142,7 +143,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +180,7 @@ with the following path and query parameters:
 | *prefetchEnabled* (consumer) | Enables the quality of service on the 
RabbitMQConsumer side. You need to specify the option of prefetchSize, 
prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the 
entire channel rather than each consumer You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in 
octets) that the server will deliver, 0 if unlimited. You need to specify the 
option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception 
during declaration of exchanges or queues is recoverable or not. If the option 
is false, camel will throw an exception when starting the consumer, which will 
interrupt application startup (e.g. in the case when the exchange / queue is 
already declared in RabbitMQ and has incompatible configuration). If set to 
true, the consumer will try to reconnect periodically. | true | boolean
 | *reQueue* (consumer) | This is used by the consumer to control rejection of 
the message. When the consumer is complete processing the exchange, and if the 
exchange failed, then the consumer is going to reject the message from the 
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value 
of the header will be used, otherwise this endpoint value is used as fallback. 
If the value is false (by default) then the message is discarded/dead-lettered. 
If the value is true, t [...]
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. There are 3 enums and the value can be one of: InOnly, 
InOut, InOptionalOut |  | ExchangePattern
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 712fb04..3996838 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -117,6 +117,11 @@
             <artifactId>camel-http</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java
index d381c07..61cb92b 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java
@@ -91,6 +91,8 @@ public class RabbitMQComponentConfigurer extends PropertyConfigurerSupport imple
         case "publisherAcknowledgements": target.setPublisherAcknowledgements(property(camelContext, boolean.class, value)); return true;
         case "publisheracknowledgementstimeout":
         case "publisherAcknowledgementsTimeout": target.setPublisherAcknowledgementsTimeout(property(camelContext, long.class, value)); return true;
+        case "recoverfromdeclareexception":
+        case "recoverFromDeclareException": target.setRecoverFromDeclareException(property(camelContext, boolean.class, value)); return true;
         case "requesttimeout":
         case "requestTimeout": target.setRequestTimeout(property(camelContext, long.class, value)); return true;
         case "requesttimeoutcheckerinterval":
@@ -196,6 +198,8 @@ public class RabbitMQComponentConfigurer extends PropertyConfigurerSupport imple
         case "publisherAcknowledgements": return boolean.class;
         case "publisheracknowledgementstimeout":
         case "publisherAcknowledgementsTimeout": return long.class;
+        case "recoverfromdeclareexception":
+        case "recoverFromDeclareException": return boolean.class;
         case "requesttimeout":
         case "requestTimeout": return long.class;
         case "requesttimeoutcheckerinterval":
@@ -302,6 +306,8 @@ public class RabbitMQComponentConfigurer extends PropertyConfigurerSupport imple
         case "publisherAcknowledgements": return target.isPublisherAcknowledgements();
         case "publisheracknowledgementstimeout":
         case "publisherAcknowledgementsTimeout": return target.getPublisherAcknowledgementsTimeout();
+        case "recoverfromdeclareexception":
+        case "recoverFromDeclareException": return target.isRecoverFromDeclareException();
         case "requesttimeout":
         case "requestTimeout": return target.getRequestTimeout();
         case "requesttimeoutcheckerinterval":
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
index 17dd65c..faca916 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
@@ -106,6 +106,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem
         case "queue": target.setQueue(property(camelContext, java.lang.String.class, value)); return true;
         case "requeue":
         case "reQueue": target.setReQueue(property(camelContext, boolean.class, value)); return true;
+        case "recoverfromdeclareexception":
+        case "recoverFromDeclareException": target.setRecoverFromDeclareException(property(camelContext, boolean.class, value)); return true;
         case "requesttimeout":
         case "requestTimeout": target.setRequestTimeout(property(camelContext, long.class, value)); return true;
         case "requesttimeoutcheckerinterval":
@@ -231,6 +233,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem
         case "queue": return java.lang.String.class;
         case "requeue":
         case "reQueue": return boolean.class;
+        case "recoverfromdeclareexception":
+        case "recoverFromDeclareException": return boolean.class;
         case "requesttimeout":
         case "requestTimeout": return long.class;
         case "requesttimeoutcheckerinterval":
@@ -357,6 +361,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem
         case "queue": return target.getQueue();
         case "requeue":
         case "reQueue": return target.isReQueue();
+        case "recoverfromdeclareexception":
+        case "recoverFromDeclareException": return target.isRecoverFromDeclareException();
         case "requesttimeout":
         case "requestTimeout": return target.getRequestTimeout();
         case "requesttimeoutcheckerinterval":
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
index 77cbede..d352ca1 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(67);
+        Set<String> props = new HashSet<>(68);
         props.add("prefetchCount");
         props.add("publisherAcknowledgementsTimeout");
         props.add("addresses");
@@ -50,6 +50,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone
         props.add("concurrentConsumers");
         props.add("guaranteedDeliveries");
         props.add("vhost");
+        props.add("recoverFromDeclareException");
         props.add("lazyStartProducer");
         props.add("threadPoolSize");
         props.add("deadLetterQueue");
diff --git a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
index 784d99b..b071691 100644
--- a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
+++ b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
@@ -46,6 +46,7 @@
     "prefetchEnabled": { "kind": "property", "displayName": "Prefetch 
Enabled", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Enables the quality of 
service on the RabbitMQConsumer side. You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
     "prefetchGlobal": { "kind": "property", "displayName": "Prefetch Global", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "description": "If the settings should be applied 
to the entire channel rather than each consumer You need to specify the option 
of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
     "prefetchSize": { "kind": "property", "displayName": "Prefetch Size", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"description": "The maximum amount of content (measured in octets) that the 
server will deliver, 0 if unlimited. You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
+    "recoverFromDeclareException": { "kind": "property", "displayName": 
"Recover From Declare Exception", "group": "consumer", "label": "consumer", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Decides whether an exception during declaration of exchanges or 
queues is recoverable or not. If the option is false, camel will throw an 
exception when starting the consumer, which will [...]
     "threadPoolSize": { "kind": "property", "displayName": "Thread Pool Size", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 10, "description": "The consumer uses a 
Thread Pool Executor with a fixed number of threads. This setting allows you to 
set that number of threads." },
     "additionalHeaders": { "kind": "property", "displayName": "Additional 
Headers", "group": "producer", "label": "producer", "required": false, "type": 
"object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", 
"deprecated": false, "autowired": false, "secret": false, "description": "Map 
of additional headers. These headers will be set only when the 
'allowCustomHeaders' is set to true" },
     "additionalProperties": { "kind": "property", "displayName": "Additional 
Properties", "group": "producer", "label": "producer", "required": false, 
"type": "object", "javaType": "java.util.Map<java.lang.String, 
java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, 
"description": "Map of additional properties. These are standard RabbitMQ 
properties as defined in com.rabbitmq.client.AMQP.BasicProperties The map keys 
should be from org.apache.camel.component.rabbi [...]
@@ -111,6 +112,7 @@
     "prefetchEnabled": { "kind": "parameter", "displayName": "Prefetch 
Enabled", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Enables the quality of 
service on the RabbitMQConsumer side. You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
     "prefetchGlobal": { "kind": "parameter", "displayName": "Prefetch Global", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "description": "If the settings should be applied 
to the entire channel rather than each consumer You need to specify the option 
of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
     "prefetchSize": { "kind": "parameter", "displayName": "Prefetch Size", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"description": "The maximum amount of content (measured in octets) that the 
server will deliver, 0 if unlimited. You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
+    "recoverFromDeclareException": { "kind": "parameter", "displayName": 
"Recover From Declare Exception", "group": "consumer", "label": "consumer", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": true, 
"description": "Decides whether an exception during declaration of exchanges or 
queues is recoverable or not. If the option is false, camel will throw an 
exception when starting the consumer, which will [...]
     "reQueue": { "kind": "parameter", "displayName": "Re Queue", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "description": "This is used by the consumer to 
control rejection of the message. When the consumer is complete processing the 
exchange, and if the exchange failed, then the consumer is going to reject the 
message from the RabbitMQ broker. If  [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception 
Handler", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By default the 
con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange 
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", 
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the exchange pattern when the consumer creates an 
exchange." },
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index f43d384..915471a 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -57,7 +57,7 @@ determines the exchange the queue will be bound to.
 == Options
 
 // component options: START
-The RabbitMQ component supports 55 options, which are listed below.
+The RabbitMQ component supports 56 options, which are listed below.
 
 
 
@@ -88,6 +88,7 @@ The RabbitMQ component supports 55 options, which are listed 
below.
 | *prefetchEnabled* (consumer) | Enables the quality of service on the 
RabbitMQConsumer side. You need to specify the option of prefetchSize, 
prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the 
entire channel rather than each consumer You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in 
octets) that the server will deliver, 0 if unlimited. You need to specify the 
option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception 
during declaration of exchanges or queues is recoverable or not. If the option 
is false, camel will throw an exception when starting the consumer, which will 
interrupt application startup (e.g. in the case when the exchange / queue is 
already declared in RabbitMQ and has incompatible configuration). If set to 
true, the consumer will try to reconnect periodically. | false | boolean
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with 
a fixed number of threads. This setting allows you to set that number of 
threads. | 10 | int
 | *additionalHeaders* (producer) | Map of additional headers. These headers 
will be set only when the 'allowCustomHeaders' is set to true |  | Map
 | *additionalProperties* (producer) | Map of additional properties. These are 
standard RabbitMQ properties as defined in 
com.rabbitmq.client.AMQP.BasicProperties The map keys should be from 
org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be 
ignored. When the message already contains these headers they will be given 
precedence over these properties. |  | Map
@@ -142,7 +143,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +180,7 @@ with the following path and query parameters:
 | *prefetchEnabled* (consumer) | Enables the quality of service on the 
RabbitMQConsumer side. You need to specify the option of prefetchSize, 
prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the 
entire channel rather than each consumer You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in 
octets) that the server will deliver, 0 if unlimited. You need to specify the 
option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception 
during declaration of exchanges or queues is recoverable or not. If the option 
is false, camel will throw an exception when starting the consumer, which will 
interrupt application startup (e.g. in the case when the exchange / queue is 
already declared in RabbitMQ and has incompatible configuration). If set to 
true, the consumer will try to reconnect periodically. | true | boolean
 | *reQueue* (consumer) | This is used by the consumer to control rejection of 
the message. When the consumer is complete processing the exchange, and if the 
exchange failed, then the consumer is going to reject the message from the 
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value 
of the header will be used, otherwise this endpoint value is used as fallback. 
If the value is false (by default) then the message is discarded/dead-lettered. 
If the value is true, t [...]
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. There are 3 enums and the value can be one of: InOnly, 
InOut, InOptionalOut |  | ExchangePattern
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index f1bd416..b852b74 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -380,7 +380,23 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
         // This really only needs to be called on the first consumer or on
         // reconnections.
         if (consumer.getEndpoint().isDeclare()) {
-            consumer.getEndpoint().declareExchangeAndQueue(channel);
+            try {
+                consumer.getEndpoint().declareExchangeAndQueue(channel);
+            } catch (IOException e) {
+                if (channel != null && channel.isOpen()) {
+                    try {
+                        channel.close();
+                    } catch (Exception innerEx) {
+                        e.addSuppressed(innerEx);
+                    }
+                }
+                if (this.consumer.getEndpoint().isRecoverFromDeclareException()) {
+                    throw e;
+                } else {
+                    throw new RuntimeCamelException(
+                            "Unrecoverable error when attempting to declare exchange or queue for " + consumer, e);
+                }
+            }
         }
         return channel;
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 4f6f97f..f2c1873 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -148,6 +148,8 @@ public class RabbitMQComponent extends DefaultComponent {
     private Map<String, Object> clientProperties;
     @Metadata(label = "advanced")
     private ExceptionHandler connectionFactoryExceptionHandler;
+    @Metadata(label = "consumer")
+    private boolean recoverFromDeclareException = true;
 
     public RabbitMQComponent() {
     }
@@ -270,6 +272,7 @@ public class RabbitMQComponent extends DefaultComponent {
         endpoint.setDeadLetterRoutingKey(getDeadLetterRoutingKey());
         endpoint.setAllowNullHeaders(isAllowNullHeaders());
         
endpoint.setConnectionFactoryExceptionHandler(getConnectionFactoryExceptionHandler());
+        
endpoint.setRecoverFromDeclareException(isRecoverFromDeclareException());
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating RabbitMQEndpoint with host {}:{} and 
exchangeName: {}",
@@ -910,4 +913,19 @@ public class RabbitMQComponent extends DefaultComponent {
     public void setConnectionFactoryExceptionHandler(ExceptionHandler 
connectionFactoryExceptionHandler) {
         this.connectionFactoryExceptionHandler = 
connectionFactoryExceptionHandler;
     }
+
+    public boolean isRecoverFromDeclareException() {
+        return recoverFromDeclareException;
+    }
+
+    /**
+     * Decides whether an exception during declaration of exchanges or queues 
is recoverable or not. If the option is
+     * false, camel will throw an exception when starting the consumer, which 
will interrupt application startup (e.g.
+     * in the case when the exchange / queue is already declared in RabbitMQ 
and has incompatible configuration). If set
+     * to true, the consumer will try to reconnect periodically.
+     */
+    public void setRecoverFromDeclareException(boolean 
recoverFromDeclareException) {
+        this.recoverFromDeclareException = recoverFromDeclareException;
+    }
+
 }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 63b044d..d31a868 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -79,7 +79,6 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
             openConnection();
             return this.conn;
         } else {
-            openConnection();
             return this.conn;
         }
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 7f865de..e976775 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -186,6 +186,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private boolean allowMessageBodySerialization;
     @UriParam(label = "consumer")
     private boolean reQueue;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean recoverFromDeclareException = true;
     // camel-jms supports this setting but it is not currently configurable in
     // camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
@@ -705,6 +707,20 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     }
 
     /**
+     * Decides whether an exception during declaration of exchanges or queues is recoverable or not. If the option is
+     * false, camel will throw an exception when starting the consumer, which will interrupt application startup (e.g.
+     * in the case when the exchange / queue is already declared in RabbitMQ and has incompatible configuration). If set
+     * to true, the consumer will try to reconnect periodically.
+     */
+    public boolean isRecoverFromDeclareException() {
+        return recoverFromDeclareException;
+    }
+
+    public void setRecoverFromDeclareException(boolean recoverFromDeclareException) {
+        this.recoverFromDeclareException = recoverFromDeclareException;
+    }
+
+    /**
      * If the option is true, camel declare the exchange and queue name and bind them together. If the option is false,
      * camel won't declare the exchange and queue name on the server.
      */
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRecoverFromDeclareExceptionIT.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRecoverFromDeclareExceptionIT.java
new file mode 100644
index 0000000..ad4ef8d
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRecoverFromDeclareExceptionIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.integration;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.rabbitmq.RabbitMQComponent;
+import org.apache.camel.test.infra.rabbitmq.services.ConnectionProperties;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD;
+
+/**
+ * Integration test to check whether an exception during declaring 
exchanges/queues during startup of camel context
+ * behaves according to the value of {@link 
RabbitMQComponent#isRecoverFromDeclareException()}.
+ */
+@TestInstance(PER_METHOD)
+public class RabbitMQRecoverFromDeclareExceptionIT extends 
AbstractRabbitMQIntTest {
+
+    private static final String EXCHANGE = "testExchange";
+
+    private com.rabbitmq.client.Connection connection;
+    private com.rabbitmq.client.Channel channel;
+
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        connection = connection();
+        channel = connection.createChannel();
+        channel.exchangeDeclare(EXCHANGE, "direct");
+
+        super.setUp();
+
+    }
+
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        channel.abort();
+        connection.abort();
+    }
+
+    @Test
+    void testWrongExchangeTypeInterruptsStartupWhenRecoveryOff() throws 
Exception {
+        context.addRoutes(createRouteWithWrongExchangeType(false));
+        assertThatExceptionOfType(RuntimeCamelException.class)
+                .isThrownBy(() -> context.start())
+                .withMessageStartingWith("Unrecoverable error when attempting 
to declare exchange or queue");
+    }
+
+    @Test
+    void testWrongExchangeTypeDoesNotInterruptStartup() throws Exception {
+        context.addRoutes(createRouteWithWrongExchangeType(true));
+        assertThatNoException().isThrownBy(() -> context.start());
+    }
+
+    private RoutesBuilder createRouteWithWrongExchangeType(boolean 
recoverFromDeclareException) {
+        ConnectionProperties connectionProperties = 
service.connectionProperties();
+        String rabbitMQEndpoint = 
String.format("rabbitmq:localhost:%d/%s?exchangeType=fanout" +
+                                                "&username=%s&password=%s" +
+                                                "&queue=q1" +
+                                                "&routingKey=rk1" +
+                                                
"&recoverFromDeclareException=%b",
+                connectionProperties.port(), EXCHANGE, 
connectionProperties.username(), connectionProperties.password(),
+                recoverFromDeclareException);
+
+        return new RouteBuilder(context) {
+            @Override
+            public void configure() {
+                from(rabbitMQEndpoint).id("consumingRoute").log("Receiving 
message");
+            }
+        };
+    }
+
+}
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
index 717fc1e..98152ab 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
@@ -443,6 +443,27 @@ public interface RabbitmqComponentBuilderFactory {
             return this;
         }
         /**
+         * Decides whether an exception during declaration of exchanges or
+         * queues is recoverable or not. If the option is false, camel will
+         * throw an exception when starting the consumer, which will interrupt
+         * application startup (e.g. in the case when the exchange / queue is
+         * already declared in RabbitMQ and has incompatible configuration). If
+         * set to true, the consumer will try to reconnect periodically.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: consumer
+         * 
+         * @param recoverFromDeclareException the value to set
+         * @return the dsl builder
+         */
+        default RabbitmqComponentBuilder recoverFromDeclareException(
+                boolean recoverFromDeclareException) {
+            doSetProperty("recoverFromDeclareException", 
recoverFromDeclareException);
+            return this;
+        }
+        /**
          * The consumer uses a Thread Pool Executor with a fixed number of
          * threads. This setting allows you to set that number of threads.
          * 
@@ -1018,6 +1039,7 @@ public interface RabbitmqComponentBuilderFactory {
             case "prefetchEnabled": ((RabbitMQComponent) 
component).setPrefetchEnabled((boolean) value); return true;
             case "prefetchGlobal": ((RabbitMQComponent) 
component).setPrefetchGlobal((boolean) value); return true;
             case "prefetchSize": ((RabbitMQComponent) 
component).setPrefetchSize((int) value); return true;
+            case "recoverFromDeclareException": ((RabbitMQComponent) 
component).setRecoverFromDeclareException((boolean) value); return true;
             case "threadPoolSize": ((RabbitMQComponent) 
component).setThreadPoolSize((int) value); return true;
             case "additionalHeaders": ((RabbitMQComponent) 
component).setAdditionalHeaders((java.util.Map) value); return true;
             case "additionalProperties": ((RabbitMQComponent) 
component).setAdditionalProperties((java.util.Map) value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
index 6f9f0e0..8faa5ce 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
@@ -910,6 +910,49 @@ public interface RabbitMQEndpointBuilderFactory {
             return this;
         }
         /**
+         * Decides whether an exception during declaration of exchanges or
+         * queues is recoverable or not. If the option is false, camel will
+         * throw an exception when starting the consumer, which will interrupt
+         * application startup (e.g. in the case when the exchange / queue is
+         * already declared in RabbitMQ and has incompatible configuration). If
+         * set to true, the consumer will try to reconnect periodically.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: true
+         * Group: consumer
+         * 
+         * @param recoverFromDeclareException the value to set
+         * @return the dsl builder
+         */
+        default RabbitMQEndpointConsumerBuilder recoverFromDeclareException(
+                boolean recoverFromDeclareException) {
+            doSetProperty("recoverFromDeclareException", 
recoverFromDeclareException);
+            return this;
+        }
+        /**
+         * Decides whether an exception during declaration of exchanges or
+         * queues is recoverable or not. If the option is false, camel will
+         * throw an exception when starting the consumer, which will interrupt
+         * application startup (e.g. in the case when the exchange / queue is
+         * already declared in RabbitMQ and has incompatible configuration). If
+         * set to true, the consumer will try to reconnect periodically.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: true
+         * Group: consumer
+         * 
+         * @param recoverFromDeclareException the value to set
+         * @return the dsl builder
+         */
+        default RabbitMQEndpointConsumerBuilder recoverFromDeclareException(
+                String recoverFromDeclareException) {
+            doSetProperty("recoverFromDeclareException", 
recoverFromDeclareException);
+            return this;
+        }
+        /**
          * This is used by the consumer to control rejection of the message.
          * When the consumer is complete processing the exchange, and if the
          * exchange failed, then the consumer is going to reject the message
diff --git a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc 
b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
index 7461f53..cd2c55b 100644
--- a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
+++ b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
@@ -59,7 +59,7 @@ determines the exchange the queue will be bound to.
 == Options
 
 // component options: START
-The RabbitMQ component supports 55 options, which are listed below.
+The RabbitMQ component supports 56 options, which are listed below.
 
 
 
@@ -90,6 +90,7 @@ The RabbitMQ component supports 55 options, which are listed 
below.
 | *prefetchEnabled* (consumer) | Enables the quality of service on the 
RabbitMQConsumer side. You need to specify the option of prefetchSize, 
prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the 
entire channel rather than each consumer You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in 
octets) that the server will deliver, 0 if unlimited. You need to specify the 
option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception 
during declaration of exchanges or queues is recoverable or not. If the option 
is false, camel will throw an exception when starting the consumer, which will 
interrupt application startup (e.g. in the case when the exchange / queue is 
already declared in RabbitMQ and has incompatible configuration). If set to 
true, the consumer will try to reconnect periodically. | false | boolean
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with 
a fixed number of threads. This setting allows you to set that number of 
threads. | 10 | int
 | *additionalHeaders* (producer) | Map of additional headers. These headers 
will be set only when the 'allowCustomHeaders' is set to true |  | Map
 | *additionalProperties* (producer) | Map of additional properties. These are 
standard RabbitMQ properties as defined in 
com.rabbitmq.client.AMQP.BasicProperties The map keys should be from 
org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be 
ignored. When the message already contains these headers they will be given 
precedence over these properties. |  | Map
@@ -144,7 +145,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -181,6 +182,7 @@ with the following path and query parameters:
 | *prefetchEnabled* (consumer) | Enables the quality of service on the 
RabbitMQConsumer side. You need to specify the option of prefetchSize, 
prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the 
entire channel rather than each consumer You need to specify the option of 
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in 
octets) that the server will deliver, 0 if unlimited. You need to specify the 
option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception 
during declaration of exchanges or queues is recoverable or not. If the option 
is false, camel will throw an exception when starting the consumer, which will 
interrupt application startup (e.g. in the case when the exchange / queue is 
already declared in RabbitMQ and has incompatible configuration). If set to 
true, the consumer will try to reconnect periodically. | true | boolean
 | *reQueue* (consumer) | This is used by the consumer to control rejection of 
the message. When the consumer is complete processing the exchange, and if the 
exchange failed, then the consumer is going to reject the message from the 
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value 
of the header will be used, otherwise this endpoint value is used as fallback. 
If the value is false (by default) then the message is discarded/dead-lettered. 
If the value is true, t [...]
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. There are 3 enums and the value can be one of: InOnly, 
InOut, InOptionalOut |  | ExchangePattern
