[ 
https://issues.apache.org/activemq/browse/CAMEL-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=48331#action_48331
 ] 

Claus Ibsen commented on CAMEL-1201:
------------------------------------

Could you try this on trunk? I recall that Martin Krasser submitted a great 
patch for the BatchProcessor, which is the core of the resequencer, 
aggregator etc. It should stop an extra competing consumer from being added, 
and that competing consumer could explain the dropped messages.
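
To illustrate the competing consumer effect: on a JMS queue each message is 
delivered to exactly one consumer, so a second consumer on TEST.IN takes 
messages that the route's own consumer then never sees. Below is a minimal 
plain-Java sketch of that idea. It is not the BatchProcessor code; the class 
and method names are made up for illustration, and the strict round-robin 
dispatch is an assumption (a real broker's dispatch depends on prefetch and 
timing).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration only; none of these names exist in Camel or ActiveMQ.
class CompetingConsumerSketch {

    // Hands each queued message to exactly one consumer, taking turns.
    // Round-robin is a simplifying assumption about how the broker dispatches.
    static List<List<Integer>> dispatch(List<Integer> sent, int consumers) {
        Queue<Integer> queue = new ArrayDeque<Integer>(sent);
        List<List<Integer>> received = new ArrayList<List<Integer>>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<Integer>());
        }
        int next = 0;
        while (!queue.isEmpty()) {
            received.get(next).add(queue.poll());
            next = (next + 1) % consumers;
        }
        return received;
    }

    public static void main(String[] args) {
        // Sequence numbers in the order they were sent in the reported run.
        List<Integer> sent = Arrays.asList(1, 3, 6, 2, 4, 8, 7, 5);
        System.out.println("one consumer:  " + dispatch(sent, 1));
        System.out.println("two consumers: " + dispatch(sent, 2));
    }
}
```

With one consumer the list comes back complete: [[1, 3, 6, 2, 4, 8, 7, 5]]. 
With two consumers it is split into [[1, 6, 4, 7], [3, 2, 8, 5]], so any 
single consumer sees only part of the sequence.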

> Messages dropped with ActiveMQ/JMS component and Resequencer pattern 
> ---------------------------------------------------------------------
>
>                 Key: CAMEL-1201
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1201
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-activemq, camel-jms, eip
>    Affects Versions: 1.5.0
>            Reporter: Bruce Snyder
>            Assignee: Hadrian Zbarcea
>
> I've created a simple class to test the resequencer as listed below: 
> {code}
> public final class MyCamelResequencer {
>     private static final Logger LOG = Logger.getLogger(MyCamelResequencer.class);
>     private MyCamelResequencer() {}
>     public static void main(String[] args) throws Exception {
>         CamelContext context = new DefaultCamelContext();
>         Tracer tracer = new Tracer();
>         tracer.getFormatter().setShowBreadCrumb(false);
>         tracer.getFormatter().setShowNode(true);
>         context.addInterceptStrategy(tracer);
>        
>         String brokerUrl = "vm://localhost?broker.persistent=false";
> //        String brokerUrl = "tcp://localhost:61616";
>         context.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerUrl));
>         context.addRoutes(new RouteBuilder() {
>             public void configure() {
> //                from("seda:FOO").to("log:PLANETS");
> //                from("seda:FOO").resequencer(header("seqnum")).to("log:PLANETS");
> //                from("activemq:TEST.IN").to("log:PLANETS");
>                 from("activemq:TEST.IN").resequencer(header("seqnum")).to("log:PLANETS");
> //                from("activemq:TEST.IN").resequencer(header("seq")).to("activemq:TEST.OUT");
>             }
>         });
>         ProducerTemplate template = context.createProducerTemplate();
>         context.start();
>         List<String> planets = new ArrayList<String>(8);
>         planets.add("Mercury");
>         planets.add("Venus");
>         planets.add("Earth");
>         planets.add("Mars");
>         planets.add("Jupiter");
>         planets.add("Saturn");
>         planets.add("Uranus");
>         planets.add("Neptune");
>         
>         List<Integer> numbers = new ArrayList<Integer>(8);
>         numbers.add(1);
>         numbers.add(2);
>         numbers.add(3);
>         numbers.add(4);
>         numbers.add(5);
>         numbers.add(6);
>         numbers.add(7);
>         numbers.add(8);
>         
>         for (String planet: planets) {
>             Collections.shuffle(numbers);
>             int seqnum = numbers.remove(0);
>             String message = "I am the planet " + planet + " with seqnum: " + seqnum;
>             LOG.info("Sending message: " + message);
>             template.sendBodyAndHeader("activemq:TEST.IN", message, "seqnum", seqnum);
> //            template.sendBodyAndHeader("seda:FOO", message, "seqnum", seqnum);
>         }
>         Thread.sleep(10000);
>         LOG.info("Shutting down the Camel context");
>         context.stop();
>         System.exit(0);
>     }
> }
> {code}
> The following are my observations: 
> * SEDA -> log receives all messages 
> * SEDA -> resequencer -> log receives all messages 
> * ActiveMQ -> log receives all messages 
> * ActiveMQ -> resequencer -> log does not receive all messages 
> With the Tracer interceptor enabled, I can see that the route doesn't even 
> receive all of the messages from the ActiveMQ component: 
> {panel}
> [                          main] DefaultCamelContext            INFO  JMX 
> enabled. Using InstrumentationLifecycleStrategy.
> [                          main] BrokerService                  INFO  Using 
> Persistence Adapter: MemoryPersistenceAdapter
> [                          main] BrokerService                  INFO  
> ActiveMQ null JMS Message Broker (localhost) is starting
> [                          main] BrokerService                  INFO  For 
> help or more information please see: http://activemq.apache.org/
> [                 JMX connector] ManagementContext              WARN  Failed 
> to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: 
> javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is 
> java.rmi.AlreadyBoundException: jmxrmi]
> [                          main] BrokerService                  INFO  
> ActiveMQ JMS Message Broker (localhost, 
> ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) started
> [                          main] TransportConnector             INFO  
> Connector vm://localhost Started
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Mercury with seqnum: 1
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Venus with seqnum: 3
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Earth with seqnum: 6
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Mars with seqnum: 2
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Jupiter with seqnum: 4
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Saturn with seqnum: 8
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Uranus with seqnum: 7
> [                          main] MyCamelResequencer             INFO  Sending 
> message: I am the planet Neptune with seqnum: 5
> [aultMessageListenerContainer-1] TraceInterceptor               INFO  -> 
> interceptor1 
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
> [] -> [To[log:PLANETS]]]]), 
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871918, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=1, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Mercury with seqnum: 1
> [aultMessageListenerContainer-1] TraceInterceptor               INFO  -> 
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
> Properties:{CamelCauseException=null} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871918, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=1, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Mercury with seqnum: 1
> [aultMessageListenerContainer-2] TraceInterceptor               INFO  -> 
> interceptor1 
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
> [] -> [To[log:PLANETS]]]]), 
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=6, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Earth with seqnum: 6
> [aultMessageListenerContainer-2] TraceInterceptor               INFO  -> 
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
> Properties:{CamelCauseException=null} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=6, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Earth with seqnum: 6
> [aultMessageListenerContainer-3] TraceInterceptor               INFO  -> 
> interceptor1 
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
> [] -> [To[log:PLANETS]]]]), 
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930, 
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=2, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Mars with seqnum: 2
> [aultMessageListenerContainer-3] TraceInterceptor               INFO  -> 
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
> Properties:{CamelCauseException=null} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930, 
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=2, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Mars with seqnum: 2
> [aultMessageListenerContainer-4] TraceInterceptor               INFO  -> 
> interceptor1 
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
> [] -> [To[log:PLANETS]]]]), 
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871936, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=4, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Jupiter with seqnum: 4
> [aultMessageListenerContainer-4] TraceInterceptor               INFO  -> 
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
> Properties:{CamelCauseException=null} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871936, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=4, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Jupiter with seqnum: 4
> [aultMessageListenerContainer-5] TraceInterceptor               INFO  -> 
> interceptor1 
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
> [] -> [To[log:PLANETS]]]]), 
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=7, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Uranus with seqnum: 7
> [aultMessageListenerContainer-5] TraceInterceptor               INFO  -> 
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
> Properties:{CamelCauseException=null} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938, 
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=7, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Uranus with seqnum: 7
> [aultMessageListenerContainer-6] TraceInterceptor               INFO  -> 
> interceptor1 
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
> [] -> [To[log:PLANETS]]]]), 
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938, 
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=5, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Neptune with seqnum: 5
> [aultMessageListenerContainer-6] TraceInterceptor               INFO  -> 
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
> Properties:{CamelCauseException=null} 
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938, 
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=5, 
> JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Neptune with seqnum: 5
> [[log:PLANETS]]] Polling Thread] TraceInterceptor               INFO  -> to1 
> To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN, 
> JMSTimestamp=1229973871929, JMSRedelivered=false, JMSExpiration=0, 
> JMSType=null, seqnum=3, JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:2,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Venus with seqnum: 3
> [[log:PLANETS]]] Polling Thread] PLANETS                        INFO  
> Exchange[BodyType:String, Body:I am the planet Venus with seqnum: 3]
> [[log:PLANETS]]] Polling Thread] TraceInterceptor               INFO  -> to1 
> To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN, 
> JMSTimestamp=1229973871937, JMSRedelivered=false, JMSExpiration=0, 
> JMSType=null, seqnum=8, JMSXGroupID=null, 
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:6,
>  JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
> BodyType:String Body:I am the planet Saturn with seqnum: 8
> [[log:PLANETS]]] Polling Thread] PLANETS                        INFO  
> Exchange[BodyType:String, Body:I am the planet Saturn with seqnum: 8]
> [                          main] MyCamelResequencer             INFO  
> Shutting down the Camel context
> [         ActiveMQ ShutdownHook] BrokerService                  INFO  
> ActiveMQ Message Broker (localhost, 
> ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) is shutting down
> [         ActiveMQ ShutdownHook] TransportConnector             INFO  
> Connector vm://localhost Stopped
> [         ActiveMQ ShutdownHook] BrokerService                  INFO  
> ActiveMQ JMS Message Broker (localhost, 
> ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) stopped
> {panel}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
