Author: davsclaus
Date: Sun Feb 6 13:24:44 2011
New Revision: 1067658
URL: http://svn.apache.org/viewvc?rev=1067658&view=rev
Log:
CAMEL-3576: Use a Camel-managed thread pool for JMS consumers (and reply
managers) if no custom TaskExecutor is configured.
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Sun Feb 6 13:24:44 2011
@@ -35,9 +35,7 @@ import org.springframework.jms.core.JmsO
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
@@ -349,8 +347,8 @@ public class JmsConfiguration implements
return template;
}
- public AbstractMessageListenerContainer
createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
- AbstractMessageListenerContainer container =
chooseMessageListenerContainerImplementation(endpoint);
+ public DefaultMessageListenerContainer
createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
+ DefaultMessageListenerContainer container = new
JmsMessageListenerContainer(endpoint);
configureMessageListenerContainer(container, endpoint);
return container;
}
@@ -386,8 +384,7 @@ public class JmsConfiguration implements
}
/**
- * Sets the connection factory to be used for consuming messages via the
- * {@link #createMessageListenerContainer(JmsEndpoint)}
+ * Sets the connection factory to be used for consuming messages
*
* @param listenerConnectionFactory the connection factory to use for
* consuming messages
@@ -802,7 +799,7 @@ public class JmsConfiguration implements
}
- protected void
configureMessageListenerContainer(AbstractMessageListenerContainer container,
+ protected void
configureMessageListenerContainer(DefaultMessageListenerContainer container,
JmsEndpoint endpoint)
throws Exception {
container.setConnectionFactory(getListenerConnectionFactory());
if (endpoint instanceof DestinationEndpoint) {
@@ -844,69 +841,52 @@ public class JmsConfiguration implements
container.setMessageSelector(endpoint.getSelector());
}
- if (container instanceof DefaultMessageListenerContainer) {
- // this includes DefaultMessageListenerContainer102
- DefaultMessageListenerContainer listenerContainer =
(DefaultMessageListenerContainer) container;
- if (concurrentConsumers >= 0) {
- listenerContainer.setConcurrentConsumers(concurrentConsumers);
- }
+ if (concurrentConsumers >= 0) {
+ container.setConcurrentConsumers(concurrentConsumers);
+ }
- if (cacheLevel >= 0) {
- listenerContainer.setCacheLevel(cacheLevel);
- } else if (cacheLevelName != null) {
- listenerContainer.setCacheLevelName(cacheLevelName);
- } else {
- listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
- }
+ if (cacheLevel >= 0) {
+ container.setCacheLevel(cacheLevel);
+ } else if (cacheLevelName != null) {
+ container.setCacheLevelName(cacheLevelName);
+ } else {
+ container.setCacheLevel(defaultCacheLevel(endpoint));
+ }
- if (idleTaskExecutionLimit >= 0) {
-
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
- }
- if (maxConcurrentConsumers > 0) {
- if (maxConcurrentConsumers < concurrentConsumers) {
- throw new IllegalArgumentException("Property
maxConcurrentConsumers: " + maxConcurrentConsumers
- + " must be higher than concurrentConsumers: " +
concurrentConsumers);
- }
-
listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
- }
- if (maxMessagesPerTask >= 0) {
- listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
- }
- listenerContainer.setPubSubNoLocal(pubSubNoLocal);
- if (receiveTimeout >= 0) {
- listenerContainer.setReceiveTimeout(receiveTimeout);
- }
- if (recoveryInterval >= 0) {
- listenerContainer.setRecoveryInterval(recoveryInterval);
- }
- if (taskExecutor != null) {
- listenerContainer.setTaskExecutor(taskExecutor);
- }
- PlatformTransactionManager tm = getTransactionManager();
- if (tm != null && transacted) {
- listenerContainer.setTransactionManager(tm);
- } else if (transacted) {
- throw new IllegalArgumentException("Property transacted is
enabled but a transactionManager was not injected!");
- }
- if (transactionName != null) {
- listenerContainer.setTransactionName(transactionName);
- }
- if (transactionTimeout >= 0) {
- listenerContainer.setTransactionTimeout(transactionTimeout);
- }
- if (taskExecutor != null) {
- listenerContainer.setTaskExecutor(taskExecutor);
- }
- } else if (container instanceof SimpleMessageListenerContainer) {
- // this includes SimpleMessageListenerContainer102
- SimpleMessageListenerContainer listenerContainer =
(SimpleMessageListenerContainer) container;
- if (concurrentConsumers >= 0) {
- listenerContainer.setConcurrentConsumers(concurrentConsumers);
- }
- listenerContainer.setPubSubNoLocal(pubSubNoLocal);
- if (taskExecutor != null) {
- listenerContainer.setTaskExecutor(taskExecutor);
+ if (idleTaskExecutionLimit >= 0) {
+ container.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
+ }
+ if (maxConcurrentConsumers > 0) {
+ if (maxConcurrentConsumers < concurrentConsumers) {
+ throw new IllegalArgumentException("Property
maxConcurrentConsumers: " + maxConcurrentConsumers
+ + " must be higher than concurrentConsumers: " +
concurrentConsumers);
}
+ container.setMaxConcurrentConsumers(maxConcurrentConsumers);
+ }
+ if (maxMessagesPerTask >= 0) {
+ container.setMaxMessagesPerTask(maxMessagesPerTask);
+ }
+ container.setPubSubNoLocal(pubSubNoLocal);
+ if (receiveTimeout >= 0) {
+ container.setReceiveTimeout(receiveTimeout);
+ }
+ if (recoveryInterval >= 0) {
+ container.setRecoveryInterval(recoveryInterval);
+ }
+ if (taskExecutor != null) {
+ container.setTaskExecutor(taskExecutor);
+ }
+ PlatformTransactionManager tm = getTransactionManager();
+ if (tm != null && transacted) {
+ container.setTransactionManager(tm);
+ } else if (transacted) {
+ throw new IllegalArgumentException("Property transacted is enabled
but a transactionManager was not injected!");
+ }
+ if (transactionName != null) {
+ container.setTransactionName(transactionName);
+ }
+ if (transactionTimeout >= 0) {
+ container.setTransactionTimeout(transactionTimeout);
}
}
@@ -932,10 +912,6 @@ public class JmsConfiguration implements
}
}
- public AbstractMessageListenerContainer
chooseMessageListenerContainerImplementation(JmsEndpoint endpoint) {
- return new JmsMessageListenerContainer(endpoint);
- }
-
/**
* Defaults the JMS cache level if none is explicitly specified.
* <p/>
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
Sun Feb 6 13:24:44 2011
@@ -22,20 +22,20 @@ import org.apache.camel.FailedToCreateCo
import org.apache.camel.Processor;
import org.apache.camel.SuspendableService;
import org.apache.camel.impl.DefaultConsumer;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
/**
- * A {@link org.apache.camel.Consumer} which uses Spring's {@link
AbstractMessageListenerContainer} implementations to consume JMS messages
+ * A {@link org.apache.camel.Consumer} which uses Spring's {@link
DefaultMessageListenerContainer} implementations to consume JMS messages
*
* @version $Revision$
*/
public class JmsConsumer extends DefaultConsumer implements SuspendableService
{
- private AbstractMessageListenerContainer listenerContainer;
+ private DefaultMessageListenerContainer listenerContainer;
private EndpointMessageListener messageListener;
private volatile boolean initialized;
- public JmsConsumer(JmsEndpoint endpoint, Processor processor,
AbstractMessageListenerContainer listenerContainer) {
+ public JmsConsumer(JmsEndpoint endpoint, Processor processor,
DefaultMessageListenerContainer listenerContainer) {
super(endpoint, processor);
this.listenerContainer = listenerContainer;
this.listenerContainer.setMessageListener(getEndpointMessageListener());
@@ -45,7 +45,7 @@ public class JmsConsumer extends Default
return (JmsEndpoint) super.getEndpoint();
}
- public AbstractMessageListenerContainer getListenerContainer() throws
Exception {
+ public DefaultMessageListenerContainer getListenerContainer() throws
Exception {
if (listenerContainer == null) {
createMessageListenerContainer();
}
@@ -66,7 +66,7 @@ public class JmsConsumer extends Default
protected void createMessageListenerContainer() throws Exception {
listenerContainer = getEndpoint().createMessageListenerContainer();
- getEndpoint().configureListenerContainer(listenerContainer);
+ getEndpoint().configureListenerContainer(listenerContainer, this);
listenerContainer.setMessageListener(getEndpointMessageListener());
}
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Sun Feb 6 13:24:44 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ConnectionFactory;
@@ -45,10 +46,13 @@ import org.apache.camel.impl.DefaultExch
import org.apache.camel.impl.SynchronousDelegateProducer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsOperations;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -62,6 +66,7 @@ import org.springframework.transaction.P
*/
@ManagedResource(description = "Managed JMS Endpoint")
public class JmsEndpoint extends DefaultEndpoint implements
HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
private HeaderFilterStrategy headerFilterStrategy;
private boolean pubSubDomain;
private JmsBinding binding;
@@ -148,15 +153,15 @@ public class JmsEndpoint extends Default
}
public JmsConsumer createConsumer(Processor processor) throws Exception {
- AbstractMessageListenerContainer listenerContainer =
configuration.createMessageListenerContainer(this);
+ DefaultMessageListenerContainer listenerContainer =
createMessageListenerContainer();
return createConsumer(processor, listenerContainer);
}
- public AbstractMessageListenerContainer createMessageListenerContainer()
throws Exception {
+ public DefaultMessageListenerContainer createMessageListenerContainer()
throws Exception {
return configuration.createMessageListenerContainer(this);
}
- public void configureListenerContainer(AbstractMessageListenerContainer
listenerContainer) {
+ public void configureListenerContainer(DefaultMessageListenerContainer
listenerContainer, JmsConsumer consumer) {
if (destinationName != null) {
listenerContainer.setDestinationName(destinationName);
} else if (destination != null) {
@@ -170,6 +175,29 @@ public class JmsEndpoint extends Default
}
}
listenerContainer.setPubSubDomain(pubSubDomain);
+
+ if (configuration.getTaskExecutor() != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using custom TaskExecutor: " +
configuration.getTaskExecutor() + " on listener container: " +
listenerContainer);
+ }
+ listenerContainer.setTaskExecutor(configuration.getTaskExecutor());
+ } else {
+ // include destination name as part of thread name
+ String name = "JmsConsumer[" +
getEndpointConfiguredDestinationName() + "]";
+ // use a cached pool as DefaultMessageListenerContainer will
throttle pool sizing
+ ExecutorService executor =
getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(consumer,
name);
+ listenerContainer.setTaskExecutor(executor);
+ }
+ }
+
+ /**
+ * Gets the destination name which was configured from the endpoint uri.
+ *
+ * @return the destination name resolved from the endpoint uri
+ */
+ public String getEndpointConfiguredDestinationName() {
+ String remainder = ObjectHelper.after(getEndpointKey(), "//");
+ return JmsMessageHelper.normalizeDestinationName(remainder);
}
/**
@@ -180,9 +208,10 @@ public class JmsEndpoint extends Default
* @return a newly created consumer
* @throws Exception if the consumer cannot be created
*/
- public JmsConsumer createConsumer(Processor processor,
AbstractMessageListenerContainer listenerContainer) throws Exception {
- configureListenerContainer(listenerContainer);
- return new JmsConsumer(this, processor, listenerContainer);
+ public JmsConsumer createConsumer(Processor processor,
DefaultMessageListenerContainer listenerContainer) throws Exception {
+ JmsConsumer consumer = new JmsConsumer(this, processor,
listenerContainer);
+ configureListenerContainer(listenerContainer, consumer);
+ return consumer;
}
@Override
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
Sun Feb 6 13:24:44 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms.r
import java.math.BigInteger;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -25,7 +26,6 @@ import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
-import org.apache.camel.util.IntrospectionSupport;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
@@ -206,6 +206,10 @@ public class PersistentQueueReplyManager
}
if (endpoint.getTaskExecutor() != null) {
answer.setTaskExecutor(endpoint.getTaskExecutor());
+ } else {
+ String name = "PersistentReplyManager[" +
endpoint.getEndpointConfiguredDestinationName() + "]";
+ ExecutorService executor =
endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this,
name);
+ answer.setTaskExecutor(executor);
}
return answer;
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Sun Feb 6 13:24:44 2011
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.jms.reply;
+import java.util.concurrent.ExecutorService;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -122,6 +123,10 @@ public class TemporaryQueueReplyManager
}
if (endpoint.getTaskExecutor() != null) {
answer.setTaskExecutor(endpoint.getTaskExecutor());
+ } else {
+ String name = "TemporaryReplyManager[" +
endpoint.getEndpointConfiguredDestinationName() + "]";
+ ExecutorService executor =
endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this,
name);
+ answer.setTaskExecutor(executor);
}
return answer;
Modified:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
(original)
+++
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
Sun Feb 6 13:24:44 2011
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms;
import javax.jms.ConnectionFactory;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;