[
https://issues.apache.org/activemq/browse/AMQ-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=49997#action_49997
]
Gary Tully commented on AMQ-2123:
---------------------------------
The problem relates to the number of consumers associated with a topic message
send. If 10 consumers are present when the message is persisted, and 11 when
the message is dispatched, the ack from the 11th will remove the message.
Redispatch to any of the others can then fail with the exception quoted in the
issue description.
With the AMQPersistenceAdapter, writes are batched, so the reference store is
updated asynchronously. At the point of update, the reference store prepares
the required acks for each subscriber in order to keep the message reference
around until all subscribers have acked.
The problem arises when the consumer list is updated and another consumer (one
that is not in the count that is persisted) gets the message. The set of
subscribers used during dispatch is independent of the set persisted. This is a
problem. The logic that sets up the acks based on the subscription list is at
org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.addMessageReference(ConnectionContext,
MessageId, ReferenceData)
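
To make the mismatch concrete, here is a minimal sketch in Java. It is a toy
model, not the ActiveMQ code: the class SnapshotMismatchSketch and all of its
methods and subscriber names are hypothetical. It only shows that a subscriber
added after the store update is a dispatch target the store knows nothing
about.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (hypothetical names): live subscriber list vs. persisted snapshot. */
public class SnapshotMismatchSketch {
    /** Subscribers currently attached to the topic. */
    private final List<String> liveSubscribers = new ArrayList<>();
    /** Subscribers the store has recorded as owing an ack for the message. */
    private final Set<String> persistedAckOwners = new HashSet<>();

    void addSubscriber(String key) {
        liveSubscribers.add(key);
    }

    /** Stands in for the batched, asynchronous reference store update. */
    void flushToStore() {
        persistedAckOwners.clear();
        persistedAckOwners.addAll(liveSubscribers);
    }

    /** Dispatch reads the live list, not the persisted snapshot. */
    List<String> dispatchTargets() {
        return new ArrayList<>(liveSubscribers);
    }

    /** Subscribers that will receive the message but are unknown to the store. */
    List<String> unknownToStore() {
        List<String> extra = dispatchTargets();
        extra.removeAll(persistedAckOwners);
        return extra;
    }

    public static void main(String[] args) {
        SnapshotMismatchSketch topic = new SnapshotMismatchSketch();
        for (int i = 1; i <= 10; i++) {
            topic.addSubscriber("sub-" + i);
        }
        topic.flushToStore();          // acks prepared for 10 subscribers
        topic.addSubscriber("sub-11"); // arrives before dispatch
        System.out.println(topic.dispatchTargets().size()); // 11
        System.out.println(topic.unknownToStore());         // [sub-11]
    }
}
```

In this model sub-11 receives the message and will ack it, yet the store holds
no ack entry for it.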
One fix is to serialize dispatch with a flush to the store and with subscriber
additions. I think this will lock up the dispatch logic quite a bit.
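
A rough sketch of what that serialization could look like, assuming a single
lock shared by subscriber additions and a combined flush-and-dispatch step.
The class SerializedDispatchSketch and its methods are hypothetical and do not
correspond to broker classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model (hypothetical names): one lock guards add, flush and dispatch. */
public class SerializedDispatchSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<String> subscribers = new ArrayList<>();
    private List<String> persistedAckOwners = new ArrayList<>();

    /** Subscriber additions wait for any flush-and-dispatch in progress. */
    void addSubscriber(String key) {
        lock.lock();
        try {
            subscribers.add(key);
        } finally {
            lock.unlock();
        }
    }

    /** Store update and dispatch share one critical section and one subscriber list. */
    List<String> flushAndDispatch() {
        lock.lock();
        try {
            persistedAckOwners = new ArrayList<>(subscribers); // store update
            return new ArrayList<>(subscribers);               // dispatch targets
        } finally {
            lock.unlock();
        }
    }

    /** Copy of the subscriber list recorded by the last store update. */
    List<String> persistedAckOwners() {
        lock.lock();
        try {
            return new ArrayList<>(persistedAckOwners);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SerializedDispatchSketch topic = new SerializedDispatchSketch();
        topic.addSubscriber("sub-1");
        topic.addSubscriber("sub-2");
        List<String> targets = topic.flushAndDispatch();
        System.out.println(targets.equals(topic.persistedAckOwners())); // true
    }
}
```

The cost is visible in the sketch: every dispatch holds the lock for the
duration of a store flush, and every new subscriber waits behind it.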
The logic in
org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(ConnectionContext,
String, String, MessageId) can deal with a missing reference (the case where a
message has not been persisted), but it cannot deal with the case of a
persisted message and an additional subscriber. Adding logic that does not
remove a reference while it is still referenced from another subscription
resolves the issue.
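
The guard can be sketched as follows. This is an illustration of the idea
only, under the assumption that the store tracks pending acks per
subscription; GuardedAckSketch and its methods are hypothetical and are not
the KahaTopicReferenceStore implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model (hypothetical names): ack that respects other subscriptions. */
public class GuardedAckSketch {
    /** Subscription key -> message ids that subscription still owes an ack for. */
    private final Map<String, Set<String>> pending = new HashMap<>();

    void addSubscription(String key) {
        pending.putIfAbsent(key, new HashSet<>());
    }

    /** Record the message against every subscription known at store-update time. */
    void addMessageReference(String messageId) {
        for (Set<String> ids : pending.values()) {
            ids.add(messageId);
        }
    }

    /** Returns true when the caller may delete the message from the store. */
    boolean acknowledge(String key, String messageId) {
        Set<String> ids = pending.get(key);
        if (ids != null) {
            ids.remove(messageId);
        }
        // The guard: keep the message while any subscription still references it.
        return !isReferenced(messageId);
    }

    boolean isReferenced(String messageId) {
        for (Set<String> ids : pending.values()) {
            if (ids.contains(messageId)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        GuardedAckSketch store = new GuardedAckSketch();
        store.addSubscription("sub-1");
        store.addSubscription("sub-2");
        store.addMessageReference("msg-1"); // persisted for two subscriptions
        store.addSubscription("late");      // added after the store update

        System.out.println(store.acknowledge("late", "msg-1"));  // false
        System.out.println(store.acknowledge("sub-1", "msg-1")); // false
        System.out.println(store.acknowledge("sub-2", "msg-1")); // true
        // A message that was never persisted has no references and is still removable.
        System.out.println(store.acknowledge("sub-1", "msg-2")); // true
    }
}
```

The ack from the late subscription no longer removes the message, while the
never-persisted case keeps its existing behaviour.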
> Intermittent Test failure: DuplexNetworkTest.testDurableStoreAndForward
> (org.apache.activemq.network) - java.lang.IllegalStateException: Message id
> ID:... could not be recovered from the data store - already dispatched
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-2123
> URL: https://issues.apache.org/activemq/browse/AMQ-2123
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.2.0
> Environment: linux rh
> Reporter: Gary Tully
> Assignee: Gary Tully
> Fix For: 5.3.0
>
>
> {code}
> DuplexNetworkTest.testDurableStoreAndForward (org.apache.activemq.network)
> javax.jms.JMSException: java.lang.RuntimeException:
> java.lang.IllegalStateException: Message id
> ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the
> data store - already dispatched
> at
> org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
> at
> org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1255)
> at
> org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1805)
> at
> org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:225)
> at
> org.apache.activemq.ActiveMQTopicSubscriber.<init>(ActiveMQTopicSubscriber.java:117)
> at
> org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1207)
> at
> org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1152)
> at
> org.apache.activemq.network.SimpleNetworkTest.testDurableStoreAndForward(SimpleNetworkTest.java:127)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:585)
> at junit.framework.TestCase.runTest(TestCase.java:154)
> at junit.framework.TestCase.runBare(TestCase.java:127)
> at junit.framework.TestResult$1.protect(TestResult.java:106)
> at junit.framework.TestResult.runProtected(TestResult.java:124)
> at junit.framework.TestResult.run(TestResult.java:109)
> at junit.framework.TestCase.run(TestCase.java:118)
> at junit.framework.TestSuite.runTest(TestSuite.java:208)
> at junit.framework.TestSuite.run(TestSuite.java:203)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:585)
> at
> org.apache.maven.surefire.junit.JUnitTestSet.execute(JUnitTestSet.java:210)
> at
> org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.executeTestSet(AbstractDirectoryTestSuite.java:135)
> at
> org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.execute(AbstractDirectoryTestSuite.java:160)
> at org.apache.maven.surefire.Surefire.run(Surefire.java:81)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:585)
> at
> org.apache.maven.surefire.booter.SurefireBooter.runSuitesInProcess(SurefireBooter.java:182)
> at
> org.apache.maven.surefire.booter.SurefireBooter.main(SurefireBooter.java:743)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.IllegalStateException: Message id
> ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the
> data store - already dispatched
> at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:104)
> at
> org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.reset(StoreDurableSubscriberCursor.java:225)
> at
> org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:560)
> at
> org.apache.activemq.broker.region.DurableTopicSubscription.activate(DurableTopicSubscription.java:130)
> at
> org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:105)
> at
> org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:376)
> at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
> at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
> at
> org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
> at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
> at
> org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
> at
> org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)
> at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
> at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
> at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
> at
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
> at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> at
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:204)
> at
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
> at java.lang.Thread.run(Thread.java:595)
> Caused by: java.lang.RuntimeException: java.lang.IllegalStateException:
> Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be
> recovered from the data store - already dispatched
> at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:239)
> at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:101)
> ... 22 more
> Caused by: java.lang.IllegalStateException: Message id
> ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the
> data store - already dispatched
> at
> org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:58)
> at
> org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
> at
> org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.recoverNextMessages(KahaTopicReferenceStore.java:262)
> at
> org.apache.activemq.store.amq.AMQTopicMessageStore.recoverNextMessages(AMQTopicMessageStore.java:59)
> at
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch.doFillBatch(TopicStorePrefetch.java:91)
> at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:236)
> ... 23 more
> {code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.