Repository: camel Updated Branches: refs/heads/camel-2.17.x a863c8868 -> 968bac1d8 refs/heads/master d518e543a -> 406b83ef6
CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/406b83ef Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/406b83ef Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/406b83ef Branch: refs/heads/master Commit: 406b83ef66215976149733ff31beaafe04d0af7c Parents: d518e54 Author: Darrell King <[email protected]> Authored: Tue May 24 09:18:53 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Tue May 24 11:52:08 2016 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitConsumer.java | 6 +++-- .../rabbitmq/RabbitMQConsumerTest.java | 26 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index eeeafd6..21560f8 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -156,11 +156,13 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { if (channel == null) { return; } - if (tag != null) { + if (tag != null && isChannelOpen()) { channel.basicCancel(tag); } try { - channel.close(); + if (isChannelOpen()) { + channel.close(); + } } catch (TimeoutException e) { log.error("Timeout occured"); throw e; http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java index ef6b096..da84477 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java @@ -20,9 +20,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; import org.apache.camel.Processor; import org.junit.Test; import org.mockito.Matchers; @@ -30,6 +32,9 @@ import org.mockito.Mockito; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; public class RabbitMQConsumerTest { @@ -69,4 +74,25 @@ public class RabbitMQConsumerTest { Mockito.verify(conn).close(30 * 1000); } + + @Test + public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception { + AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class); + + RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); + + Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3)); + Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); + Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); + Mockito.when(conn.createChannel()).thenReturn(channel); + Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG"); + Mockito.when(channel.isOpen()).thenReturn(false); + Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG"); + Mockito.doThrow(alreadyClosedException).when(channel).close(); + + consumer.doStart(); + consumer.doStop(); + + Mockito.verify(conn).close(30 * 1000); + } }
