This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 68c5bed1596dafde4805ebfb4a4c5eda8b7959ea
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Mar 8 14:02:46 2023 -0600

    ARTEMIS-4200 configurable link-stealing for MQTT
---
 .../core/protocol/mqtt/MQTTProtocolHandler.java    | 59 ++++++++++++++++++----
 .../core/protocol/mqtt/MQTTProtocolManager.java    | 18 +++++++
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  4 ++
 docs/user-manual/en/mqtt.md                        | 14 +++++
 .../mqtt/MQTTDisabledLinkStealingTest.java         | 52 +++++++++++++++++++
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 26 ++++++++++
 6 files changed, 162 insertions(+), 11 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 1c26fc4fed..8ea0a5db66 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -254,8 +254,11 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
          return;
       }
 
-      MQTTConnection existingConnection = 
session.getProtocolManager().addConnectedClient(session.getConnection().getClientID(),
 session.getConnection());
-      disconnectExistingSession(existingConnection);
+      if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) {
+         return;
+      } else {
+         
protocolManager.addConnectedClient(session.getConnection().getClientID(), 
session.getConnection());
+      }
 
       if (connection.getTransportConnection().getRouter() == null || 
!protocolManager.getRoutingHandler().route(connection, session, connect)) {
          calculateKeepAlive(connect);
@@ -452,19 +455,49 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
       return true;
    }
 
-   // [MQTT-3.1.4-2] If the client ID represents a client already connected to 
the server then the server MUST disconnect the existing client
-   private void disconnectExistingSession(MQTTConnection existingConnection) {
-      if (existingConnection != null) {
-         MQTTSession existingSession = 
session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession();
-         if (existingSession != null) {
-            if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
-               
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
+   /* The MQTT specification states:
+    *
+    *     [MQTT-3.1.4-2] If the client ID represents a client already 
connected to the server then the server MUST
+    *     disconnect the existing client
+    *
+    * However, this behavior is configurable via the "allowLinkStealing" 
acceptor URL property.
+    */
+   private LinkStealingResult handleLinkStealing() {
+      final String clientID = session.getConnection().getClientID();
+      LinkStealingResult result;
+
+      if (protocolManager.isClientConnected(clientID)) {
+         MQTTConnection existingConnection = 
protocolManager.getConnectedClient(clientID);
+         if (protocolManager.isAllowLinkStealing()) {
+            MQTTSession existingSession = 
protocolManager.getSessionState(clientID).getSession();
+            if (existingSession != null) {
+               if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
+                  
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
+               }
+               existingSession.getConnectionManager().disconnect(false);
+            } else {
+               existingConnection.disconnect(false);
             }
-            existingSession.getConnectionManager().disconnect(false);
+            logger.debug("Existing MQTT session from {} closed due to incoming 
session from {} with the same client ID: {}", 
existingConnection.getRemoteAddress(), connection.getRemoteAddress(), 
session.getConnection().getClientID());
+            result = LinkStealingResult.EXISTING_LINK_STOLEN;
          } else {
-            existingConnection.disconnect(false);
+            if (session.getVersion() == MQTTVersion.MQTT_5) {
+               sendDisconnect(MQTTReasonCodes.UNSPECIFIED_ERROR);
+            }
+            logger.debug("Incoming MQTT session from {} closed due to existing 
session from {} with the same client ID: {}", connection.getRemoteAddress(), 
existingConnection.getRemoteAddress(), session.getConnection().getClientID());
+            /*
+             * Stopping the session here prevents the connection failure 
listener from inadvertently removing the
+             * existing session once the connection is disconnected.
+             */
+            session.setStopped(true);
+            connection.disconnect(false);
+            result = LinkStealingResult.NEW_LINK_DENIED;
          }
+      } else {
+         result = LinkStealingResult.NO_ACTION;
       }
+
+      return result;
    }
 
    private Pair<Boolean, String> validateUser(String username, String 
password) throws Exception {
@@ -510,4 +543,8 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
       disconnect(true);
       return false;
    }
+
+   private enum LinkStealingResult {
+      EXISTING_LINK_STOLEN, NEW_LINK_DENIED, NO_ACTION;
+   }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index e537fb2125..fef99a75a0 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -75,6 +75,8 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
 
    private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;
 
+   private boolean allowLinkStealing = true;
+
    private final MQTTRoutingHandler routingHandler;
 
    MQTTProtocolManager(ActiveMQServer server,
@@ -139,6 +141,14 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
       this.closeMqttConnectionOnPublishAuthorizationFailure = 
closeMqttConnectionOnPublishAuthorizationFailure;
    }
 
+   public boolean isAllowLinkStealing() {
+      return allowLinkStealing;
+   }
+
+   public void setAllowLinkStealing(boolean allowLinkStealing) {
+      this.allowLinkStealing = allowLinkStealing;
+   }
+
    @Override
    public void onNotification(Notification notification) {
       if (!(notification.getType() instanceof CoreNotificationType))
@@ -348,6 +358,10 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
       return false;
    }
 
+   public boolean isClientConnected(String clientId) {
+      return connectedClients.containsKey(clientId);
+   }
+
    public void removeConnectedClient(String clientId) {
       connectedClients.remove(clientId);
    }
@@ -362,6 +376,10 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
       return connectedClients.put(clientId, connection);
    }
 
+   public MQTTConnection getConnectedClient(String clientId) {
+      return connectedClients.get(clientId);
+   }
+
    public MQTTSessionState getSessionState(String clientId) {
       /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise 
create a new one. */
       return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 5e87d7a459..f6fa85c5de 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -154,6 +154,10 @@ public class MQTTSession {
       return stopped;
    }
 
+   void setStopped(boolean stopped) {
+      this.stopped = stopped;
+   }
+
    boolean isClean() {
       return clean;
    }
diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md
index 383c26bda5..9f7920afb6 100644
--- a/docs/user-manual/en/mqtt.md
+++ b/docs/user-manual/en/mqtt.md
@@ -185,6 +185,20 @@ SSL/TLS is also available, e.g.:
 Web browsers can then connect to `wss://<server>:8883` using a Web Socket to
 send and receive MQTT messages.
 
+## Link Stealing
+
+The MQTT specifications define a behavior often referred to as "link stealing."
+This means that whenever a new client connects with the same client ID as
+another existing client then the existing client's session will be closed and
+its network connection will be terminated.
+
+In certain use-cases this behavior is not desired so it is configurable. The
+URL parameter `allowLinkStealing` can be configured on the MQTT `acceptor` to
+modify this behavior. By default `allowLinkStealing` is `true`. If it is set to
+`false` then whenever a new client connects with the same client ID as another
+existing client then the _new_ client's session will be closed and its network
+connection will be terminated. In the case of MQTT 5 clients they will receive
+a disconnect reason code of [`0x80` (i.e. "Unspecified 
error")](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208).
 
 ## Automatic Subscription Clean-up 
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTDisabledLinkStealingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTDisabledLinkStealingTest.java
new file mode 100644
index 0000000000..c475b36429
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTDisabledLinkStealingTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.junit.Test;
+
+public class MQTTDisabledLinkStealingTest extends MQTTTestSupport {
+
+   @Test(timeout = 60 * 1000)
+   public void testDisabledLinkStealing() throws Exception {
+      final String clientId = RandomUtil.randomString();
+      MQTT mqtt = createMQTTConnection(clientId, false);
+      mqtt.setKeepAlive((short) 2);
+
+      final BlockingConnection connection1 = mqtt.blockingConnection();
+      connection1.connect();
+
+      final BlockingConnection connection2 = mqtt.blockingConnection();
+      try {
+         connection2.connect();
+         fail("Should have thrown an exception on connect due to disabled link 
stealing");
+      } catch (Exception e) {
+         // ignore expected exception
+      }
+
+      assertTrue("Client no longer connected!", Wait.waitFor(() -> 
connection1.isConnected(), 3000, 200));
+      connection1.disconnect();
+   }
+
+   @Override
+   protected void addMQTTConnector() throws Exception {
+      server.getConfiguration().addAcceptorConfiguration("MQTT", 
"tcp://localhost:" + port + 
"?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;allowLinkStealing=false");
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index dd09a99461..b582157cfe 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -490,4 +490,30 @@ public class MQTT5Test extends MQTT5TestSupport {
       consumer.disconnect();
       consumer.close();
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testConnectionStealingDisabled() throws Exception {
+      setAcceptorProperty("allowLinkStealing=false");
+      final String CLIENT_ID = RandomUtil.randomString();
+
+      MqttClient client = createPahoClient(CLIENT_ID);
+      client.connect();
+
+      MqttClient client2 = createPahoClient(CLIENT_ID);
+      try {
+         client2.connect();
+         fail("Should have thrown an exception on connect due to disabled link 
stealing");
+      } catch (Exception e) {
+         // ignore expected exception
+      }
+
+      // only 1 session should exist
+      Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100);
+      assertNotNull(getSessionStates().get(CLIENT_ID));
+
+      assertTrue(client.isConnected());
+
+      client.disconnect();
+      client.close();
+   }
 }

Reply via email to