[
https://issues.apache.org/activemq/browse/CAMEL-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=48331#action_48331
]
Claus Ibsen commented on CAMEL-1201:
------------------------------------
Could you try this on trunk? I recall that Martin Krasser submitted a great
patch to fix the BatchProcessor, which is the core of the resequencer,
aggregator, etc. It should fix the problem of a competing consumer being
added, which could explain the dropped messages.
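
As a rough illustration of why a competing consumer could explain the dropped
messages, here is a minimal plain-Java sketch. It uses no Camel or JMS code and
does not reproduce the actual BatchProcessor implementation. The class name, the
method name, and the strict round-robin dispatch are assumptions made for the
example; a real broker's dispatch order depends on prefetch and timing. The
only point is that each message on a queue goes to exactly one consumer, so a
resequencer that shares the queue with a second consumer can only reorder the
subset it happens to receive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation: two consumers compete for the messages of one queue.
public class CompetingConsumerSketch {

    /**
     * Returns the seqnums that reach the (hypothetical) batch poller,
     * sorted the way a batch resequencer would emit them.
     * Messages taken by the other consumer never reach the output.
     */
    public static List<Integer> resequenced(List<Integer> sent) {
        Queue<Integer> queue = new ArrayDeque<Integer>(sent);
        List<Integer> polled = new ArrayList<Integer>();
        boolean pollerTurn = false; // assumption: the other consumer gets the first message
        while (!queue.isEmpty()) {
            Integer seqnum = queue.poll(); // each message is delivered exactly once
            if (pollerTurn) {
                polled.add(seqnum);
            }
            pollerTurn = !pollerTurn; // assumption: strict round-robin dispatch
        }
        Collections.sort(polled);
        return polled;
    }

    public static void main(String[] args) {
        // Send order taken from the log in the issue description.
        List<Integer> sent = Arrays.asList(1, 3, 6, 2, 4, 8, 7, 5);
        System.out.println("sent " + sent.size()
                + ", resequenced " + resequenced(sent));
    }
}
```

With a single consumer every seqnum would reach the sorted output. With two,
the output is ordered but incomplete, which has the same shape as the
reported symptom.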
> Messages dropped with ActiveMQ/JMS component and Resequencer pattern
> ---------------------------------------------------------------------
>
> Key: CAMEL-1201
> URL: https://issues.apache.org/activemq/browse/CAMEL-1201
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-activemq, camel-jms, eip
> Affects Versions: 1.5.0
> Reporter: Bruce Snyder
> Assignee: Hadrian Zbarcea
>
> I've created a simple class to test the resequencer as listed below:
> {code}
> public final class MyCamelResequencer {
>     private static final Logger LOG = Logger.getLogger(MyCamelResequencer.class);
>
>     private MyCamelResequencer() {}
>
>     public static void main(String[] args) throws Exception {
>         CamelContext context = new DefaultCamelContext();
>
>         // Trace every exchange so we can see what each node receives.
>         Tracer tracer = new Tracer();
>         tracer.getFormatter().setShowBreadCrumb(false);
>         tracer.getFormatter().setShowNode(true);
>         context.addInterceptStrategy(tracer);
>
>         String brokerUrl = "vm://localhost?broker.persistent=false";
>         // String brokerUrl = "tcp://localhost:61616";
>         context.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerUrl));
>
>         // Only one route is active at a time; the others are the variants tested.
>         context.addRoutes(new RouteBuilder() {
>             public void configure() {
>                 // from("seda:FOO").to("log:PLANETS");
>                 // from("seda:FOO").resequencer(header("seqnum")).to("log:PLANETS");
>                 // from("activemq:TEST.IN").to("log:PLANETS");
>                 from("activemq:TEST.IN").resequencer(header("seqnum")).to("log:PLANETS");
>                 // from("activemq:TEST.IN").resequencer(header("seq")).to("activemq:TEST.OUT");
>             }
>         });
>         ProducerTemplate template = context.createProducerTemplate();
>         context.start();
>
>         List<String> planets = new ArrayList<String>(8);
>         planets.add("Mercury");
>         planets.add("Venus");
>         planets.add("Earth");
>         planets.add("Mars");
>         planets.add("Jupiter");
>         planets.add("Saturn");
>         planets.add("Uranus");
>         planets.add("Neptune");
>
>         List<Integer> numbers = new ArrayList<Integer>(8);
>         numbers.add(1);
>         numbers.add(2);
>         numbers.add(3);
>         numbers.add(4);
>         numbers.add(5);
>         numbers.add(6);
>         numbers.add(7);
>         numbers.add(8);
>
>         // Give each planet a distinct, randomly chosen sequence number.
>         for (String planet : planets) {
>             Collections.shuffle(numbers);
>             int seqnum = numbers.remove(0);
>             String message = "I am the planet " + planet + " with seqnum: " + seqnum;
>             LOG.info("Sending message: " + message);
>             template.sendBodyAndHeader("activemq:TEST.IN", message, "seqnum", seqnum);
>             // template.sendBodyAndHeader("seda:FOO", message, "seqnum", seqnum);
>         }
>
>         // Wait for the route to process the messages before shutting down.
>         Thread.sleep(10000);
>         LOG.info("Shutting down the Camel context");
>         context.stop();
>         System.exit(1);
>     }
> }
> {code}
> The following are my observations:
> * SEDA -> log receives all messages
> * SEDA -> resequencer -> log receives all messages
> * ActiveMQ -> log receives all messages
> * ActiveMQ -> resequencer -> log does not receive all messages
> Upon enabling the Tracer interceptor, I'm able to see that it doesn't even
> receive all messages from the ActiveMQ component:
> {panel}
> [ main] DefaultCamelContext INFO JMX
> enabled. Using InstrumentationLifecycleStrategy.
> [ main] BrokerService INFO Using
> Persistence Adapter: MemoryPersistenceAdapter
> [ main] BrokerService INFO
> ActiveMQ null JMS Message Broker (localhost) is starting
> [ main] BrokerService INFO For
> help or more information please see: http://activemq.apache.org/
> [ JMX connector] ManagementContext WARN Failed
> to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]:
> javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is
> java.rmi.AlreadyBoundException: jmxrmi]
> [ main] BrokerService INFO
> ActiveMQ JMS Message Broker (localhost,
> ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) started
> [ main] TransportConnector INFO
> Connector vm://localhost Started
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Mercury with seqnum: 1
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Venus with seqnum: 3
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Earth with seqnum: 6
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Mars with seqnum: 2
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Jupiter with seqnum: 4
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Saturn with seqnum: 8
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Uranus with seqnum: 7
> [ main] MyCamelResequencer INFO Sending
> message: I am the planet Neptune with seqnum: 5
> [aultMessageListenerContainer-1] TraceInterceptor INFO ->
> interceptor1
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
> [] -> [To[log:PLANETS]]]]),
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871918,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=1,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Mercury with seqnum: 1
> [aultMessageListenerContainer-1] TraceInterceptor INFO ->
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
> Properties:{CamelCauseException=null}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871918,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=1,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Mercury with seqnum: 1
> [aultMessageListenerContainer-2] TraceInterceptor INFO ->
> interceptor1
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
> [] -> [To[log:PLANETS]]]]),
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=6,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Earth with seqnum: 6
> [aultMessageListenerContainer-2] TraceInterceptor INFO ->
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
> Properties:{CamelCauseException=null}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=6,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Earth with seqnum: 6
> [aultMessageListenerContainer-3] TraceInterceptor INFO ->
> interceptor1
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
> [] -> [To[log:PLANETS]]]]),
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930,
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=2,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Mars with seqnum: 2
> [aultMessageListenerContainer-3] TraceInterceptor INFO ->
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
> Properties:{CamelCauseException=null}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930,
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=2,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Mars with seqnum: 2
> [aultMessageListenerContainer-4] TraceInterceptor INFO ->
> interceptor1
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
> [] -> [To[log:PLANETS]]]]),
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871936,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=4,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Jupiter with seqnum: 4
> [aultMessageListenerContainer-4] TraceInterceptor INFO ->
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
> Properties:{CamelCauseException=null}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871936,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=4,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Jupiter with seqnum: 4
> [aultMessageListenerContainer-5] TraceInterceptor INFO ->
> interceptor1
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
> [] -> [To[log:PLANETS]]]]),
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=7,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Uranus with seqnum: 7
> [aultMessageListenerContainer-5] TraceInterceptor INFO ->
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
> Properties:{CamelCauseException=null}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938,
> JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=7,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Uranus with seqnum: 7
> [aultMessageListenerContainer-6] TraceInterceptor INFO ->
> interceptor1
> Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
> [] -> [To[log:PLANETS]]]]),
> RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
> RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938,
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=5,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Neptune with seqnum: 5
> [aultMessageListenerContainer-6] TraceInterceptor INFO ->
> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
> Properties:{CamelCauseException=null}
> Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938,
> JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=5,
> JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Neptune with seqnum: 5
> [[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1
> To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN,
> JMSTimestamp=1229973871929, JMSRedelivered=false, JMSExpiration=0,
> JMSType=null, seqnum=3, JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:2,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Venus with seqnum: 3
> [[log:PLANETS]]] Polling Thread] PLANETS INFO
> Exchange[BodyType:String, Body:I am the planet Venus with seqnum: 3]
> [[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1
> To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN,
> JMSTimestamp=1229973871937, JMSRedelivered=false, JMSExpiration=0,
> JMSType=null, seqnum=8, JMSXGroupID=null,
> JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:6,
> JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
> BodyType:String Body:I am the planet Saturn with seqnum: 8
> [[log:PLANETS]]] Polling Thread] PLANETS INFO
> Exchange[BodyType:String, Body:I am the planet Saturn with seqnum: 8]
> [ main] MyCamelResequencer INFO
> Shutting down the Camel context
> [ ActiveMQ ShutdownHook] BrokerService INFO
> ActiveMQ Message Broker (localhost,
> ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) is shutting down
> [ ActiveMQ ShutdownHook] TransportConnector INFO
> Connector vm://localhost Stopped
> [ ActiveMQ ShutdownHook] BrokerService INFO
> ActiveMQ JMS Message Broker (localhost,
> ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) stopped
> {panel}