[ 
https://issues.apache.org/jira/browse/AMQ-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14581865#comment-14581865
 ] 

Gary Tully commented on AMQ-5668:
---------------------------------

[~christopher.l.shannon] - great that you are looking into this, it always 
helps to have another set of eyes.
I think there are two things to consider: 1) the value of 
concurrentStoreAndDispatchTopics=true - I don't really see the benefit b/c it 
is very unlikely that all subs will get to ack a message before it is 
persisted. In the queue case, it does have a beneficial effect b/c it is just a 
single client that needs to ack fast.
2) what lastCacheId is used for? When the cache is full it stops tracking 
messages and when the cache is empty, to avoid replaying all messages from the 
store the cursor wants to start a store replay from the last message that it 
cached. There is a setBatch that sets the start point. Implicit in this is that 
the sequenceId in the store is in sync with the order messages are placed in 
the cursor. That is the crux of it. If the start point is off, there is a 
potential for a missed dispatch or a duplicate dispatch. Note - I think the 
durable topic case is a little simpler because of the per sub index, but that 
would need to be validated.

In the queue case, there are a bunch of hoops that ensure ordering because it 
is complicated with concurrentStoreAndDispatch. The same attention has not been 
given to the topic case to date.
If there is a proven usecase for  concurrentStoreAndDispatchTopics=true (point 
1) - then the is a case for migrating the work done on queue cursors around 
lastCacheId to topics.


> NPE in kahadb with concurrentStoreAndDispatchTopics when sending MQTT msgs 
> with different QoS
> ---------------------------------------------------------------------------------------------
>
>                 Key: AMQ-5668
>                 URL: https://issues.apache.org/jira/browse/AMQ-5668
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, KahaDB, MQTT
>    Affects Versions: 5.11.1
>         Environment: MQTT, KahaDB
>            Reporter: Torsten Mielke
>              Labels: broker, kahadb, mqtt
>         Attachments: AMQ-5668Test.tgz
>
>
> Running KahaDB with concurrentStoreAndDispatchTopics="true" and sending 3 
> MQTT messages using different QoS values raises 
> {code}
> 2015-03-17 13:27:48,866 WARN ActiveMQ NIO Worker 2 - Failed to send MQTT 
> Publish:
> java.lang.NullPointerException
>       at 
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.setLastCachedId(AbstractStoreCursor.java:319)
>       at 
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.trackLastCached(AbstractStoreCursor.java:280)
>       at 
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.addMessageLast(AbstractStoreCursor.java:213)
>       at 
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch.addMessageLast(TopicStorePrefetch.java:74)
>       at 
> org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.addMessageLast(StoreDurableSubscriberCursor.java:198)
>       at 
> org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:159)
>       at 
> org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:274)
>       at 
> org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
>       at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:717)
>       at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:510)
>       at org.apache.activemq.broker.region.Topic.send(Topic.java:441)
>       at 
> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:419)
>       at 
> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:468)
>       at 
> org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
>       at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:152)
>       at 
> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
>       at 
> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
>       at 
> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:157)
>       at 
> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:541)
>       at 
> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)
>       at 
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
>       at 
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
>       at 
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
>       at 
> org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)
>       at 
> org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:106)
>       at 
> org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:173)
>       at 
> org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTPublish(MQTTProtocolConverter.java:445)
>       at 
> org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:210)
>       at 
> org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:94)
>       at 
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>       at 
> org.apache.activemq.transport.mqtt.MQTTCodec$1.onFrame(MQTTCodec.java:54)
>       at 
> org.apache.activemq.transport.mqtt.MQTTCodec.processCommand(MQTTCodec.java:79)
>       at 
> org.apache.activemq.transport.mqtt.MQTTCodec.access$400(MQTTCodec.java:26)
>       at 
> org.apache.activemq.transport.mqtt.MQTTCodec$4.parse(MQTTCodec.java:194)
>       at 
> org.apache.activemq.transport.mqtt.MQTTCodec$3.parse(MQTTCodec.java:160)
>       at 
> org.apache.activemq.transport.mqtt.MQTTCodec$2.parse(MQTTCodec.java:123)
>       at org.apache.activemq.transport.mqtt.MQTTCodec.parse(MQTTCodec.java:65)
>       at 
> org.apache.activemq.transport.mqtt.MQTTNIOTransport.serviceRead(MQTTNIOTransport.java:105)
>       at 
> org.apache.activemq.transport.mqtt.MQTTNIOTransport.access$000(MQTTNIOTransport.java:43)
>       at 
> org.apache.activemq.transport.mqtt.MQTTNIOTransport$1.onSelect(MQTTNIOTransport.java:66)
>       at 
> org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:97)
>       at 
> org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to