Ah, I overlooked this option. I'll fix it in a new commit soon. Thanks for the report, Claus!
Raúl. On Thu, Jan 10, 2013 at 12:15 PM, Claus Ibsen <claus.ib...@gmail.com> wrote: > Hi > > I think we should honor the option replyToCacheLevelName, so people > can configure the cache level. > Right now it is hardcoded to CACHE_CONSUMER. > > Some brokers may not work well with that, so giving end users the > option to set the replyToCacheLevelName is better. > The default can still be CACHE_CONSUMER. > > > > On Thu, Jan 10, 2013 at 12:49 AM, <ra...@apache.org> wrote: > > Author: raulk > > Date: Wed Jan 9 23:49:26 2013 > > New Revision: 1431152 > > > > URL: http://svn.apache.org/viewvc?rev=1431152&view=rev > > Log: > > CAMEL-5865 Enhanced concurrent consumers support for JMS producers using > Temp Reply Queue for replies > > > > Added: > > > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java > > Modified: > > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > > > > Modified: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > (original) > > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > Wed Jan 9 23:49:26 2013 > > @@ -16,7 +16,10 @@ > > */ > > package org.apache.camel.component.jms.reply; > > > > +import java.util.concurrent.atomic.AtomicBoolean; > > + > > import javax.jms.Destination; > > +import javax.jms.ExceptionListener; > > import javax.jms.JMSException; > > import 
javax.jms.Message; > > import javax.jms.Session; > > @@ -37,11 +40,23 @@ import org.springframework.jms.support.d > > * @version > > */ > > public class TemporaryQueueReplyManager extends ReplyManagerSupport { > > - > > + > > + final TemporaryReplyQueueDestinationResolver destResolver = new > TemporaryReplyQueueDestinationResolver(); > > + > > public TemporaryQueueReplyManager(CamelContext camelContext) { > > super(camelContext); > > } > > > > + @Override > > + public Destination getReplyTo() { > > + try { > > + destResolver.destinationReady(); > > + } catch (InterruptedException e) { > > + log.warn("Interrupted while waiting for JMSReplyTo > destination refresh", e); > > + } > > + return super.getReplyTo(); > > + } > > + > > public String registerReply(ReplyManager replyManager, Exchange > exchange, AsyncCallback callback, > > String originalCorrelationId, String > correlationId, long requestTimeout) { > > // add to correlation map > > @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager > > DefaultMessageListenerContainer answer = new > DefaultJmsMessageListenerContainer(endpoint); > > > > answer.setDestinationName("temporary"); > > - answer.setDestinationResolver(new DestinationResolver() { > > - public Destination resolveDestinationName(Session session, > String destinationName, > > - boolean > pubSubDomain) throws JMSException { > > - // use a temporary queue to gather the reply message > > - TemporaryQueue queue = session.createTemporaryQueue(); > > - setReplyTo(queue); > > - return queue; > > - } > > - }); > > + answer.setDestinationResolver(destResolver); > > answer.setAutoStartup(true); > > if (endpoint.getMaxMessagesPerTask() >= 0) { > > > answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask()); > > @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager > > > answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers()); > > } > > answer.setConnectionFactory(endpoint.getConnectionFactory()); > > + // we use CACHE_CONSUMER to 
cling to the consumer as long as we > can, since we can only consume > > + // msgs from the JMS Connection that created the temp > destination in the first place > > + > answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); > > String clientId = endpoint.getClientId(); > > if (clientId != null) { > > clientId += ".CamelReplyManager"; > > @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager > > > > // we cannot do request-reply over JMS with transaction > > answer.setSessionTransacted(false); > > - > > + > > // other optional properties > > - if (endpoint.getExceptionListener() != null) { > > - > answer.setExceptionListener(endpoint.getExceptionListener()); > > - } > > + answer.setExceptionListener(new > TemporaryReplyQueueExceptionListener(destResolver, > endpoint.getExceptionListener())); > > + > > if (endpoint.getErrorHandler() != null) { > > answer.setErrorHandler(endpoint.getErrorHandler()); > > } else { > > @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager > > answer.setTaskExecutor(endpoint.getTaskExecutor()); > > } > > > > - // setup a bean name which is used ny Spring JMS as the thread > name > > - String name = "TemporaryQueueReplyManager[" + > answer.getDestinationName() + "]"; > > + // setup a bean name which is used by Spring JMS as the thread > name > > + // use the name of the request destination > > + String name = "TemporaryQueueReplyManager[" + > endpoint.getDestinationName() + "]"; > > answer.setBeanName(name); > > > > if (answer.getConcurrentConsumers() > 1) { > > @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager > > return answer; > > } > > > > + private final class TemporaryReplyQueueExceptionListener implements > ExceptionListener { > > + private final TemporaryReplyQueueDestinationResolver > destResolver; > > + private final ExceptionListener delegate; > > + > > + private > TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver > destResolver, > > + ExceptionListener delegate) 
{ > > + this.destResolver = destResolver; > > + this.delegate = delegate; > > + } > > + > > + @Override > > + public void onException(JMSException exception) { > > + // capture exceptions, and schedule a refresh of the > ReplyTo destination > > + log.warn("Exception inside the DMLC for Temporary ReplyTo > Queue for destination " + endpoint.getDestinationName() + > > + ", refreshing ReplyTo destination", exception); > > + destResolver.scheduleRefresh(); > > + // serve as a proxy for any exception listener the user may > have set explicitly > > + if (delegate != null) { > > + delegate.onException(exception); > > + } > > + } > > + > > + } > > + > > + private final class TemporaryReplyQueueDestinationResolver > implements DestinationResolver { > > + private TemporaryQueue queue; > > + private AtomicBoolean refreshWanted = new AtomicBoolean(false); > > + > > + public Destination resolveDestinationName(Session session, > String destinationName, boolean pubSubDomain) > > + throws JMSException { > > + // use a temporary queue to gather the reply message > > + synchronized (refreshWanted) { > > + if (queue == null || refreshWanted.compareAndSet(true, > false)) { > > + queue = session.createTemporaryQueue(); > > + setReplyTo(queue); > > + log.debug("Refreshed Temporary ReplyTo Queue. 
New > queue: " + queue.getQueueName()); > > + refreshWanted.notifyAll(); > > + } > > + } > > + return queue; > > + } > > + > > + public void scheduleRefresh() { > > + refreshWanted.set(true); > > + } > > + > > + public void destinationReady() throws InterruptedException { > > + if (refreshWanted.get()) { > > + synchronized (refreshWanted) { > > + log.debug("Waiting for new Temp ReplyTo destination > to be assigned to continue"); > > + refreshWanted.wait(); > > + } > > + } > > + } > > + } > > + > > } > > > > Added: > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java > > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto > > > ============================================================================== > > --- > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java > (added) > > +++ > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java > Wed Jan 9 23:49:26 2013 > > @@ -0,0 +1,143 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one or more > > + * contributor license agreements. See the NOTICE file distributed with > > + * this work for additional information regarding copyright ownership. > > + * The ASF licenses this file to You under the Apache License, Version > 2.0 > > + * (the "License"); you may not use this file except in compliance with > > + * the License. 
You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > +package org.apache.camel.component.jms; > > + > > +import java.util.Map; > > +import java.util.concurrent.Callable; > > +import java.util.concurrent.ConcurrentHashMap; > > +import java.util.concurrent.ExecutorService; > > +import java.util.concurrent.Executors; > > +import java.util.concurrent.atomic.AtomicInteger; > > + > > +import javax.jms.ConnectionFactory; > > + > > +import org.apache.activemq.ActiveMQConnectionFactory; > > +import org.apache.activemq.broker.BrokerService; > > +import org.apache.activemq.broker.TransportConnection; > > +import org.apache.activemq.pool.PooledConnectionFactory; > > +import org.apache.camel.CamelContext; > > +import org.apache.camel.Exchange; > > +import org.apache.camel.Processor; > > +import org.apache.camel.builder.RouteBuilder; > > +import org.apache.camel.test.junit4.CamelTestSupport; > > +import org.junit.Test; > > + > > +import static > org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; > > + > > +/** > > + * Reliability tests for JMS TempQueue Reply Manager with multiple > consumers. 
> > + * @version > > + */ > > +public class JmsRequestReplyTempQueueMultipleConsumersTest extends > CamelTestSupport { > > + > > + private Map<String, AtomicInteger> msgsPerThread = new > ConcurrentHashMap<String, AtomicInteger>(); > > + private BrokerService broker; > > + private PooledConnectionFactory connectionFactory; > > + > > + @Test > > + public void testMultipleConsumingThreads() throws Exception { > > + doSendMessages(1000, 5); > > + assertTrue("Expected multiple consuming threads, but only > found: " + msgsPerThread.keySet().size(), > > + msgsPerThread.keySet().size() > 1); > > + } > > + > > + @Test > > + public void testTempQueueRefreshed() throws Exception { > > + doSendMessages(500, 5); > > + connectionFactory.clear(); > > + doSendMessages(100, 5); > > + connectionFactory.clear(); > > + doSendMessages(100, 5); > > + connectionFactory.clear(); > > + doSendMessages(100, 5); > > + connectionFactory.clear(); > > + doSendMessages(100, 5); > > + connectionFactory.clear(); > > + doSendMessages(100, 5); > > + } > > + > > + private void doSendMessages(int files, int poolSize) throws > Exception { > > + getMockEndpoint("mock:result").expectedMessageCount(files); > > + getMockEndpoint("mock:result").expectsNoDuplicates(body()); > > + > > + ExecutorService executor = > Executors.newFixedThreadPool(poolSize); > > + for (int i = 0; i < files; i++) { > > + final int index = i; > > + executor.submit(new Callable<Object>() { > > + public Object call() throws Exception { > > + template.sendBody("seda:start", "Message " + index); > > + return null; > > + } > > + }); > > + } > > + > > + assertMockEndpointsSatisfied(); > > + resetMocks(); > > + executor.shutdownNow(); > > + } > > + > > + public void startBroker() throws Exception { > > + String brokerName = "test-broker-" + System.currentTimeMillis(); > > + String brokerUri = "vm://" + brokerName; > > + broker = new BrokerService(); > > + broker.setBrokerName(brokerName); > > + broker.setBrokerId(brokerName); > > + 
broker.addConnector(brokerUri); > > + broker.setPersistent(false); > > + broker.start(); > > + } > > + > > + protected CamelContext createCamelContext() throws Exception { > > + CamelContext camelContext = super.createCamelContext(); > > + //startBroker(); > > + > > + connectionFactory = (PooledConnectionFactory) > CamelJmsTestHelper.createConnectionFactory(); > > + camelContext.addComponent("jms", > jmsComponentAutoAcknowledge(connectionFactory)); > > + > > + return camelContext; > > + } > > + > > + @Override > > + protected RouteBuilder createRouteBuilder() throws Exception { > > + return new RouteBuilder() { > > + @Override > > + public void configure() throws Exception { > > + from("seda:start") > > + > > .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10") > > + .process(new Processor() { > > + @Override > > + public void process(Exchange exchange) throws > Exception { > > + String threadName = > Thread.currentThread().getName(); > > + synchronized (msgsPerThread) { > > + AtomicInteger count = > msgsPerThread.get(threadName); > > + if (count == null) { > > + count = new AtomicInteger(0); > > + msgsPerThread.put(threadName, count); > > + } > > + count.incrementAndGet(); > > + } > > + } > > + }) > > + .to("mock:result"); > > + > > + > from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10") > > + .setBody(simple("Reply >>> ${body}")); > > + } > > + }; > > + } > > + > > +} > > > > > > > > -- > Claus Ibsen > ----------------- > Red Hat, Inc. > FuseSource is now part of Red Hat > Email: cib...@redhat.com > Web: http://fusesource.com > Twitter: davsclaus > Blog: http://davsclaus.com > Author of Camel in Action: http://www.manning.com/ibsen >