[ https://issues.apache.org/jira/browse/AMQ-5249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14267631#comment-14267631 ]
Christian Tytgat commented on AMQ-5249:
---------------------------------------
@Gary: I managed to reproduce the problem with a local instance; maybe the
following info will give you some insight.
- started a client publishing messages to a topic continuously
- our Camel application was consuming from that topic
- while everything was running OK, I restarted ActiveMQ
- the duplicate message warnings started appearing immediately after startup
- even after killing the client, the broker seems to be 'stuck' in this state.
Every single message triggers the warning from then on.
- not all 'duplicate' messages end up in the DLQ. This seems to be timing related.
(It looks like the faster the machine, the fewer messages end up in the DLQ.)
- reproducible with both an Oracle and a KahaDB store
I connected a debugger and noticed that the handling thread enters
AbstractStoreCursor.recoverMessage() twice. The second call produces the
duplicate warning because the message already exists in the audit map.
PrefetchSubscription.add() seems to trigger both executions: once through
pending.addMessageLast(node) and once through dispatchPending().
Here are 2 stack traces showing how it enters that method.
{code}
ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:56106@1883@10055 daemon, prio=5, in group 'main', status: 'RUNNING'
    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.recoverMessage(AbstractStoreCursor.java:90)
    at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.recoverMessage(TopicStorePrefetch.java:77)
    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.addMessageLast(AbstractStoreCursor.java:194)
    at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.addMessageLast(StoreDurableSubscriberCursor.java:198)
    at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:159)
    at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:272)
    at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
    at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:717)
    at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:510)
    at org.apache.activemq.broker.region.Topic.send(Topic.java:441)
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:424)
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:445)
    at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
    at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:123)
    at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:91)
    at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:132)
    at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTPublish(MQTTProtocolConverter.java:566)
    at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:175)
    at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:79)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Thread.java:745)
{code}
{code}
ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:56106@1883@10055 daemon, prio=5, in group 'main', status: 'RUNNING'
blocks ActiveMQ Transport: tcp:///127.0.0.1:56056@61616@9345
blocks ActiveMQ BrokerService[localhost] Task-178@8926
    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.recoverMessage(AbstractStoreCursor.java:103)
    at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.recoverMessage(TopicStorePrefetch.java:77)
    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.recoverMessage(AbstractStoreCursor.java:85)
    at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$5.execute(KahaDBStore.java:931)
    at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779)
    at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverNextMessages(KahaDBStore.java:905)
    at org.apache.activemq.store.ProxyTopicMessageStore.recoverNextMessages(ProxyTopicMessageStore.java:115)
    at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.doFillBatch(TopicStorePrefetch.java:113)
    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:277)
    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.hasNext(AbstractStoreCursor.java:162)
    at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.hasNext(StoreDurableSubscriberCursor.java:255)
    at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:646)
    at org.apache.activemq.broker.region.DurableTopicSubscription.dispatchPending(DurableTopicSubscription.java:278)
    at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:161)
    at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:272)
    at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
    at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:717)
    at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:510)
    at org.apache.activemq.broker.region.Topic.send(Topic.java:441)
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:424)
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:445)
    at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
    at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:123)
    at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:91)
    at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:132)
    at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTPublish(MQTTProtocolConverter.java:566)
    at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:175)
    at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:79)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Thread.java:745)
{code}
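To make the double entry easier to follow, here is a minimal, self-contained sketch of the pattern the two traces suggest. It is not the ActiveMQ source: SketchCursor, its fields, and the list standing in for the store are hypothetical stand-ins, and the assumption that the message is already in the store when add() runs is a guess based on the second trace replaying it from KahaDB.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateAuditSketch {

    /** Hypothetical stand-in for a store-backed cursor with a duplicate audit. */
    static final class SketchCursor {
        final Set<String> audit = new HashSet<>();        // message IDs already seen
        final List<String> duplicates = new ArrayList<>(); // IDs that would be logged as duplicates

        // Both paths funnel into this method; the second call for an ID is flagged.
        void recoverMessage(String messageId) {
            if (!audit.add(messageId)) {
                duplicates.add(messageId); // "cursor got duplicate" in the real log
            }
        }

        // Path 1 (first trace): add() -> pending.addMessageLast(node) -> recoverMessage()
        void addMessageLast(String messageId) {
            recoverMessage(messageId);
        }

        // Path 2 (second trace): add() -> dispatchPending() -> hasNext() -> fillBatch()
        // replays whatever the store returns, including the message just added.
        void fillBatch(List<String> storeContents) {
            for (String id : storeContents) {
                recoverMessage(id);
            }
        }
    }

    /** Simplified add(): assumes the message is persisted before dispatch. */
    static List<String> add(SketchCursor cursor, List<String> store, String messageId) {
        store.add(messageId);
        cursor.addMessageLast(messageId); // first entry into recoverMessage()
        cursor.fillBatch(store);          // second entry into recoverMessage()
        return cursor.duplicates;
    }

    public static void main(String[] args) {
        SketchCursor cursor = new SketchCursor();
        List<String> store = new ArrayList<>();
        System.out.println(add(cursor, store, "ID:example-1")); // prints [ID:example-1]
    }
}
```

In this sketch every message is flagged on its second pass, which matches the observation that each single message triggers the warning once the broker is in this state. Why the store replay overlaps with the cached add after a restart is not explained by the sketch.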
> "cursor got duplicate" error after upgrade
> ------------------------------------------
>
> Key: AMQ-5249
> URL: https://issues.apache.org/jira/browse/AMQ-5249
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.9.1, 5.10.0
> Reporter: Rural Hunter
>
> I was using 5.9.0 and hit a problem, so I tried to upgrade ActiveMQ. I
> tried both 5.9.1 and 5.10.0 and encountered the same problem in each: messages
> filled the DLQ very quickly. I checked the clients, both producer and consumer,
> but there was no error. I checked the ActiveMQ log and found it full of these
> warnings:
> 2014-06-27 23:22:09,337 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.cyyun.webmon.spider.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:211.com-52399-1400732399425-1:1:235992:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> 2014-06-27 23:22:09,337 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.xxxx.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:nbzjjf22805-34129-1403880308671-1:1:28:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> 2014-06-27 23:22:09,338 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.xxxxx.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:jxncxnj2-48598-1403856107346-1:1:6007:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> 2014-06-27 23:22:09,338 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.xxxx.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:jxnc17-60227-1400730816361-1:1:149072:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> 2014-06-27 23:22:09,339 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.xxxx.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:cyyun-46954-1403800808565-1:1:9765:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> 2014-06-27 23:22:09,339 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.xxxx.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:ubuntu-55495-1403497638437-1:1:53086:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> 2014-06-27 23:22:09,340 | WARN |
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@19117501:com.xxxx.update,batchResetNeeded=false,storeHasMessages=true,size=211,cacheEnabled=true,maxBatchSize:200,hasSpace:true
> - cursor got duplicate: ID:cyyun-39030-1403880008363-1:1:70:1:1, 4 |
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor | ActiveMQ
> Broker[localhost] Scheduler
> The problem mostly happens right after ActiveMQ starts, and sometimes after
> ActiveMQ has worked normally for a while.
> For now I have to roll back to 5.9.0, where the problem doesn't occur.