This is an automated email from the ASF dual-hosted git repository. jbertram pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 68c5bed1596dafde4805ebfb4a4c5eda8b7959ea Author: Justin Bertram <[email protected]> AuthorDate: Wed Mar 8 14:02:46 2023 -0600 ARTEMIS-4200 configurable link-stealing for MQTT --- .../core/protocol/mqtt/MQTTProtocolHandler.java | 59 ++++++++++++++++++---- .../core/protocol/mqtt/MQTTProtocolManager.java | 18 +++++++ .../artemis/core/protocol/mqtt/MQTTSession.java | 4 ++ docs/user-manual/en/mqtt.md | 14 +++++ .../mqtt/MQTTDisabledLinkStealingTest.java | 52 +++++++++++++++++++ .../artemis/tests/integration/mqtt5/MQTT5Test.java | 26 ++++++++++ 6 files changed, 162 insertions(+), 11 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 1c26fc4fed..8ea0a5db66 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -254,8 +254,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { return; } - MQTTConnection existingConnection = session.getProtocolManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection()); - disconnectExistingSession(existingConnection); + if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) { + return; + } else { + protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection()); + } if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) { calculateKeepAlive(connect); @@ -452,19 +455,49 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { return true; } - // [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client - private void disconnectExistingSession(MQTTConnection existingConnection) { - if (existingConnection != null) { - MQTTSession existingSession = session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession(); - if (existingSession != null) { - if (existingSession.getVersion() == MQTTVersion.MQTT_5) { - existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER); + /* The MQTT specification states: + * + * [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST + * disconnect the existing client + * + * However, this behavior is configurable via the "allowLinkStealing" acceptor URL property. + */ + private LinkStealingResult handleLinkStealing() { + final String clientID = session.getConnection().getClientID(); + LinkStealingResult result; + + if (protocolManager.isClientConnected(clientID)) { + MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID); + if (protocolManager.isAllowLinkStealing()) { + MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession(); + if (existingSession != null) { + if (existingSession.getVersion() == MQTTVersion.MQTT_5) { + existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER); + } + existingSession.getConnectionManager().disconnect(false); + } else { + existingConnection.disconnect(false); } - existingSession.getConnectionManager().disconnect(false); + logger.debug("Existing MQTT session from {} closed due to incoming session from {} with the same client ID: {}", existingConnection.getRemoteAddress(), connection.getRemoteAddress(), session.getConnection().getClientID()); + result = LinkStealingResult.EXISTING_LINK_STOLEN; } else { - existingConnection.disconnect(false); + if (session.getVersion() == MQTTVersion.MQTT_5) { + sendDisconnect(MQTTReasonCodes.UNSPECIFIED_ERROR); + } + logger.debug("Incoming MQTT session from {} closed due to existing session from {} with the same client ID: {}", connection.getRemoteAddress(), existingConnection.getRemoteAddress(), session.getConnection().getClientID()); + /* + * Stopping the session here prevents the connection failure listener from inadvertently removing the + * existing session once the connection is disconnected. + */ + session.setStopped(true); + connection.disconnect(false); + result = LinkStealingResult.NEW_LINK_DENIED; } + } else { + result = LinkStealingResult.NO_ACTION; } + + return result; } private Pair<Boolean, String> validateUser(String username, String password) throws Exception { @@ -510,4 +543,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { disconnect(true); return false; } + + private enum LinkStealingResult { + EXISTING_LINK_STOLEN, NEW_LINK_DENIED, NO_ACTION; + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index e537fb2125..fef99a75a0 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -75,6 +75,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ private boolean closeMqttConnectionOnPublishAuthorizationFailure = true; + private boolean allowLinkStealing = true; + private final MQTTRoutingHandler routingHandler; MQTTProtocolManager(ActiveMQServer server, @@ -139,6 +141,14 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure; } + public boolean isAllowLinkStealing() { + return allowLinkStealing; + } + + public void setAllowLinkStealing(boolean allowLinkStealing) { + this.allowLinkStealing = allowLinkStealing; + } + @Override public void onNotification(Notification notification) { if (!(notification.getType() instanceof CoreNotificationType)) @@ -348,6 +358,10 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ return false; } + public boolean isClientConnected(String clientId) { + return connectedClients.containsKey(clientId); + } + public void removeConnectedClient(String clientId) { connectedClients.remove(clientId); } @@ -362,6 +376,10 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ return connectedClients.put(clientId, connection); } + public MQTTConnection getConnectedClient(String clientId) { + return connectedClients.get(clientId); + } + public MQTTSessionState getSessionState(String clientId) { /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 5e87d7a459..f6fa85c5de 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -154,6 +154,10 @@ public class MQTTSession { return stopped; } + void setStopped(boolean stopped) { + this.stopped = stopped; + } + boolean isClean() { return clean; } diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md index 383c26bda5..9f7920afb6 100644 --- a/docs/user-manual/en/mqtt.md +++ b/docs/user-manual/en/mqtt.md @@ -185,6 +185,20 @@ SSL/TLS is also available, e.g.: Web browsers can then connect to `wss://<server>:8883` using a Web Socket to send and receive MQTT messages. +## Link Stealing + +The MQTT specifications define a behavior often referred to as "link stealing." +This means that whenever a new client connects with the same client ID as +another existing client then the existing client's session will be closed and +its network connection will be terminated. + +In certain use-cases this behavior is not desired so it is configurable. The +URL parameter `allowLinkStealing` can be configured on the MQTT `acceptor` to +modify this behavior. By default `allowLinkStealing` is `true`. If it is set to +`false` then whenever a new client connects with the same client ID as another +existing client then the _new_ client's session will be closed and its network +connection will be terminated. In the case of MQTT 5 clients they will receive +a disconnect reason code of [`0x80` (i.e. "Unspecified error")](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208). ## Automatic Subscription Clean-up diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTDisabledLinkStealingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTDisabledLinkStealingTest.java new file mode 100644 index 0000000000..c475b36429 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTDisabledLinkStealingTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt; + +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.junit.Test; + +public class MQTTDisabledLinkStealingTest extends MQTTTestSupport { + + @Test(timeout = 60 * 1000) + public void testDisabledLinkStealing() throws Exception { + final String clientId = RandomUtil.randomString(); + MQTT mqtt = createMQTTConnection(clientId, false); + mqtt.setKeepAlive((short) 2); + + final BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + + final BlockingConnection connection2 = mqtt.blockingConnection(); + try { + connection2.connect(); + fail("Should have thrown an exception on connect due to disabled link stealing"); + } catch (Exception e) { + // ignore expected exception + } + + assertTrue("Client no longer connected!", Wait.waitFor(() -> connection1.isConnected(), 3000, 200)); + connection1.disconnect(); + } + + @Override + protected void addMQTTConnector() throws Exception { + server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;allowLinkStealing=false"); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index dd09a99461..b582157cfe 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -490,4 +490,30 @@ public class MQTT5Test extends MQTT5TestSupport { consumer.disconnect(); consumer.close(); } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testConnectionStealingDisabled() throws Exception { + setAcceptorProperty("allowLinkStealing=false"); + final String CLIENT_ID = RandomUtil.randomString(); + + MqttClient client = createPahoClient(CLIENT_ID); + client.connect(); + + MqttClient client2 = createPahoClient(CLIENT_ID); + try { + client2.connect(); + fail("Should have thrown an exception on connect due to disabled link stealing"); + } catch (Exception e) { + // ignore expected exception + } + + // only 1 session should exist + Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100); + assertNotNull(getSessionStates().get(CLIENT_ID)); + + assertTrue(client.isConnected()); + + client.disconnect(); + client.close(); + } }
