This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 9b4204b345befd5074c4fcabc400d0764c9b6bd7
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Mar 8 13:33:08 2023 -0600

    ARTEMIS-4201 send proper MQTT disconnect code on stolen link
---
 .../core/protocol/mqtt/MQTTProtocolHandler.java    |  2 +-
 .../tests/integration/mqtt5/MQTT5TestSupport.java  |  4 +++
 .../mqtt5/spec/controlpackets/ConnectTests.java    | 41 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 1 deletion(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 5b3f7a6764..1c26fc4fed 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -457,7 +457,7 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
       if (existingConnection != null) {
          MQTTSession existingSession = 
session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession();
          if (existingSession != null) {
-            if (session.getVersion() == MQTTVersion.MQTT_5) {
+            if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
                
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
             }
             existingSession.getConnectionManager().disconnect(false);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index 8482f9573b..6b191e0e34 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -106,6 +106,10 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
       return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? 
getSslPort() : getPort()), clientId, new MemoryPersistence());
    }
 
+   protected org.eclipse.paho.client.mqttv3.MqttClient 
createPaho3_1_1Client(String clientId) throws 
org.eclipse.paho.client.mqttv3.MqttException {
+      return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + 
"://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new 
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence());
+   }
+
    protected MqttAsyncClient createAsyncPahoClient(String clientId) throws 
MqttException {
       return new MqttAsyncClient(protocol + "://localhost:" + (isUseSsl() ? 
getSslPort() : getPort()), clientId, new MemoryPersistence());
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
index 9464382e0b..a3b6eb3c03 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
@@ -738,6 +738,47 @@ public class ConnectTests extends MQTT5TestSupport {
       client2.disconnect();
    }
 
+   /*
+    * [MQTT-3.1.4-3] If the ClientID represents a Client already connected to 
the Server, the Server sends a DISCONNECT
+    * packet to the existing Client with Reason Code of 0x8E (Session taken 
over) as described in section 4.13 and MUST
+    * close the Network Connection of the existing Client.
+    */
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testConnectionStealingBy3_1_1() throws Exception {
+      final String CLIENT_ID = RandomUtil.randomString();
+
+      MqttClient client = createPahoClient(CLIENT_ID);
+      client.connect();
+      final int[] reasonCode = new int[1];
+      CountDownLatch disconnectedLatch = new CountDownLatch(1);
+      client.setCallback(new LatchedMqttCallback(disconnectedLatch) {
+         @Override
+         public void disconnected(MqttDisconnectResponse disconnectResponse) {
+            reasonCode[0] = disconnectResponse.getReturnCode();
+            disconnectedLatch.countDown();
+         }
+
+         @Override
+         public void mqttErrorOccurred(MqttException exception) {
+            exception.printStackTrace();
+         }
+      });
+
+      org.eclipse.paho.client.mqttv3.MqttClient client2 = 
createPaho3_1_1Client(CLIENT_ID);
+      client2.connect();
+
+      assertTrue(disconnectedLatch.await(500, TimeUnit.MILLISECONDS));
+      assertEquals(MQTTReasonCodes.SESSION_TAKEN_OVER, (byte) reasonCode[0]);
+
+      // only 1 session should exist
+      assertEquals(1, getSessionStates().size());
+      assertNotNull(getSessionStates().get(CLIENT_ID));
+
+      assertFalse(client.isConnected());
+
+      client2.disconnect();
+   }
+
    /*
     * [MQTT-3.1.4-4] The Server MUST perform the processing of Clean Start.
     *

Reply via email to