Hi

I think we should honor the replyToCacheLevelName option, so people
can configure the cache level.
Right now the cache level is hardcoded to CACHE_CONSUMER.

Some brokers may not work well with that cache level, so it is better
to let end users set replyToCacheLevelName themselves.
The default can still be CACHE_CONSUMER.



On Thu, Jan 10, 2013 at 12:49 AM,  <ra...@apache.org> wrote:
> Author: raulk
> Date: Wed Jan  9 23:49:26 2013
> New Revision: 1431152
>
> URL: http://svn.apache.org/viewvc?rev=1431152&view=rev
> Log:
> CAMEL-5865 Enhanced concurrent consumers support for JMS producers using Temp 
> Reply Queue for replies
>
> Added:
>     
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> Modified:
>     
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
>
> Modified: 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
>  (original)
> +++ 
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
>  Wed Jan  9 23:49:26 2013
> @@ -16,7 +16,10 @@
>   */
>  package org.apache.camel.component.jms.reply;
>
> +import java.util.concurrent.atomic.AtomicBoolean;
> +
>  import javax.jms.Destination;
> +import javax.jms.ExceptionListener;
>  import javax.jms.JMSException;
>  import javax.jms.Message;
>  import javax.jms.Session;
> @@ -37,11 +40,23 @@ import org.springframework.jms.support.d
>   * @version
>   */
>  public class TemporaryQueueReplyManager extends ReplyManagerSupport {
> -
> +
> +    final TemporaryReplyQueueDestinationResolver destResolver = new 
> TemporaryReplyQueueDestinationResolver();
> +
>      public TemporaryQueueReplyManager(CamelContext camelContext) {
>          super(camelContext);
>      }
>
> +    @Override
> +    public Destination getReplyTo() {
> +        try {
> +            destResolver.destinationReady();
> +        } catch (InterruptedException e) {
> +            log.warn("Interrupted while waiting for JMSReplyTo destination 
> refresh", e);
> +        }
> +        return super.getReplyTo();
> +    }
> +
>      public String registerReply(ReplyManager replyManager, Exchange 
> exchange, AsyncCallback callback,
>                                  String originalCorrelationId, String 
> correlationId, long requestTimeout) {
>          // add to correlation map
> @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager
>          DefaultMessageListenerContainer answer = new 
> DefaultJmsMessageListenerContainer(endpoint);
>
>          answer.setDestinationName("temporary");
> -        answer.setDestinationResolver(new DestinationResolver() {
> -            public Destination resolveDestinationName(Session session, 
> String destinationName,
> -                                                      boolean pubSubDomain) 
> throws JMSException {
> -                // use a temporary queue to gather the reply message
> -                TemporaryQueue queue = session.createTemporaryQueue();
> -                setReplyTo(queue);
> -                return queue;
> -            }
> -        });
> +        answer.setDestinationResolver(destResolver);
>          answer.setAutoStartup(true);
>          if (endpoint.getMaxMessagesPerTask() >= 0) {
>              answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask());
> @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager
>              
> answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
>          }
>          answer.setConnectionFactory(endpoint.getConnectionFactory());
> +        // we use CACHE_CONSUMER to cling to the consumer as long as we can, 
> since we can only consume
> +        // msgs from the JMS Connection that created the temp destination in 
> the first place
> +        answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
>          String clientId = endpoint.getClientId();
>          if (clientId != null) {
>              clientId += ".CamelReplyManager";
> @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager
>
>          // we cannot do request-reply over JMS with transaction
>          answer.setSessionTransacted(false);
> -
> +
>          // other optional properties
> -        if (endpoint.getExceptionListener() != null) {
> -            answer.setExceptionListener(endpoint.getExceptionListener());
> -        }
> +        answer.setExceptionListener(new 
> TemporaryReplyQueueExceptionListener(destResolver, 
> endpoint.getExceptionListener()));
> +
>          if (endpoint.getErrorHandler() != null) {
>              answer.setErrorHandler(endpoint.getErrorHandler());
>          } else {
> @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager
>              answer.setTaskExecutor(endpoint.getTaskExecutor());
>          }
>
> -        // setup a bean name which is used ny Spring JMS as the thread name
> -        String name = "TemporaryQueueReplyManager[" + 
> answer.getDestinationName() + "]";
> +        // setup a bean name which is used by Spring JMS as the thread name
> +        // use the name of the request destination
> +        String name = "TemporaryQueueReplyManager[" + 
> endpoint.getDestinationName() + "]";
>          answer.setBeanName(name);
>
>          if (answer.getConcurrentConsumers() > 1) {
> @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager
>          return answer;
>      }
>
> +    private final class TemporaryReplyQueueExceptionListener implements 
> ExceptionListener {
> +        private final TemporaryReplyQueueDestinationResolver destResolver;
> +        private final ExceptionListener delegate;
> +
> +        private 
> TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver 
> destResolver,
> +                ExceptionListener delegate) {
> +            this.destResolver = destResolver;
> +            this.delegate = delegate;
> +        }
> +
> +        @Override
> +        public void onException(JMSException exception) {
> +            // capture exceptions, and schedule a refresh of the ReplyTo 
> destination
> +            log.warn("Exception inside the DMLC for Temporary ReplyTo Queue 
> for destination " + endpoint.getDestinationName() +
> +                       ", refreshing ReplyTo destination", exception);
> +            destResolver.scheduleRefresh();
> +            // serve as a proxy for any exception listener the user may have 
> set explicitly
> +            if (delegate != null) {
> +                delegate.onException(exception);
> +            }
> +        }
> +
> +    }
> +
> +    private final class TemporaryReplyQueueDestinationResolver implements 
> DestinationResolver {
> +        private TemporaryQueue queue;
> +        private AtomicBoolean refreshWanted = new AtomicBoolean(false);
> +
> +        public Destination resolveDestinationName(Session session, String 
> destinationName, boolean pubSubDomain)
> +                throws JMSException {
> +            // use a temporary queue to gather the reply message
> +            synchronized (refreshWanted) {
> +                if (queue == null || refreshWanted.compareAndSet(true, 
> false)) {
> +                    queue = session.createTemporaryQueue();
> +                    setReplyTo(queue);
> +                    log.debug("Refreshed Temporary ReplyTo Queue. New queue: 
> " + queue.getQueueName());
> +                    refreshWanted.notifyAll();
> +                }
> +            }
> +            return queue;
> +        }
> +
> +        public void scheduleRefresh() {
> +            refreshWanted.set(true);
> +        }
> +
> +        public void destinationReady() throws InterruptedException {
> +            if (refreshWanted.get()) {
> +                synchronized (refreshWanted) {
> +                    log.debug("Waiting for new Temp ReplyTo destination to 
> be assigned to continue");
> +                    refreshWanted.wait();
> +                }
> +            }
> +        }
> +    }
> +
>  }
>
> Added: 
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto
> ==============================================================================
> --- 
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
>  (added)
> +++ 
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
>  Wed Jan  9 23:49:26 2013
> @@ -0,0 +1,143 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jms;
> +
> +import java.util.Map;
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +import java.util.concurrent.atomic.AtomicInteger;
> +
> +import javax.jms.ConnectionFactory;
> +
> +import org.apache.activemq.ActiveMQConnectionFactory;
> +import org.apache.activemq.broker.BrokerService;
> +import org.apache.activemq.broker.TransportConnection;
> +import org.apache.activemq.pool.PooledConnectionFactory;
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.Exchange;
> +import org.apache.camel.Processor;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.test.junit4.CamelTestSupport;
> +import org.junit.Test;
> +
> +import static 
> org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> +
> +/**
> + * Reliability tests for JMS TempQueue Reply Manager with multiple consumers.
> + * @version
> + */
> +public class JmsRequestReplyTempQueueMultipleConsumersTest extends 
> CamelTestSupport {
> +
> +    private Map<String, AtomicInteger> msgsPerThread = new 
> ConcurrentHashMap<String, AtomicInteger>();
> +    private BrokerService broker;
> +    private PooledConnectionFactory connectionFactory;
> +
> +    @Test
> +    public void testMultipleConsumingThreads() throws Exception {
> +        doSendMessages(1000, 5);
> +        assertTrue("Expected multiple consuming threads, but only found: " + 
>  msgsPerThread.keySet().size(),
> +                msgsPerThread.keySet().size() > 1);
> +    }
> +
> +    @Test
> +    public void testTempQueueRefreshed() throws Exception {
> +        doSendMessages(500, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +    }
> +
> +    private void doSendMessages(int files, int poolSize) throws Exception {
> +        getMockEndpoint("mock:result").expectedMessageCount(files);
> +        getMockEndpoint("mock:result").expectsNoDuplicates(body());
> +
> +        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
> +        for (int i = 0; i < files; i++) {
> +            final int index = i;
> +            executor.submit(new Callable<Object>() {
> +                public Object call() throws Exception {
> +                    template.sendBody("seda:start", "Message " + index);
> +                    return null;
> +                }
> +            });
> +        }
> +
> +        assertMockEndpointsSatisfied();
> +        resetMocks();
> +        executor.shutdownNow();
> +    }
> +
> +    public void startBroker() throws Exception {
> +        String brokerName = "test-broker-" + System.currentTimeMillis();
> +        String brokerUri = "vm://" + brokerName;
> +        broker = new BrokerService();
> +        broker.setBrokerName(brokerName);
> +        broker.setBrokerId(brokerName);
> +        broker.addConnector(brokerUri);
> +        broker.setPersistent(false);
> +        broker.start();
> +    }
> +
> +    protected CamelContext createCamelContext() throws Exception {
> +        CamelContext camelContext = super.createCamelContext();
> +        //startBroker();
> +
> +        connectionFactory = (PooledConnectionFactory) 
> CamelJmsTestHelper.createConnectionFactory();
> +        camelContext.addComponent("jms", 
> jmsComponentAutoAcknowledge(connectionFactory));
> +
> +        return camelContext;
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("seda:start")
> +                    
> .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10")
> +                    .process(new Processor() {
> +                        @Override
> +                        public void process(Exchange exchange) throws 
> Exception {
> +                            String threadName = 
> Thread.currentThread().getName();
> +                            synchronized (msgsPerThread) {
> +                               AtomicInteger count = 
> msgsPerThread.get(threadName);
> +                               if (count == null) {
> +                                   count = new AtomicInteger(0);
> +                                   msgsPerThread.put(threadName, count);
> +                               }
> +                               count.incrementAndGet();
> +                            }
> +                        }
> +                    })
> +                    .to("mock:result");
> +
> +                
> from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10")
> +                    .setBody(simple("Reply >>> ${body}"));
> +            }
> +        };
> +    }
> +
> +}
>
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cib...@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Reply via email to