Hi

I think you should make it possible to configure this once on
JmsConfiguration / JmsComponent. Then you don't need to configure it
on every endpoint.
If you do configure it on an endpoint, that value overrides the
configuration inherited from the component.

Also, the option could be exposed as a read-only JMX attribute by
adding @ManagedAttribute to the getter in the endpoint.

On Mon, Feb 4, 2013 at 2:32 AM,  <ra...@apache.org> wrote:
> Author: raulk
> Date: Mon Feb  4 01:32:58 2013
> New Revision: 1442002
>
> URL: http://svn.apache.org/viewvc?rev=1442002&view=rev
> Log:
> CAMEL-5974 Allow specifying the default type of TaskExecutor used by the DMLC 
> (camel-jms)
>
> Added:
>     
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
>     
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> Modified:
>     
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
>     
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
>     
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
>     
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
>
> Modified: 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
>  (original)
> +++ 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
>  Mon Feb  4 01:32:58 2013
> @@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent.
>  import org.springframework.core.task.SimpleAsyncTaskExecutor;
>  import org.springframework.core.task.TaskExecutor;
>  import org.springframework.jms.listener.DefaultMessageListenerContainer;
> +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
>
>  /**
>   * The default {@link DefaultMessageListenerContainer container} which 
> listen for messages
> @@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo
>
>      /**
>       * Create a default TaskExecutor. Called if no explicit TaskExecutor has 
> been specified.
> -     * <p>The default implementation builds a {@link 
> org.springframework.core.task.SimpleAsyncTaskExecutor}
> -     * with the specified bean name and using Camel's {@link 
> org.apache.camel.spi.ExecutorServiceManager}
> +     * <p />
> +     * The type of {@link TaskExecutor} will depend on the value of
> +     * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more 
> details, refer to the Javadoc of
> +     * {@link DefaultTaskExecutorType}.
> +     * <p />
> +     * In all cases, it uses the specified bean name and Camel's {@link 
> org.apache.camel.spi.ExecutorServiceManager}
>       * to resolve the thread name.
> -     * @see 
> org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
> +     * @see 
> JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType)
> +     * @see ThreadPoolTaskExecutor#setBeanName(String)
>       */
>      @Override
>      protected TaskExecutor createDefaultTaskExecutor() {
>          String pattern = 
> endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
> -        String beanName = getBeanName();
> +        String beanName = getBeanName() == null ? endpoint.getThreadName() : 
> getBeanName();
>
> -        SimpleAsyncTaskExecutor answer = new 
> SimpleAsyncTaskExecutor(beanName);
> -        answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, 
> true));
> -        return answer;
> +        if (endpoint.getDefaultTaskExecutorType() == 
> DefaultTaskExecutorType.ThreadPool) {
> +            ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
> +            answer.setBeanName(beanName);
> +            answer.setThreadFactory(new CamelThreadFactory(pattern, 
> beanName, true));
> +            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
> +            // Direct hand-off mode. Do not queue up tasks: assign it to a 
> thread immediately.
> +            // We set no upper-bound on the thread pool (no maxPoolSize) as 
> it's already implicitly constrained by
> +            // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will 
> only grow up to a level of concurrency as
> +            // defined by maxConcurrentConsumers).
> +            answer.setQueueCapacity(0);
> +            answer.initialize();
> +            return answer;
> +        } else {
> +            SimpleAsyncTaskExecutor answer = new 
> SimpleAsyncTaskExecutor(beanName);
> +            answer.setThreadFactory(new CamelThreadFactory(pattern, 
> beanName, true));
> +            return answer;
> +        }
>      }
> -
> +
>  }
>
> Added: 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java?rev=1442002&view=auto
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
>  (added)
> +++ 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
>  Mon Feb  4 01:32:58 2013
> @@ -0,0 +1,49 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jms;
> +
> +import java.util.concurrent.ThreadPoolExecutor;
> +
> +import org.springframework.core.task.SimpleAsyncTaskExecutor;
> +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
> +
> +/**
> + * Hints what type of default task executor our {@link 
> DefaultJmsMessageListenerContainer} should use.
> + * @since 2.10.3
> + */
> +public enum DefaultTaskExecutorType {
> +
> +    /**
> +     * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor 
> for consuming messages.
> +     * It will be configured with these attributes:
> +     * <p />
> +     * <li>
> +     * <ul>{@code corePoolSize} = concurrentConsumers</ul>
> +     * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for 
> growing the thread pool,
> +     * see Javadoc of {@link ThreadPoolExecutor}.</ul>
> +     * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as 
> concurrency should be limited by
> +     * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
> +     * </li>
> +     */
> +    ThreadPool,
> +
> +    /**
> +     * Use a {@link SimpleAsyncTaskExecutor} as the underlying task executor 
> for consuming messages.
> +     * (Legacy mode - default behaviour before version 2.10.3).
> +     */
> +    SimpleAsync
> +}
>
> Modified: 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
>  (original)
> +++ 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
>  Mon Feb  4 01:32:58 2013
> @@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
>
>  import java.util.Map;
>  import java.util.concurrent.ExecutorService;
> +
>  import javax.jms.ConnectionFactory;
>  import javax.jms.ExceptionListener;
>  import javax.jms.Session;
> @@ -183,7 +184,7 @@ public class JmsComponent extends Defaul
>      public void setCacheLevelName(String cacheName) {
>          getConfiguration().setCacheLevelName(cacheName);
>      }
> -
> +
>      public void setReplyToCacheLevelName(String cacheName) {
>          getConfiguration().setReplyToCacheLevelName(cacheName);
>      }
> @@ -235,7 +236,7 @@ public class JmsComponent extends Defaul
>      public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
>          getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
>      }
> -
> +
>      public void setIdleConsumerLimit(int idleConsumerLimit) {
>          getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
>      }
>
> Modified: 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
>  (original)
> +++ 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
>  Mon Feb  4 01:32:58 2013
> @@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan
>  import static 
> org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
>
>  /**
> - * @version
> + * @version
>   */
>  public class JmsConfiguration implements Cloneable {
>
> @@ -137,6 +137,7 @@ public class JmsConfiguration implements
>      private boolean allowNullBody = true;
>      private MessageListenerContainerFactory messageListenerContainerFactory;
>      private boolean includeSentJMSMessageID;
> +    private DefaultTaskExecutorType defaultTaskExecutorType;
>
>      public JmsConfiguration() {
>      }
> @@ -386,7 +387,7 @@ public class JmsConfiguration implements
>          case Default:
>              return new DefaultJmsMessageListenerContainer(endpoint);
>          case Custom:
> -            return getCustomMessageListenerContainer(endpoint);
> +            return getCustomMessageListenerContainer(endpoint);
>          default:
>              throw new IllegalArgumentException("Unknown consumer type: " + 
> consumerType);
>          }
> @@ -1313,4 +1314,16 @@ public class JmsConfiguration implements
>      public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) {
>          this.includeSentJMSMessageID = includeSentJMSMessageID;
>      }
> +
> +    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
> +        return defaultTaskExecutorType;
> +    }
> +
> +    /**
> +     * Indicates what type of {@link TaskExecutor} to use by default for JMS 
> consumers.
> +     * Refer to the documentation of {@link DefaultTaskExecutorType} for 
> available options.
> +     */
> +    public void setDefaultTaskExecutorType(DefaultTaskExecutorType 
> defaultTaskExecutorType) {
> +        this.defaultTaskExecutorType = defaultTaskExecutorType;
> +    }
>  }
>
> Modified: 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
>  (original)
> +++ 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
>  Mon Feb  4 01:32:58 2013
> @@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan
>  /**
>   * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
>   *
> - * @version
> + * @version
>   */
>  @ManagedResource(description = "Managed JMS Endpoint")
>  public class JmsEndpoint extends DefaultEndpoint implements 
> HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
> @@ -176,24 +176,32 @@ public class JmsEndpoint extends Default
>          listenerContainer.setPubSubDomain(pubSubDomain);
>
>          // include destination name as part of thread and transaction name
> -        String consumerName = "JmsConsumer[" + 
> getEndpointConfiguredDestinationName() + "]";
> +        String consumerName = getThreadName();
>
>          if (configuration.getTaskExecutor() != null) {
>              if (log.isDebugEnabled()) {
>                  log.debug("Using custom TaskExecutor: {} on listener 
> container: {}", configuration.getTaskExecutor(), listenerContainer);
>              }
>              setContainerTaskExecutor(listenerContainer, 
> configuration.getTaskExecutor());
> -        } else {
> +        } else if ((listenerContainer instanceof 
> DefaultJmsMessageListenerContainer && 
> configuration.getDefaultTaskExecutorType() == null)
> +                || !(listenerContainer instanceof 
> DefaultJmsMessageListenerContainer)) {
> +            // preserve backwards compatibility if an explicit Default 
> TaskExecutor Type was not set;
> +            // otherwise, defer the creation of the TaskExecutor
>              // use a cached pool as DefaultMessageListenerContainer will 
> throttle pool sizing
>              ExecutorService executor = 
> getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, 
> consumerName);
>              setContainerTaskExecutor(listenerContainer, executor);
> +        } else {
> +            // do nothing, as we're working with a 
> DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType,
> +            // so 
> DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle the 
> creation
> +            log.debug("Deferring creation of TaskExecutor for listener 
> container: {} as per policy: {}",
> +                    listenerContainer, 
> configuration.getDefaultTaskExecutorType());
>          }
> -
> +
>          // set a default transaction name if none provided
>          if (configuration.getTransactionName() == null) {
>              if (listenerContainer instanceof 
> DefaultMessageListenerContainer) {
>                  ((DefaultMessageListenerContainer) 
> listenerContainer).setTransactionName(consumerName);
> -            }
> +            }
>          }
>      }
>
> @@ -271,6 +279,10 @@ public class JmsEndpoint extends Default
>          return true;
>      }
>
> +    public String getThreadName() {
> +        return "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
> +    }
> +
>      // Properties
>      // 
> -------------------------------------------------------------------------
>
> @@ -448,7 +460,7 @@ public class JmsEndpoint extends Default
>      public String getCacheLevelName() {
>          return getConfiguration().getCacheLevelName();
>      }
> -
> +
>      @ManagedAttribute
>      public String getReplyToCacheLevelName() {
>          return getConfiguration().getReplyToCacheLevelName();
> @@ -489,7 +501,7 @@ public class JmsEndpoint extends Default
>      public LoggingLevel getErrorHandlerLoggingLevel() {
>          return getConfiguration().getErrorHandlerLoggingLevel();
>      }
> -
> +
>      @ManagedAttribute
>      public boolean isErrorHandlerLogStackTrace() {
>          return getConfiguration().isErrorHandlerLogStackTrace();
> @@ -509,7 +521,7 @@ public class JmsEndpoint extends Default
>      public int getIdleConsumerLimit() {
>          return getConfiguration().getIdleConsumerLimit();
>      }
> -
> +
>      public JmsOperations getJmsOperations() {
>          return getConfiguration().getJmsOperations();
>      }
> @@ -724,7 +736,7 @@ public class JmsEndpoint extends Default
>      public void setCacheLevelName(String cacheName) {
>          getConfiguration().setCacheLevelName(cacheName);
>      }
> -
> +
>      @ManagedAttribute
>      public void setReplyToCacheLevelName(String cacheName) {
>          getConfiguration().setReplyToCacheLevelName(cacheName);
> @@ -795,7 +807,7 @@ public class JmsEndpoint extends Default
>      public void setIdleConsumerLimit(int idleConsumerLimit) {
>          getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
>      }
> -
> +
>      public void setJmsOperations(JmsOperations jmsOperations) {
>          getConfiguration().setJmsOperations(jmsOperations);
>      }
> @@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default
>      public void setAllowNullBody(boolean allowNullBody) {
>          configuration.setAllowNullBody(allowNullBody);
>      }
> -
> +
>      @ManagedAttribute
>      public boolean isIncludeSentJMSMessageID() {
>          return configuration.isIncludeSentJMSMessageID();
> @@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default
>          configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID);
>      }
>
> +    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
> +        return configuration.getDefaultTaskExecutorType();
> +    }
> +
> +    public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) {
> +        configuration.setDefaultTaskExecutorType(type);
> +    }
> +
>      public MessageListenerContainerFactory 
> getMessageListenerContainerFactory() {
>          return configuration.getMessageListenerContainerFactory();
>      }
>
> Added: 
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java?rev=1442002&view=auto
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
>  (added)
> +++ 
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
>  Mon Feb  4 01:32:58 2013
> @@ -0,0 +1,150 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jms;
> +
> +import java.lang.reflect.InvocationTargetException;
> +import java.lang.reflect.Method;
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +
> +import javax.jms.ConnectionFactory;
> +
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.test.junit4.CamelTestSupport;
> +import org.apache.camel.util.concurrent.ThreadHelper;
> +import org.junit.Test;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import static 
> org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> +
> +/**
> + *
> + */
> +public class JmsDefaultTaskExecutorTypeTest extends CamelTestSupport {
> +
> +    private static final Logger LOG = 
> LoggerFactory.getLogger(JmsDefaultTaskExecutorTypeTest.class);
> +
> +    @Test
> +    public void testThreadPoolTaskExecutor() throws Exception {
> +        context.startRoute("threadPool");
> +        Long beforeThreadCount = currentThreadCount();
> +        getMockEndpoint("mock:result.threadPool").expectedMessageCount(1000);
> +        doSendMessages("foo.threadPool", 500, 5, 
> DefaultTaskExecutorType.ThreadPool);
> +        Thread.sleep(100);
> +        doSendMessages("foo.threadPool", 500, 5, 
> DefaultTaskExecutorType.ThreadPool);
> +        assertMockEndpointsSatisfied();
> +        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
> +        LOG.info("Number of threads created, testThreadPoolTaskExecutor: " + 
> numberThreadsCreated);
> +        assertTrue("Number of threads created should be equal or lower than "
> +                + "100 with ThreadPoolTaskExecutor", numberThreadsCreated <= 
> 100);
> +    }
> +
> +    @Test
> +    public void testSimpleAsyncTaskExecutor() throws Exception {
> +        context.startRoute("simpleAsync");
> +        Long beforeThreadCount = currentThreadCount();
> +        
> getMockEndpoint("mock:result.simpleAsync").expectedMessageCount(1000);
> +        doSendMessages("foo.simpleAsync", 500, 5, 
> DefaultTaskExecutorType.SimpleAsync);
> +        Thread.sleep(100);
> +        doSendMessages("foo.simpleAsync", 500, 5, 
> DefaultTaskExecutorType.SimpleAsync);
> +        assertMockEndpointsSatisfied();
> +        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
> +        LOG.info("Number of threads created, testSimpleAsyncTaskExecutor: " 
> + numberThreadsCreated);
> +        assertTrue("Number of threads created should be equal or higher than 
> "
> +                + "800 with SimpleAsyncTaskExecutor", numberThreadsCreated 
> >= 800);
> +    }
> +
> +    @Test
> +    public void testDefaultTaskExecutor() throws Exception {
> +        context.startRoute("default");
> +        Long beforeThreadCount = currentThreadCount();
> +        getMockEndpoint("mock:result.default").expectedMessageCount(1000);
> +        doSendMessages("foo.default", 500, 5, null);
> +        Thread.sleep(100);
> +        doSendMessages("foo.default", 500, 5, null);
> +        assertMockEndpointsSatisfied();
> +        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
> +        LOG.info("Number of threads created, testDefaultTaskExecutor: " + 
> numberThreadsCreated);
> +        assertTrue("Number of threads created should be equal or higher than 
> "
> +                + "800 with default behaviour", numberThreadsCreated >= 800);
> +    }
> +
> +    private Long currentThreadCount() throws NoSuchMethodException,
> +            IllegalAccessException, InvocationTargetException {
> +        Method m = ThreadHelper.class.getDeclaredMethod("nextThreadCounter", 
> (Class<?>[]) null);
> +        m.setAccessible(true);
> +        Long nextThreadCount = (Long) m.invoke(null);
> +        return nextThreadCount;
> +    }
> +
> +    protected CamelContext createCamelContext() throws Exception {
> +        CamelContext camelContext = super.createCamelContext();
> +        ConnectionFactory connectionFactory = 
> CamelJmsTestHelper.createConnectionFactory();
> +        JmsComponent jmsComponent = 
> jmsComponentAutoAcknowledge(connectionFactory);
> +        jmsComponent.getConfiguration().setMaxMessagesPerTask(1);
> +        jmsComponent.getConfiguration().setIdleTaskExecutionLimit(1);
> +        jmsComponent.getConfiguration().setConcurrentConsumers(3);
> +        jmsComponent.getConfiguration().setMaxConcurrentConsumers(10);
> +        jmsComponent.getConfiguration().setReceiveTimeout(50);
> +        camelContext.addComponent("activemq", jmsComponent);
> +        return camelContext;
> +    }
> +
> +    private void doSendMessages(final String queueName, int messages, int 
> poolSize,
> +            final DefaultTaskExecutorType defaultTaskExecutorType) throws 
> Exception {
> +        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
> +        final CountDownLatch latch = new CountDownLatch(messages);
> +        for (int i = 0; i < messages; i++) {
> +            final int index = i;
> +            executor.submit(new Callable<Object>() {
> +                public Object call() throws Exception {
> +                    String options = defaultTaskExecutorType == null ? "" : 
> "?defaultTaskExecutorType="
> +                            + defaultTaskExecutorType.toString();
> +                    template.requestBody("activemq:queue:" + queueName + 
> options, "Message " + index);
> +                    latch.countDown();
> +                    return null;
> +                }
> +            });
> +        }
> +        latch.await();
> +        executor.shutdown();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                
> from("activemq:queue:foo.simpleAsync?defaultTaskExecutorType=SimpleAsync").routeId("simpleAsync").noAutoStartup()
> +                    .to("mock:result.simpleAsync")
> +                    .setBody(constant("Reply"));
> +
> +                
> from("activemq:queue:foo.threadPool?defaultTaskExecutorType=ThreadPool").routeId("threadPool").noAutoStartup()
> +                    .to("mock:result.threadPool")
> +                    .setBody(constant("Reply"));
> +
> +                
> from("activemq:queue:foo.default").routeId("default").noAutoStartup()
> +                    .to("mock:result.default")
> +                    .setBody(constant("Reply"));
> +            }
> +        };
> +    }
> +}
>
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cib...@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Reply via email to