GitHub user onlyMIT opened a pull request:

    https://github.com/apache/activemq-artemis/pull/2466

    NO-JIRA The MQTT consumer reconnection caused the queue to not be cle…

    ### Test environment
    
    1. Use 10,000 MQTT connections (9,000 senders, 1,000 consumers) on one
    server to test Artemis, with the 'cleanSession' property set to true;
    2. MQTT client: paho 1.2.0;
    3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T
    
    ### Issue
    
    **Issue 1**
    The Artemis broker logs the following exception:
    `[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN  [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException
        at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:]
        at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:]
        at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:]
        at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:]
        at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:]
        at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:]
        at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:]
        at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:]
        at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
        at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
        at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101]
        at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]`
    
    **Issue 2**
    After closing all client connections, 64 queues were not cleaned up.
    
    ### Analysis and simulation reproduction
    
      When an MQTT consumer client (cleanSession property set to true)
    reconnects, there is a certain probability that its queue will not be
    automatically cleared and that a NullPointerException will be thrown.
      This happens because the MQTT consumer client considers its connection
    lost and triggers a reconnection while the old MQTT connection is still
    alive on the Artemis broker. The bug occurs when the Artemis broker starts
    processing the ‘new MQTT connection’ while it is still closing the
    ‘old MQTT connection’.
      To reproduce, create an MQTT consumer (cleanSession: true, clientID:
    superConsumer, topic: mit.test) and connect it to the Artemis broker.
    Create another MQTT consumer with the same cleanSession, clientID, and
    topic, then connect it to the Artemis broker as well. Close the two MQTT
    connections. After repeating this many times, both problems described
    above are reproduced with some probability.
    
    ### Solution
    
    **Issue 1**
    
      When 'session.getProtocolManager().isClientConnected(clientId,
    session.getConnection())' is called and the 'MQTTConnection' instance
    retrieved from 'connectedClients' is 'null', a NullPointerException is
    thrown. Add a null check to the 'MQTTProtocolManager.isClientConnected'
    method.
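
      The null check can be illustrated with a minimal, self-contained sketch.
    This is not the Artemis source: 'ClientRegistry', 'register' and
    'unregister' are hypothetical names, and plain 'Object' stands in for
    'MQTTConnection'. It only shows the idea of treating a missing map entry
    as "not connected" instead of dereferencing it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker's clientId -> connection registry.
class ClientRegistry {
    private final Map<String, Object> connectedClients = new ConcurrentHashMap<>();

    void register(String clientId, Object connection) {
        connectedClients.put(clientId, connection);
    }

    void unregister(String clientId) {
        connectedClients.remove(clientId);
    }

    // Null-safe check: a missing entry means "not connected" rather than an NPE.
    boolean isClientConnected(String clientId, Object connection) {
        Object current = connectedClients.get(clientId);
        return current != null && current.equals(connection);
    }
}
```

      With this shape, a 'disconnect' that runs after the entry has already
    been removed sees 'false' and can skip its cleanup path rather than fail.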
    
    **Issue 2** 
    
    1. Remove ‘InterruptedException’ from the 
‘MQTTConnectionManager.getSessionState’ method because the 
‘InterruptedException’ exception will never be thrown in this method;
    2. Synchronize the 'MQTTConnectionManager.connect' and
    'MQTTConnectionManager.disconnect' methods, using the MQTTSessionState
    instance as the lock. In the Artemis broker, all MQTT connections using
    the same clientId share the same MQTTSessionState instance. With this
    lock, the 'connect' and 'disconnect' methods can no longer run
    concurrently on MQTT connections with the same clientId.
    3. For the MQTT protocol, there is one and only one consumer connection
    per queue, so it makes sense to close the old MQTT consumer before the
    new MQTT consumer connects.
        The original code could not effectively clean up the 'old consumer' in 
the queue when the 'new MQTT connection' was connected to the Artemis broker. 
Modify ‘MQTTSubscriptionManager.removeSubscription’ to get the queue 
consumer collection from the ‘QueueImpl’ instance and close them.
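
        The locking and cleanup described in points 2 and 3 can be sketched as
    follows. This is a simplified model under stated assumptions, not the
    patch itself: 'SessionState', 'ConnectionManagerSketch' and
    'consumerCount' are hypothetical names, and consumers are modelled as
    plain strings. It shows one shared state object per clientId used as the
    monitor, with the new connection dropping any leftover consumer before
    attaching its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: one shared state object per clientId, used as the lock.
class SessionState {
    final String clientId;
    // Guarded by synchronized (this SessionState instance).
    final List<String> consumers = new ArrayList<>();

    SessionState(String clientId) {
        this.clientId = clientId;
    }
}

class ConnectionManagerSketch {
    private static final Map<String, SessionState> STATES = new ConcurrentHashMap<>();

    // All connections with the same clientId get the same SessionState instance.
    static SessionState getSessionState(String clientId) {
        return STATES.computeIfAbsent(clientId, SessionState::new);
    }

    static void connect(String clientId, String consumerName) {
        SessionState state = getSessionState(clientId);
        synchronized (state) {
            // Drop any consumer left behind by the old connection first.
            state.consumers.clear();
            state.consumers.add(consumerName);
        }
    }

    static void disconnect(String clientId, String consumerName) {
        SessionState state = getSessionState(clientId);
        synchronized (state) {
            // Removing an already-removed consumer is a harmless no-op.
            state.consumers.remove(consumerName);
        }
    }

    static int consumerCount(String clientId) {
        SessionState state = getSessionState(clientId);
        synchronized (state) {
            return state.consumers.size();
        }
    }
}
```

        Because both methods take the same monitor, a 'disconnect' for the old
    connection and a 'connect' for the new one with the same clientId run one
    after the other, in whichever order they acquire the lock.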


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/onlyMIT/activemq-artemis onlyMIT-artemis-2.7.0

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/2466.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2466
    
----
commit 8140f2e9d094ea32c272ed763c3827a73e12e420
Author: onlyMIT <liangshipin_g@...>
Date:   2018-12-17T07:53:33Z

    NO-JIRA The MQTT consumer reconnection caused the queue to not be cleared, 
and caused Artemis broker to throw a NullPointerException.
    
    When the MQTT consumer client (cleanSession property set to true)
    reconnects, there is a certain probability that these two bugs will occur.
    This is because the MQTT consumer client considers its connection lost and
    triggers a reconnection while the MQTT connection is still alive on the
    Artemis broker. The bugs occur when the new and old connections operate on
    the same queue at the same time without synchronization.

----

