Repository: camel
Updated Branches:
refs/heads/camel-2.14.x 482812f1d -> c05978155
CAMEL-7421 Adding Channel pooling in RabbitMQProducer
Fix RabbitMQSpringIntTest
Replace custom object pool by Commons Pool
Fix Spring integration test again
Conflicts:
components/camel-rabbitmq/pom.xml
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/797cb17c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/797cb17c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/797cb17c
Branch: refs/heads/camel-2.14.x
Commit: 797cb17cc175c4d0e7a124592d8233ff9f2720ae
Parents: 482812f
Author: Gerald Quintana <[email protected]>
Authored: Wed May 14 16:35:21 2014 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Thu Dec 4 07:47:09 2014 +0100
----------------------------------------------------------------------
components/camel-rabbitmq/pom.xml | 24 +-
.../component/rabbitmq/RabbitMQProducer.java | 131 ++++++++--
.../rabbitmq/pool/PoolableChannelFactory.java | 59 +++++
.../rabbitmq/RabbitMQSpringIntTest.java | 257 ++++++++++---------
.../rabbitmq/RabbitMQSpringIntTest-context.xml | 7 +-
5 files changed, 315 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/797cb17c/components/camel-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/pom.xml
b/components/camel-rabbitmq/pom.xml
index 3546d90..3e78216 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -43,22 +43,22 @@
<version>${rabbitmq-amqp-client-version}</version>
</dependency>
<dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>${commons-pool-version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<!-- testing -->
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-test-spring</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-core-xml</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
@@ -92,7 +92,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
- <exclude>None</exclude>
+ <exclude>**/*.xml</exclude>
</excludes>
<includes>
<include>**/*IntTest*</include>
http://git-wip-us.apache.org/repos/asf/camel/blob/797cb17c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index f5c7eb4..755fa93 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -16,13 +16,6 @@
*/
package org.apache.camel.component.rabbitmq;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -30,13 +23,30 @@ import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+
public class RabbitMQProducer extends DefaultProducer {
private int closeTimeout = 30 * 1000;
private Connection conn;
- private Channel channel;
+ /**
+ * Maximum number of open channels in the pool
+ */
+ private int channelPoolMaxSize = 10;
+ /**
+ * Maximum time (in milliseconds) to wait for a channel
+ */
+ private long channelPoolMaxWait = 1000;
+ private ObjectPool<Channel> channelPool;
private ExecutorService executorService;
-
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
super(endpoint);
}
@@ -46,19 +56,41 @@ public class RabbitMQProducer extends DefaultProducer {
return (RabbitMQEndpoint) super.getEndpoint();
}
/**
- * Open connection and channel
+ * Channel callback (similar to Spring JDBC ConnectionCallback)
+ */
+ private static interface ChannelCallback<T> {
+ public T doWithChannel(Channel channel) throws Exception;
+ }
+ /**
+ * Do something with a pooled channel (similar to Spring JDBC
TransactionTemplate#execute)
+ */
+ private <T> T execute(ChannelCallback<T> callback) throws Exception {
+ Channel channel = channelPool.borrowObject();
+ try {
+ return callback.doWithChannel(channel);
+ } finally {
+ channelPool.returnObject(channel);
+ }
+ }
+ /**
+ * Open connection and initialize channel pool
*/
- private void openConnectionAndChannel() throws IOException {
+ private void openConnectionAndChannelPool() throws Exception {
log.trace("Creating connection...");
this.conn = getEndpoint().connect(executorService);
log.debug("Created connection: {}", conn);
- log.trace("Creating channel...");
- this.channel = conn.createChannel();
- log.debug("Created channel: {}", channel);
+ log.trace("Creating channel pool...");
+ channelPool = new GenericObjectPool<>(new
PoolableChannelFactory(this.conn), getChannelPoolMaxSize(),
GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
if (getEndpoint().isDeclare()) {
- getEndpoint().declareExchangeAndQueue(this.channel);
- }
+ execute(new ChannelCallback<Void>() {
+ @Override
+ public Void doWithChannel(Channel channel) throws Exception {
+ getEndpoint().declareExchangeAndQueue(channel);
+ return null;
+ }
+ });
+ }
}
@Override
@@ -66,7 +98,7 @@ public class RabbitMQProducer extends DefaultProducer {
this.executorService =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
"CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
try {
- openConnectionAndChannel();
+ openConnectionAndChannelPool();
} catch (IOException e) {
log.warn("Failed to create connection", e);
}
@@ -75,12 +107,8 @@ public class RabbitMQProducer extends DefaultProducer {
/**
* If needed, close Connection and Channel
*/
- private void closeConnectionAndChannel() throws IOException {
- if (channel != null) {
- log.debug("Closing channel: {}", channel);
- channel.close();
- channel = null;
- }
+ private void closeConnectionAndChannel() throws Exception {
+ channelPool.close();
if (conn != null) {
log.debug("Closing connection: {} with timeout: {} ms.", conn,
closeTimeout);
conn.close(closeTimeout);
@@ -113,13 +141,30 @@ public class RabbitMQProducer extends DefaultProducer {
throw new IllegalArgumentException("ExchangeName and RoutingKey is
not provided in the endpoint: " + getEndpoint());
}
byte[] messageBodyBytes =
exchange.getIn().getMandatoryBody(byte[].class);
- AMQP.BasicProperties.Builder properties = buildProperties(exchange);
+ AMQP.BasicProperties properties = buildProperties(exchange).build();
- if (channel == null) {
+ basicPublish(exchangeName, key, properties, messageBodyBytes);
+ }
+
+ /**
+ * Send a message borrowing a channel from the pool
+ * @param exchange Target exchange
+ * @param routingKey Routing key
+ * @param properties Header properties
+ * @param body Body content
+ */
+ private void basicPublish(final String exchange, final String routingKey,
final AMQP.BasicProperties properties, final byte[] body) throws Exception {
+ if (channelPool==null) {
// Open connection and channel lazily
- openConnectionAndChannel();
+ openConnectionAndChannelPool();
}
- channel.basicPublish(exchangeName, key, properties.build(),
messageBodyBytes);
+ execute(new ChannelCallback<Void>() {
+ @Override
+ public Void doWithChannel(Channel channel) throws Exception {
+ channel.basicPublish(exchange, routingKey, properties, body);
+ return null;
+ }
+ });
}
AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
@@ -246,4 +291,36 @@ public class RabbitMQProducer extends DefaultProducer {
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
}
+
+ /**
+ * Get the maximum number of open channels in the pool
+ * @return Maximum number of open channels in the pool
+ */
+ public int getChannelPoolMaxSize() {
+ return channelPoolMaxSize;
+ }
+
+ /**
+ * Set the maximum number of open channels in the pool
+ * @param channelPoolMaxSize Maximum number of open channels in the pool
+ */
+ public void setChannelPoolMaxSize(int channelPoolMaxSize) {
+ this.channelPoolMaxSize = channelPoolMaxSize;
+ }
+
+ /**
+ * Get the maximum number of milliseconds to wait for a channel from the
pool
+ * @return Maximum number of milliseconds waiting for a channel
+ */
+ public long getChannelPoolMaxWait() {
+ return channelPoolMaxWait;
+ }
+
+ /**
+ * Set the maximum number of milliseconds to wait for a channel from the
pool
+ * @param channelPoolMaxWait Maximum number of milliseconds waiting for a
channel
+ */
+ public void setChannelPoolMaxWait(long channelPoolMaxWait) {
+ this.channelPoolMaxWait = channelPoolMaxWait;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/797cb17c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
new file mode 100644
index 0000000..b9bed13
--- /dev/null
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rabbitmq.pool;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * Channel lifecycle manager: create, check and close channel
+ */
+public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+ /**
+ * Parent connection
+ */
+ private final Connection connection;
+
+ public PoolableChannelFactory(Connection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public Channel makeObject() throws Exception {
+ return connection.createChannel();
+ }
+
+ @Override
+ public void destroyObject(Channel t) throws Exception {
+ t.close();
+ }
+
+ @Override
+ public boolean validateObject(Channel t) {
+ return t.isOpen();
+ }
+
+ @Override
+ public void activateObject(Channel t) throws Exception {
+ }
+
+ @Override
+ public void passivateObject(Channel t) throws Exception {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/797cb17c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index 6119082..f65b909 100644
---
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -1,122 +1,135 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.rabbitmq;
-
-import java.io.IOException;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-
-import org.apache.camel.Produce;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import static org.junit.Assert.assertEquals;
-/**
- * Test RabbitMQ component with Spring DSL
- */
-@RunWith(CamelSpringJUnit4ClassRunner.class)
-@ContextConfiguration("RabbitMQSpringIntTest-context.xml")
-public class RabbitMQSpringIntTest {
- @Produce(uri = "direct:rabbitMQ")
- protected ProducerTemplate template;
- @Autowired
- private ConnectionFactory connectionFactory;
- private Connection connection;
- private Channel channel;
-
- private Connection openConnection() throws IOException {
- if (connection == null) {
- connection = connectionFactory.newConnection();
- }
- return connection;
- }
-
- private Channel openChannel() throws IOException {
- if (channel == null) {
- channel = openConnection().createChannel();
- }
- return channel;
- }
-
- @Before
- public void bindQueueExchange() throws IOException {
- openChannel();
- channel.exchangeDeclare("ex2", "direct", true, false, null);
- channel.queueDeclare("q2", true, false, false, null);
- channel.queueBind("q2", "ex2", "rk2");
- }
-
- @After
- public void closeConnection() {
- if (channel != null) {
- try {
- channel.close();
- } catch (IOException e) {
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException e) {
- }
- }
- }
-
- private static final class LastDeliveryConsumer extends DefaultConsumer {
- private byte[] lastBody;
-
- private LastDeliveryConsumer(Channel channel) {
- super(channel);
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
- lastBody = body;
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
-
- public byte[] getLastBody() {
- return lastBody;
- }
- }
-
- @Test
- public void testSendCsutomConnectionFactory() throws Exception {
- String body = "Hello Rabbit";
- template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
-
- openChannel();
- LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
- channel.basicConsume("q2", true, consumer);
- int i = 10;
- while (consumer.getLastBody() == null && i > 0) {
- Thread.sleep(1000L);
- i--;
- }
- assertEquals(body, new String(consumer.getLastBody()));
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Test RabbitMQ component with Spring DSL
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration
+public class RabbitMQSpringIntTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQSpringIntTest.class);
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate template;
+ @Autowired
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+ private boolean isConnectionOpened() {
+ return connection!=null && connection.isOpen();
+ }
+ private Connection openConnection() throws IOException {
+ if (!isConnectionOpened()) {
+ LOGGER.info("Open connection");
+ connection = connectionFactory.newConnection();
+ }
+ return connection;
+ }
+ private boolean isChannelOpened() {
+ return channel != null && channel.isOpen();
+ }
+ private Channel openChannel() throws IOException {
+ if (!isChannelOpened()) {
+ LOGGER.info("Open channel");
+ channel = openConnection().createChannel();
+ }
+ return channel;
+ }
+
+ @Before
+ public void bindQueueExchange() throws IOException {
+ openChannel();
+ /*
+ LOGGER.info("Declare exchange queue");
+ channel.exchangeDeclare("ex2", "direct", true, false, new
HashMap<String, Object>());
+ channel.queueDeclare("q2", true, false, false, null);
+ channel.queueBind("q2", "ex2", "rk2");
+ */
+ }
+
+ @After
+ public void closeConnection() {
+ if (isChannelOpened()) {
+ try {
+ LOGGER.info("Close channel");
+ channel.close();
+ } catch (IOException e) {
+ }
+ }
+ if (isConnectionOpened()) {
+ try {
+ LOGGER.info("Close connection");
+ connection.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ private static final class LastDeliveryConsumer extends DefaultConsumer {
+ private byte[] lastBody;
+
+ private LastDeliveryConsumer(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
+ lastBody = body;
+ super.handleDelivery(consumerTag, envelope, properties, body);
+ }
+
+ public byte[] getLastBody() {
+ return lastBody;
+ }
+ public String getLastBodyAsString() {
+ return lastBody == null ? null : new String(lastBody);
+ }
+ }
+
+ @Test
+ public void testSendCsutomConnectionFactory() throws Exception {
+ String body = "Hello Rabbit";
+ template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+
+ openChannel();
+ LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+ channel.basicConsume("q2", true, consumer);
+ int i = 10;
+ while (consumer.getLastBody() == null && i > 0) {
+ Thread.sleep(1000L);
+ i--;
+ }
+ assertEquals(body, consumer.getLastBodyAsString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/797cb17c/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
----------------------------------------------------------------------
diff --git
a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
index 6810583..b4688c7 100644
---
a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
+++
b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
@@ -21,7 +21,10 @@
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
">
-
+ <!-- To create and grant user cameltest:
+ rabbitmqctl add_user cameltest cameltest
+ rabbitmqctl set_permissions -p / cameltest ".*" ".*" ".*"
+ -->
<!-- START SNIPPET: custom connection factory -->
<bean id="customConnectionFactory"
class="com.rabbitmq.client.ConnectionFactory">
<property name="host" value="localhost"/>
@@ -33,7 +36,7 @@
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:rabbitMQ"/>
- <to
uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&queue=q2"/>
+ <to
uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&queue=q2&routingKey=rk2"/>
</route>
</camelContext>
<!-- END SNIPPET: example -->