This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c058e98f1 ARTEMIS-3622 MQTT can deadlock on client cxn/discxn
3c058e98f1 is described below

commit 3c058e98f1e332987aff026e4026758df6ca2785
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Apr 30 08:08:37 2024 -0500

    ARTEMIS-3622 MQTT can deadlock on client cxn/discxn
    
    This commit fixes the deadlock described on ARTEMIS-3622 by moving the
    synchronization "up" a level from the MQTTSession to the
    MQTTConnectionManager. It also eliminates the synchronization on the
    MQTTSessionState in the MQTTConnectionManager because it's no longer
    needed. This change should not only eliminate the deadlock, but improve
    performance relatively as well.
    
    There is no test associated with this commit as I wasn't able to
    reproduce the deadlock with any kind of straight-forward test. There was
    a test linked on the Jira, but it involved intrusive and fragile
    scaffolding and wasn't ultimately tenable. That said, I did test this
    fix with that test and it was successful. In any case, I think static
    analysis should be sufficient here as the changes are pretty
    straight-forward.
---
 .../core/protocol/mqtt/MQTTConnectionManager.java  | 142 ++++++++++-----------
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  13 +-
 2 files changed, 77 insertions(+), 78 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index eecc76166d..cedd8153c5 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -51,7 +51,7 @@ public class MQTTConnectionManager {
       session.getConnection().addFailureListener(failureListener);
    }
 
-   void connect(MqttConnectMessage connect, String validatedUser, String 
username, String password) throws Exception {
+   synchronized void connect(MqttConnectMessage connect, String validatedUser, 
String username, String password) throws Exception {
       if (session.getVersion() == MQTTVersion.MQTT_5) {
          
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
          String authenticationMethod = MQTTUtil.getProperty(String.class, 
connect.variableHeader().properties(), AUTHENTICATION_METHOD);
@@ -68,67 +68,65 @@ public class MQTTConnectionManager {
 
       String clientId = session.getConnection().getClientID();
       boolean sessionPresent = 
session.getStateManager().getSessionStates().containsKey(clientId);
-      MQTTSessionState sessionState = getSessionState(clientId);
-      synchronized (sessionState) {
-         session.setSessionState(sessionState);
-         sessionState.setFailed(false);
-         ServerSessionImpl serverSession = createServerSession(username, 
password, validatedUser);
-         serverSession.start();
-         ServerSessionImpl internalServerSession = 
createServerSession(username, password, validatedUser);
-         internalServerSession.disableSecurity();
-         internalServerSession.start();
-         session.setServerSession(serverSession, internalServerSession);
-
-         if (cleanStart) {
-            /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and 
Server MUST discard any previous Session and
-             * start a new one. This Session lasts as long as the Network 
Connection. State data associated with this Session
-             * MUST NOT be reused in any subsequent Session */
-            session.clean(true);
-            session.setClean(true);
-         }
+      MQTTSessionState sessionState = 
session.getStateManager().getSessionState(clientId);
+      session.setSessionState(sessionState);
+      sessionState.setFailed(false);
+      ServerSessionImpl serverSession = createServerSession(username, 
password, validatedUser);
+      serverSession.start();
+      ServerSessionImpl internalServerSession = createServerSession(username, 
password, validatedUser);
+      internalServerSession.disableSecurity();
+      internalServerSession.start();
+      session.setServerSession(serverSession, internalServerSession);
+
+      if (cleanStart) {
+         /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server 
MUST discard any previous Session and
+          * start a new one. This Session lasts as long as the Network 
Connection. State data associated with this Session
+          * MUST NOT be reused in any subsequent Session */
+         session.clean(true);
+         session.setClean(true);
+      }
 
-         if (connect.variableHeader().isWillFlag()) {
-            session.getState().setWill(true);
-            byte[] willMessage = connect.payload().willMessageInBytes();
-            
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
-            
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
-            
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
-            session.getState().setWillTopic(connect.payload().willTopic());
-
-            if (session.getVersion() == MQTTVersion.MQTT_5) {
-               MqttProperties willProperties = 
connect.payload().willProperties();
-               if (willProperties != null) {
-                  MqttProperties.MqttProperty willDelayInterval = 
willProperties.getProperty(WILL_DELAY_INTERVAL.value());
-                  if (willDelayInterval != null) {
-                     session.getState().setWillDelayInterval(( int) 
willDelayInterval.value());
-                  }
-                  List<? extends MqttProperties.MqttProperty> userProperties = 
willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
-                  if (userProperties != null) {
-                     session.getState().setWillUserProperties(userProperties);
-                  }
+      if (connect.variableHeader().isWillFlag()) {
+         session.getState().setWill(true);
+         byte[] willMessage = connect.payload().willMessageInBytes();
+         
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
+         
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
+         
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
+         session.getState().setWillTopic(connect.payload().willTopic());
+
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
+            MqttProperties willProperties = connect.payload().willProperties();
+            if (willProperties != null) {
+               MqttProperties.MqttProperty willDelayInterval = 
willProperties.getProperty(WILL_DELAY_INTERVAL.value());
+               if (willDelayInterval != null) {
+                  session.getState().setWillDelayInterval(( int) 
willDelayInterval.value());
+               }
+               List<? extends MqttProperties.MqttProperty> userProperties = 
willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
+               if (userProperties != null) {
+                  session.getState().setWillUserProperties(userProperties);
                }
             }
          }
+      }
 
-         MqttProperties connackProperties;
-         if (session.getVersion() == MQTTVersion.MQTT_5) {
-            
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));
-
-            
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
-            
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
-            
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));
+      MqttProperties connackProperties;
+      if (session.getVersion() == MQTTVersion.MQTT_5) {
+         
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));
 
-            connackProperties = getConnackProperties();
-         } else {
-            
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
-            connackProperties = MqttProperties.NO_PROPERTIES;
-         }
+         
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
+         
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
+         
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, 
connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));
 
-         session.getConnection().setConnected(true);
-         session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, 
sessionPresent && !cleanStart, connackProperties);
-         // ensure we don't publish before the CONNACK
-         session.start();
+         connackProperties = getConnackProperties();
+      } else {
+         
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
+         connackProperties = MqttProperties.NO_PROPERTIES;
       }
+
+      session.getConnection().setConnected(true);
+      session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, 
sessionPresent && !cleanStart, connackProperties);
+      // ensure we don't publish before the CONNACK
+      session.start();
    }
 
    private MqttProperties getConnackProperties() {
@@ -176,33 +174,27 @@ public class MQTTConnectionManager {
       return (ServerSessionImpl) serverSession;
    }
 
-   void disconnect(boolean failure) {
+   synchronized void disconnect(boolean failure) {
       if (session == null || session.getStopped()) {
          return;
       }
 
-      synchronized (session.getState()) {
-         try {
-            session.stop(failure);
-            session.getConnection().destroy();
-         } catch (Exception e) {
-            MQTTLogger.LOGGER.errorDisconnectingClient(e);
-         } finally {
-            if (session.getState() != null) {
-               String clientId = session.getState().getClientId();
-               /**
-                *  ensure that the connection for the client ID matches *this* 
connection otherwise we could remove the
-                *  entry for the client who "stole" this client ID via 
[MQTT-3.1.4-2]
-                */
-               if (clientId != null && 
session.getStateManager().isClientConnected(clientId, session.getConnection())) 
{
-                  session.getStateManager().removeConnectedClient(clientId);
-               }
+      try {
+         session.stop(failure);
+         session.getConnection().destroy();
+      } catch (Exception e) {
+         MQTTLogger.LOGGER.errorDisconnectingClient(e);
+      } finally {
+         if (session.getState() != null) {
+            String clientId = session.getState().getClientId();
+            /**
+             *  ensure that the connection for the client ID matches *this* 
connection otherwise we could remove the
+             *  entry for the client who "stole" this client ID via 
[MQTT-3.1.4-2]
+             */
+            if (clientId != null && 
session.getStateManager().isClientConnected(clientId, session.getConnection())) 
{
+               session.getStateManager().removeConnectedClient(clientId);
             }
          }
       }
    }
-
-   private synchronized MQTTSessionState getSessionState(String clientId) 
throws Exception {
-      return session.getStateManager().getSessionState(clientId);
-   }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index ba70c45ccb..e91cadeec8 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -98,14 +98,21 @@ public class MQTTSession {
       logger.debug("MQTT session created: {}", id);
    }
 
-   // Called after the client has Connected.
-   synchronized void start() throws Exception {
+   /*
+    * This method is only called by MQTTConnectionManager.connect
+    * which is synchronized with MQTTConnectionManager.disconnect
+    */
+   void start() throws Exception {
       mqttPublishManager.start();
       subscriptionManager.start();
       stopped = false;
    }
 
-   synchronized void stop(boolean failure) throws Exception {
+   /*
+    * This method is only called by MQTTConnectionManager.disconnect
+    * which is synchronized with MQTTConnectionManager.connect
+    */
+   void stop(boolean failure) throws Exception {
       state.setFailed(failure);
 
       if (!stopped) {

Reply via email to