This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push:
new a3d2bb2d08a [CAMEL-19575] Fixes bug in camel-rabbitmq -
RabbitMQConsumer keeps on consuming even when route shutdown is triggered.
(#10604)
a3d2bb2d08a is described below
commit a3d2bb2d08a653989e8578fddc2cb08be0a109e8
Author: Nikunj Kumar Gupta <[email protected]>
AuthorDate: Fri Jul 7 11:38:25 2023 +0530
[CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on
consuming even when route shutdown is triggered. (#10604)
* close all consumers concurrently
Closing all consumers in RabbitMqConsumer concurrently
* RabbitMQConsumer to suspend Consumer when suspending.
* cancelling all RabbitConsumers in RabbitMQConsumer before trying to close
them
---
.../org/apache/camel/component/rabbitmq/RabbitConsumer.java | 13 ++++++++++++-
.../apache/camel/component/rabbitmq/RabbitMQConsumer.java | 7 +++++++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 13115cd4536..82385124190 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -48,6 +48,7 @@ class RabbitConsumer extends ServiceSupport implements
com.rabbitmq.client.Consu
private Channel channel;
private String tag;
private volatile String consumerTag;
+ private boolean cancelled;
private final Semaphore lock = new Semaphore(1);
@@ -208,12 +209,22 @@ class RabbitConsumer extends ServiceSupport implements
com.rabbitmq.client.Consu
consumer.getEndpoint().isExclusiveConsumer(), null, this);
}
+ protected void cancelChannel() throws Exception {
+ if (channel == null) {
+ return;
+ }
+ if (tag != null && isChannelOpen() && !cancelled) {
+ channel.basicCancel(tag);
+ cancelled = true;
+ }
+ }
+
@Override
protected void doStop() throws Exception {
if (channel == null) {
return;
}
- if (tag != null && isChannelOpen()) {
+ if (tag != null && isChannelOpen() && !cancelled) {
channel.basicCancel(tag);
}
try {
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 330ec59a590..e2d446ea3aa 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -155,6 +155,13 @@ public class RabbitMQConsumer extends DefaultConsumer
implements Suspendable {
if (startConsumerCallable != null) {
startConsumerCallable.stop();
}
+ for (RabbitConsumer consumer : this.consumers) {
+ try {
+ consumer.cancelChannel();
+ } catch (Exception e) {
+ LOG.warn("Error occurred while cancelling consumer. This
exception is ignored", e);
+ }
+ }
for (RabbitConsumer consumer : this.consumers) {
try {
ServiceHelper.stopAndShutdownService(consumer);