This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new 1128360  CAMEL-16619: camel-rabbitmq - Producer destroys rabbit 
channels when returning them to the pool (#5584)
1128360 is described below

commit 1128360267df73b349c123fe2ae1d51fdbece16f
Author: Anton Ovcharenko <[email protected]>
AuthorDate: Wed May 19 07:18:36 2021 +0300

    CAMEL-16619: camel-rabbitmq - Producer destroys rabbit channels when 
returning them to the pool (#5584)
    
    - In RabbitMQProducer, set the channel pool's `GenericObjectPool._maxIdle` 
property to the `RabbitMQEndpoint.channelPoolMaxSize` value, so that channels 
returned to the pool are kept idle instead of being destroyed
    
    Co-authored-by: Anton Ovcharenko <[email protected]>
---
 .../camel/component/rabbitmq/RabbitMQProducer.java | 14 +++++++++--
 .../component/rabbitmq/RabbitMQProducerTest.java   | 27 ++++++++++++++++++----
 2 files changed, 35 insertions(+), 6 deletions(-)

diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 30aa453..2003ab4 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -109,10 +109,20 @@ public class RabbitMQProducer extends 
DefaultAsyncProducer {
         LOG.debug("Created connection: {}", conn);
 
         LOG.trace("Creating channel pool...");
+        int channelPoolMaxSize = getEndpoint().getChannelPoolMaxSize();
         channelPool = new GenericObjectPool<>(
-                new PoolableChannelFactory(this.conn), 
getEndpoint().getChannelPoolMaxSize(),
+                new PoolableChannelFactory(this.conn),
+                channelPoolMaxSize,
                 GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
-                getEndpoint().getChannelPoolMaxWait());
+                getEndpoint().getChannelPoolMaxWait(),
+                channelPoolMaxSize,
+                GenericObjectPool.DEFAULT_MIN_IDLE,
+                GenericObjectPool.DEFAULT_TEST_ON_BORROW,
+                GenericObjectPool.DEFAULT_TEST_ON_RETURN,
+                GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS,
+                GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN,
+                GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+                GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
         attemptDeclaration();
     }
 
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 0bf14df..0751bd1 100644
--- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -26,24 +26,27 @@ import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.util.ReflectionHelper;
+import org.apache.commons.pool.impl.GenericObjectPool;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.any;
 
 public class RabbitMQProducerTest {
 
+    private CamelContext context = new DefaultCamelContext();
     private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
     private Exchange exchange = Mockito.mock(Exchange.class);
-    private Message message = new DefaultMessage(new DefaultCamelContext());
+    private Message message = new DefaultMessage(context);
     private Connection conn = Mockito.mock(Connection.class);
 
     @BeforeEach
@@ -55,6 +58,7 @@ public class RabbitMQProducerTest {
         
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(null);
         Mockito.when(endpoint.getMessageConverter()).thenReturn(converter);
+        Mockito.when(endpoint.getCamelContext()).thenReturn(context);
     }
 
     @Test
@@ -202,6 +206,21 @@ public class RabbitMQProducerTest {
         assertNull(props.getHeaders().get("invalidHeader"));
     }
 
+    @Test
+    public void testChannelPoolConfiguration() throws Exception {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        Mockito.when(endpoint.getChannelPoolMaxSize()).thenReturn(123);
+        Mockito.when(endpoint.getChannelPoolMaxWait()).thenReturn(321L);
+        producer.doStart();
+        Object channelPool = 
ReflectionHelper.getField(producer.getClass().getDeclaredField("channelPool"), 
producer);
+        assertNotNull(channelPool);
+        assertTrue(channelPool instanceof GenericObjectPool);
+        GenericObjectPool<Channel> genericObjectPool = 
(GenericObjectPool<Channel>) channelPool;
+        assertEquals(123, genericObjectPool.getMaxActive());
+        assertEquals(123, genericObjectPool.getMaxIdle());
+        assertEquals(321L, genericObjectPool.getMaxWait());
+    }
+
     private static class Something {
     }
 }

Reply via email to