Hi Claus, Setting up this attribute at the JmsConfiguration level was already possible. But I also added a setter to JmsComponent for further convenience and made it a read-only @ManagedAttribute.
Regards, *Raúl Kripalani* Apache Camel Committer Enterprise Architect, Program Manager, Open Source Integration specialist http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk> On Mon, Feb 4, 2013 at 8:18 AM, Claus Ibsen <claus.ib...@gmail.com> wrote: > Hi > > I think you should make it possible to configure this once on > JmsConfiguration / JmsComponent, Then you dont need to configure this > per endpoint. > Though if you configure per endpoint then it overrides the > configuration inherited from component. > > Also the option could be exposed as a read-only JMX annotation by > adding a @ManagedAttribute to the getter in the endpoint. > > On Mon, Feb 4, 2013 at 2:32 AM, <ra...@apache.org> wrote: > > Author: raulk > > Date: Mon Feb 4 01:32:58 2013 > > New Revision: 1442002 > > > > URL: http://svn.apache.org/viewvc?rev=1442002&view=rev > > Log: > > CAMEL-5974 Allow specifying the default type of TaskExecutor used by the > DMLC (camel-jms) > > > > Added: > > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java > > > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java > > Modified: > > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java > > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java > > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java > > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java > > > > Modified: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442002&r1=1442001&r2=1442002&view=diff > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java > (original) > > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java > Mon Feb 4 01:32:58 2013 > > @@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent. > > import org.springframework.core.task.SimpleAsyncTaskExecutor; > > import org.springframework.core.task.TaskExecutor; > > import org.springframework.jms.listener.DefaultMessageListenerContainer; > > +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; > > > > /** > > * The default {@link DefaultMessageListenerContainer container} which > listen for messages > > @@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo > > > > /** > > * Create a default TaskExecutor. Called if no explicit > TaskExecutor has been specified. > > - * <p>The default implementation builds a {@link > org.springframework.core.task.SimpleAsyncTaskExecutor} > > - * with the specified bean name and using Camel's {@link > org.apache.camel.spi.ExecutorServiceManager} > > + * <p /> > > + * The type of {@link TaskExecutor} will depend on the value of > > + * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more > details, refer to the Javadoc of > > + * {@link DefaultTaskExecutorType}. > > + * <p /> > > + * In all cases, it uses the specified bean name and Camel's {@link > org.apache.camel.spi.ExecutorServiceManager} > > * to resolve the thread name. > > - * @see > org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) > > + * @see > JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType) > > + * @see ThreadPoolTaskExecutor#setBeanName(String) > > */ > > @Override > > protected TaskExecutor createDefaultTaskExecutor() { > > String pattern = > endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern(); > > - String beanName = getBeanName(); > > + String beanName = getBeanName() == null ? > endpoint.getThreadName() : getBeanName(); > > > > - SimpleAsyncTaskExecutor answer = new > SimpleAsyncTaskExecutor(beanName); > > - answer.setThreadFactory(new CamelThreadFactory(pattern, > beanName, true)); > > - return answer; > > + if (endpoint.getDefaultTaskExecutorType() == > DefaultTaskExecutorType.ThreadPool) { > > + ThreadPoolTaskExecutor answer = new > ThreadPoolTaskExecutor(); > > + answer.setBeanName(beanName); > > + answer.setThreadFactory(new CamelThreadFactory(pattern, > beanName, true)); > > + answer.setCorePoolSize(endpoint.getConcurrentConsumers()); > > + // Direct hand-off mode. Do not queue up tasks: assign it > to a thread immediately. > > + // We set no upper-bound on the thread pool (no > maxPoolSize) as it's already implicitly constrained by > > + // maxConcurrentConsumers on the DMLC itself (i.e. DMLC > will only grow up to a level of concurrency as > > + // defined by maxConcurrentConsumers). > > + answer.setQueueCapacity(0); > > + answer.initialize(); > > + return answer; > > + } else { > > + SimpleAsyncTaskExecutor answer = new > SimpleAsyncTaskExecutor(beanName); > > + answer.setThreadFactory(new CamelThreadFactory(pattern, > beanName, true)); > > + return answer; > > + } > > } > > - > > + > > } > > > > Added: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java?rev=1442002&view=auto > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java > (added) > > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java > Mon Feb 4 01:32:58 2013 > > @@ -0,0 +1,49 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one or more > > + * contributor license agreements. See the NOTICE file distributed with > > + * this work for additional information regarding copyright ownership. > > + * The ASF licenses this file to You under the Apache License, Version > 2.0 > > + * (the "License"); you may not use this file except in compliance with > > + * the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > +package org.apache.camel.component.jms; > > + > > +import java.util.concurrent.ThreadPoolExecutor; > > + > > +import org.springframework.core.task.SimpleAsyncTaskExecutor; > > +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; > > + > > +/** > > + * Hints what type of default task executor our {@link > DefaultJmsMessageListenerContainer} should use. > > + * @since 2.10.3 > > + */ > > +public enum DefaultTaskExecutorType { > > + > > + /** > > + * Use a {@link ThreadPoolTaskExecutor} as the underlying task > executor for consuming messages. > > + * It will be configured with these attributes: > > + * <p /> > > + * <li> > > + * <ul>{@code corePoolSize} = concurrentConsumers</ul> > > + * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy > for growing the thread pool, > > + * see Javadoc of {@link ThreadPoolExecutor}.</ul> > > + * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as > concurrency should be limited by > > + * the endpoint's maxConcurrentConsumers, not by the thread > pool).</ul> > > + * </li> > > + */ > > + ThreadPool, > > + > > + /** > > + * Use a {@link SimpleAsyncTaskExecutor} as the underlying task > executor for consuming messages. > > + * (Legacy mode - default behaviour before version 2.10.3). > > + */ > > + SimpleAsync > > +} > > > > Modified: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442002&r1=1442001&r2=1442002&view=diff > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java > (original) > > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java > Mon Feb 4 01:32:58 2013 > > @@ -18,6 +18,7 @@ package org.apache.camel.component.jms; > > > > import java.util.Map; > > import java.util.concurrent.ExecutorService; > > + > > import javax.jms.ConnectionFactory; > > import javax.jms.ExceptionListener; > > import javax.jms.Session; > > @@ -183,7 +184,7 @@ public class JmsComponent extends Defaul > > public void setCacheLevelName(String cacheName) { > > getConfiguration().setCacheLevelName(cacheName); > > } > > - > > + > > public void setReplyToCacheLevelName(String cacheName) { > > getConfiguration().setReplyToCacheLevelName(cacheName); > > } > > @@ -235,7 +236,7 @@ public class JmsComponent extends Defaul > > public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { > > > getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit); > > } > > - > > + > > public void setIdleConsumerLimit(int idleConsumerLimit) { > > getConfiguration().setIdleConsumerLimit(idleConsumerLimit); > > } > > > > Modified: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442002&r1=1442001&r2=1442002&view=diff > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java > (original) > > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java > Mon Feb 4 01:32:58 2013 > > @@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan > > import static > org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; > > > > /** > > - * @version > > + * @version > > */ > > public class JmsConfiguration implements Cloneable { > > > > @@ -137,6 +137,7 @@ public class JmsConfiguration implements > > private boolean allowNullBody = true; > > private MessageListenerContainerFactory > messageListenerContainerFactory; > > private boolean includeSentJMSMessageID; > > + private DefaultTaskExecutorType defaultTaskExecutorType; > > > > public JmsConfiguration() { > > } > > @@ -386,7 +387,7 @@ public class JmsConfiguration implements > > case Default: > > return new DefaultJmsMessageListenerContainer(endpoint); > > case Custom: > > - return getCustomMessageListenerContainer(endpoint); > > + return getCustomMessageListenerContainer(endpoint); > > default: > > throw new IllegalArgumentException("Unknown consumer type: > " + consumerType); > > } > > @@ -1313,4 +1314,16 @@ public class JmsConfiguration implements > > public void setIncludeSentJMSMessageID(boolean > includeSentJMSMessageID) { > > this.includeSentJMSMessageID = includeSentJMSMessageID; > > } > > + > > + public DefaultTaskExecutorType getDefaultTaskExecutorType() { > > + return defaultTaskExecutorType; > > + } > > + > > + /** > > + * Indicates what type of {@link TaskExecutor} to use by default > for JMS consumers. > > + * Refer to the documentation of {@link DefaultTaskExecutorType} > for available options. > > + */ > > + public void setDefaultTaskExecutorType(DefaultTaskExecutorType > defaultTaskExecutorType) { > > + this.defaultTaskExecutorType = defaultTaskExecutorType; > > + } > > } > > > > Modified: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442002&r1=1442001&r2=1442002&view=diff > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java > (original) > > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java > Mon Feb 4 01:32:58 2013 > > @@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan > > /** > > * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a> > > * > > - * @version > > + * @version > > */ > > @ManagedResource(description = "Managed JMS Endpoint") > > public class JmsEndpoint extends DefaultEndpoint implements > HeaderFilterStrategyAware, MultipleConsumersSupport, Service { > > @@ -176,24 +176,32 @@ public class JmsEndpoint extends Default > > listenerContainer.setPubSubDomain(pubSubDomain); > > > > // include destination name as part of thread and transaction > name > > - String consumerName = "JmsConsumer[" + > getEndpointConfiguredDestinationName() + "]"; > > + String consumerName = getThreadName(); > > > > if (configuration.getTaskExecutor() != null) { > > if (log.isDebugEnabled()) { > > log.debug("Using custom TaskExecutor: {} on listener > container: {}", configuration.getTaskExecutor(), listenerContainer); > > } > > setContainerTaskExecutor(listenerContainer, > configuration.getTaskExecutor()); > > - } else { > > + } else if ((listenerContainer instanceof > DefaultJmsMessageListenerContainer && > configuration.getDefaultTaskExecutorType() == null) > > + || !(listenerContainer instanceof > DefaultJmsMessageListenerContainer)) { > > + // preserve backwards compatibility if an explicit Default > TaskExecutor Type was not set; > > + // otherwise, defer the creation of the TaskExecutor > > // use a cached pool as DefaultMessageListenerContainer > will throttle pool sizing > > ExecutorService executor = > getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, > consumerName); > > setContainerTaskExecutor(listenerContainer, executor); > > + } else { > > + // do nothing, as we're working with a > DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType, > > + // so > DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle > the creation > > + log.debug("Deferring creation of TaskExecutor for listener > container: {} as per policy: {}", > > + listenerContainer, > configuration.getDefaultTaskExecutorType()); > > } > > - > > + > > // set a default transaction name if none provided > > if (configuration.getTransactionName() == null) { > > if (listenerContainer instanceof > DefaultMessageListenerContainer) { > > ((DefaultMessageListenerContainer) > listenerContainer).setTransactionName(consumerName); > > - } > > + } > > } > > } > > > > @@ -271,6 +279,10 @@ public class JmsEndpoint extends Default > > return true; > > } > > > > + public String getThreadName() { > > + return "JmsConsumer[" + getEndpointConfiguredDestinationName() > + "]"; > > + } > > + > > // Properties > > // > ------------------------------------------------------------------------- > > > > @@ -448,7 +460,7 @@ public class JmsEndpoint extends Default > > public String getCacheLevelName() { > > return getConfiguration().getCacheLevelName(); > > } > > - > > + > > @ManagedAttribute > > public String getReplyToCacheLevelName() { > > return getConfiguration().getReplyToCacheLevelName(); > > @@ -489,7 +501,7 @@ public class JmsEndpoint extends Default > > public LoggingLevel getErrorHandlerLoggingLevel() { > > return getConfiguration().getErrorHandlerLoggingLevel(); > > } > > - > > + > > @ManagedAttribute > > public boolean isErrorHandlerLogStackTrace() { > > return getConfiguration().isErrorHandlerLogStackTrace(); > > @@ -509,7 +521,7 @@ public class JmsEndpoint extends Default > > public int getIdleConsumerLimit() { > > return getConfiguration().getIdleConsumerLimit(); > > } > > - > > + > > public JmsOperations getJmsOperations() { > > return getConfiguration().getJmsOperations(); > > } > > @@ -724,7 +736,7 @@ public class JmsEndpoint extends Default > > public void setCacheLevelName(String cacheName) { > > getConfiguration().setCacheLevelName(cacheName); > > } > > - > > + > > @ManagedAttribute > > public void setReplyToCacheLevelName(String cacheName) { > > getConfiguration().setReplyToCacheLevelName(cacheName); > > @@ -795,7 +807,7 @@ public class JmsEndpoint extends Default > > public void setIdleConsumerLimit(int idleConsumerLimit) { > > getConfiguration().setIdleConsumerLimit(idleConsumerLimit); > > } > > - > > + > > public void setJmsOperations(JmsOperations jmsOperations) { > > getConfiguration().setJmsOperations(jmsOperations); > > } > > @@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default > > public void setAllowNullBody(boolean allowNullBody) { > > configuration.setAllowNullBody(allowNullBody); > > } > > - > > + > > @ManagedAttribute > > public boolean isIncludeSentJMSMessageID() { > > return configuration.isIncludeSentJMSMessageID(); > > @@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default > > > configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID); > > } > > > > + public DefaultTaskExecutorType getDefaultTaskExecutorType() { > > + return configuration.getDefaultTaskExecutorType(); > > + } > > + > > + public void setDefaultTaskExecutorType(DefaultTaskExecutorType > type) { > > + configuration.setDefaultTaskExecutorType(type); > > + } > > + > > public MessageListenerContainerFactory > getMessageListenerContainerFactory() { > > return configuration.getMessageListenerContainerFactory(); > > } > > > > Added: > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java?rev=1442002&view=auto > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java > (added) > > +++ > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java > Mon Feb 4 01:32:58 2013 > > @@ -0,0 +1,150 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one or more > > + * contributor license agreements. See the NOTICE file distributed with > > + * this work for additional information regarding copyright ownership. > > + * The ASF licenses this file to You under the Apache License, Version > 2.0 > > + * (the "License"); you may not use this file except in compliance with > > + * the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > +package org.apache.camel.component.jms; > > + > > +import java.lang.reflect.InvocationTargetException; > > +import java.lang.reflect.Method; > > +import java.util.concurrent.Callable; > > +import java.util.concurrent.CountDownLatch; > > +import java.util.concurrent.ExecutorService; > > +import java.util.concurrent.Executors; > > + > > +import javax.jms.ConnectionFactory; > > + > > +import org.apache.camel.CamelContext; > > +import org.apache.camel.builder.RouteBuilder; > > +import org.apache.camel.test.junit4.CamelTestSupport; > > +import org.apache.camel.util.concurrent.ThreadHelper; > > +import org.junit.Test; > > +import org.slf4j.Logger; > > +import org.slf4j.LoggerFactory; > > + > > +import static > org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; > > + > > +/** > > + * > > + */ > > +public class JmsDefaultTaskExecutorTypeTest extends CamelTestSupport { > > + > > + private static final Logger LOG = > LoggerFactory.getLogger(JmsDefaultTaskExecutorTypeTest.class); > > + > > + @Test > > + public void testThreadPoolTaskExecutor() throws Exception { > > + context.startRoute("threadPool"); > > + Long beforeThreadCount = currentThreadCount(); > > + > getMockEndpoint("mock:result.threadPool").expectedMessageCount(1000); > > + doSendMessages("foo.threadPool", 500, 5, > DefaultTaskExecutorType.ThreadPool); > > + Thread.sleep(100); > > + doSendMessages("foo.threadPool", 500, 5, > DefaultTaskExecutorType.ThreadPool); > > + assertMockEndpointsSatisfied(); > > + Long numberThreadsCreated = currentThreadCount() - > beforeThreadCount; > > + LOG.info("Number of threads created, > testThreadPoolTaskExecutor: " + numberThreadsCreated); > > + assertTrue("Number of threads created should be equal or lower > than " > > + + "100 with ThreadPoolTaskExecutor", > numberThreadsCreated <= 100); > > + } > > + > > + @Test > > + public void testSimpleAsyncTaskExecutor() throws Exception { > > + context.startRoute("simpleAsync"); > > + Long beforeThreadCount = currentThreadCount(); > > + > getMockEndpoint("mock:result.simpleAsync").expectedMessageCount(1000); > > + doSendMessages("foo.simpleAsync", 500, 5, > DefaultTaskExecutorType.SimpleAsync); > > + Thread.sleep(100); > > + doSendMessages("foo.simpleAsync", 500, 5, > DefaultTaskExecutorType.SimpleAsync); > > + assertMockEndpointsSatisfied(); > > + Long numberThreadsCreated = currentThreadCount() - > beforeThreadCount; > > + LOG.info("Number of threads created, > testSimpleAsyncTaskExecutor: " + numberThreadsCreated); > > + assertTrue("Number of threads created should be equal or higher > than " > > + + "800 with SimpleAsyncTaskExecutor", > numberThreadsCreated >= 800); > > + } > > + > > + @Test > > + public void testDefaultTaskExecutor() throws Exception { > > + context.startRoute("default"); > > + Long beforeThreadCount = currentThreadCount(); > > + > getMockEndpoint("mock:result.default").expectedMessageCount(1000); > > + doSendMessages("foo.default", 500, 5, null); > > + Thread.sleep(100); > > + doSendMessages("foo.default", 500, 5, null); > > + assertMockEndpointsSatisfied(); > > + Long numberThreadsCreated = currentThreadCount() - > beforeThreadCount; > > + LOG.info("Number of threads created, testDefaultTaskExecutor: " > + numberThreadsCreated); > > + assertTrue("Number of threads created should be equal or higher > than " > > + + "800 with default behaviour", numberThreadsCreated >= > 800); > > + } > > + > > + private Long currentThreadCount() throws NoSuchMethodException, > > + IllegalAccessException, InvocationTargetException { > > + Method m = > ThreadHelper.class.getDeclaredMethod("nextThreadCounter", (Class<?>[]) > null); > > + m.setAccessible(true); > > + Long nextThreadCount = (Long) m.invoke(null); > > + return nextThreadCount; > > + } > > + > > + protected CamelContext createCamelContext() throws Exception { > > + CamelContext camelContext = super.createCamelContext(); > > + ConnectionFactory connectionFactory = > CamelJmsTestHelper.createConnectionFactory(); > > + JmsComponent jmsComponent = > jmsComponentAutoAcknowledge(connectionFactory); > > + jmsComponent.getConfiguration().setMaxMessagesPerTask(1); > > + jmsComponent.getConfiguration().setIdleTaskExecutionLimit(1); > > + jmsComponent.getConfiguration().setConcurrentConsumers(3); > > + jmsComponent.getConfiguration().setMaxConcurrentConsumers(10); > > + jmsComponent.getConfiguration().setReceiveTimeout(50); > > + camelContext.addComponent("activemq", jmsComponent); > > + return camelContext; > > + } > > + > > + private void doSendMessages(final String queueName, int messages, > int poolSize, > > + final DefaultTaskExecutorType defaultTaskExecutorType) > throws Exception { > > + ExecutorService executor = > Executors.newFixedThreadPool(poolSize); > > + final CountDownLatch latch = new CountDownLatch(messages); > > + for (int i = 0; i < messages; i++) { > > + final int index = i; > > + executor.submit(new Callable<Object>() { > > + public Object call() throws Exception { > > + String options = defaultTaskExecutorType == null ? > "" : "?defaultTaskExecutorType=" > > + + defaultTaskExecutorType.toString(); > > + template.requestBody("activemq:queue:" + queueName > + options, "Message " + index); > > + latch.countDown(); > > + return null; > > + } > > + }); > > + } > > + latch.await(); > > + executor.shutdown(); > > + } > > + > > + @Override > > + protected RouteBuilder createRouteBuilder() throws Exception { > > + return new RouteBuilder() { > > + @Override > > + public void configure() throws Exception { > > + > > from("activemq:queue:foo.simpleAsync?defaultTaskExecutorType=SimpleAsync").routeId("simpleAsync").noAutoStartup() > > + .to("mock:result.simpleAsync") > > + .setBody(constant("Reply")); > > + > > + > > from("activemq:queue:foo.threadPool?defaultTaskExecutorType=ThreadPool").routeId("threadPool").noAutoStartup() > > + .to("mock:result.threadPool") > > + .setBody(constant("Reply")); > > + > > + > from("activemq:queue:foo.default").routeId("default").noAutoStartup() > > + .to("mock:result.default") > > + .setBody(constant("Reply")); > > + } > > + }; > > + } > > +} > > > > > > > > -- > Claus Ibsen > ----------------- > Red Hat, Inc. > FuseSource is now part of Red Hat > Email: cib...@redhat.com > Web: http://fusesource.com > Twitter: davsclaus > Blog: http://davsclaus.com > Author of Camel in Action: http://www.manning.com/ibsen >