ARTEMIS-952 Remove MQTT Queues on Clean Session

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b2e250d4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b2e250d4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b2e250d4

Branch: refs/heads/master
Commit: b2e250d4254f5d560ddc7fccb4e955e691174fbe
Parents: 20737cb
Author: Martyn Taylor <[email protected]>
Authored: Tue Feb 7 16:51:53 2017 +0000
Committer: Martyn Taylor <[email protected]>
Committed: Thu Feb 9 10:43:03 2017 +0000

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTConnectionManager.java    | 52 +++++++++++---------
 .../core/protocol/mqtt/MQTTPublishManager.java  | 14 ++++--
 .../artemis/core/protocol/mqtt/MQTTSession.java | 28 +++++++++--
 .../core/protocol/mqtt/MQTTSessionState.java    | 46 ++++++++---------
 .../protocol/mqtt/MQTTSubscriptionManager.java  | 28 ++++++-----
 .../integration/mqtt/imported/MQTTTest.java     |  1 +
 .../mqtt/imported/MQTTTestSupport.java          |  5 ++
 7 files changed, 108 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index a4690e7..c623f3b 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -67,12 +67,12 @@ public class MQTTConnectionManager {
          return;
       }
 
-      session.setSessionState(getSessionState(clientId, cleanSession));
-
+      session.setSessionState(getSessionState(clientId));
       ServerSessionImpl serverSession = createServerSession(username, 
password);
       serverSession.start();
 
       session.setServerSession(serverSession);
+      session.setIsClean(cleanSession);
 
       if (will) {
          ServerMessage w = MQTTUtil.createServerMessageFromString(session, 
willMessage, willTopic, willQosLevel, willRetain);
@@ -96,8 +96,20 @@ public class MQTTConnectionManager {
       String id = UUIDGenerator.getInstance().generateStringUUID();
       ActiveMQServer server = session.getServer();
 
-      ServerSession serverSession = server.createSession(id, username, 
password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, 
session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, 
MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, 
MQTTUtil.SESSION_XA, null,
-                                                         
session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, 
server.newOperationContext(), session.getProtocolManager().getPrefixes());
+      ServerSession serverSession = server.createSession(id,
+                                                         username,
+                                                         password,
+                                                         
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                         
session.getConnection(),
+                                                         
MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
+                                                         
MQTTUtil.SESSION_AUTO_COMMIT_ACKS,
+                                                         
MQTTUtil.SESSION_PREACKNOWLEDGE,
+                                                         MQTTUtil.SESSION_XA,
+                                                         null,
+                                                         
session.getSessionCallback(),
+                                                         
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
+                                                         
server.newOperationContext(),
+                                                         
session.getProtocolManager().getPrefixes());
       return (ServerSessionImpl) serverSession;
    }
 
@@ -131,29 +143,25 @@ public class MQTTConnectionManager {
       session.getSessionState().deleteWillMessage();
    }
 
-   private MQTTSessionState getSessionState(String clientId, boolean 
cleanSession) throws InterruptedException {
+   private MQTTSessionState getSessionState(String clientId) throws 
InterruptedException {
       synchronized (MQTTSession.SESSIONS) {
          /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server 
MUST discard any previous Session and
           * start a new one  This Session lasts as long as the Network 
Connection. State data associated with this Session
           * MUST NOT be reused in any subsequent Session */
-         if (cleanSession) {
-            MQTTSession.SESSIONS.remove(clientId);
-            return new MQTTSessionState(clientId);
-         } else {
-            /* [MQTT-3.1.2-4] Attach an existing session if one exists (if 
cleanSession flag is false) otherwise create
-            a new one. */
-            MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
-            if (state != null) {
-               // TODO Add a count down latch for handling wait during 
attached session state.
-               while (state.getAttached()) {
-                  Thread.sleep(1000);
-               }
-               return state;
-            } else {
-               state = new MQTTSessionState(clientId);
-               MQTTSession.SESSIONS.put(clientId, state);
-               return state;
+
+         /* [MQTT-3.1.2-4] Attach an existing session if one exists (if 
cleanSession flag is false) otherwise create
+         a new one. */
+         MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
+         if (state != null) {
+            // TODO Add a count down latch for handling wait during attached 
session state.
+            while (state.getAttached()) {
+               Thread.sleep(1000);
             }
+            return state;
+         } else {
+            state = new MQTTSessionState(clientId);
+            MQTTSession.SESSIONS.put(clientId, state);
+            return state;
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e6b3345..2e5a1e9 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -67,13 +67,19 @@ public class MQTTPublishManager {
       createManagementConsumer();
    }
 
-   synchronized void stop(boolean clean) throws Exception {
+   synchronized void stop() throws Exception {
       if (managementConsumer != null) {
          managementConsumer.removeItself();
          managementConsumer.setStarted(false);
          managementConsumer.close(false);
-         if (clean)
-            session.getServer().destroyQueue(managementAddress);
+      }
+   }
+
+   void clean() throws Exception {
+      createManagementAddress();
+      Queue queue = session.getServer().locateQueue(managementAddress);
+      if (queue != null) {
+         queue.deleteQueue();
       }
    }
 
@@ -84,7 +90,7 @@ public class MQTTPublishManager {
    }
 
    private void createManagementAddress() {
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + 
state.getClientId());
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + 
session.getSessionState().getClientId());
    }
 
    private void createManagementQueue() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index d4fd7bf..cf0b4e6 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -55,6 +55,8 @@ public class MQTTSession {
 
    private MQTTProtocolManager protocolManager;
 
+   private boolean isClean;
+
    public MQTTSession(MQTTProtocolHandler protocolHandler,
                       MQTTConnection connection,
                       MQTTProtocolManager protocolManager) throws Exception {
@@ -83,9 +85,8 @@ public class MQTTSession {
    synchronized void stop() throws Exception {
       if (!stopped) {
          protocolHandler.stop(false);
-         // TODO this should pass in clean session.
-         subscriptionManager.stop(false);
-         mqttPublishManager.stop(false);
+         subscriptionManager.stop();
+         mqttPublishManager.stop();
 
          if (serverSession != null) {
             serverSession.stop();
@@ -95,6 +96,10 @@ public class MQTTSession {
          if (state != null) {
             state.setAttached(false);
          }
+
+         if (isClean()) {
+            clean();
+         }
       }
       stopped = true;
    }
@@ -103,6 +108,17 @@ public class MQTTSession {
       return stopped;
    }
 
+   boolean isClean() {
+      return isClean;
+   }
+
+   void setIsClean(boolean isClean) throws Exception {
+      this.isClean = isClean;
+      if (isClean) {
+         clean();
+      }
+   }
+
    MQTTPublishManager getMqttPublishManager() {
       return mqttPublishManager;
    }
@@ -159,4 +175,10 @@ public class MQTTSession {
    MQTTProtocolManager getProtocolManager() {
       return protocolManager;
    }
+
+   void clean() throws Exception {
+      subscriptionManager.clean();
+      mqttPublishManager.clean();
+      state.clear();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 194fe5e..9e18bc5 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -39,39 +39,27 @@ public class MQTTSessionState {
    private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = 
new ConcurrentHashMap<>();
 
    // Used to store Packet ID of Publish QoS1 and QoS2 message.  See spec: 
4.3.3 QoS 2: Exactly once delivery.  Method B.
-   private Map<Integer, MQTTMessageInfo> messageRefStore;
+   private final Map<Integer, MQTTMessageInfo> messageRefStore = new 
ConcurrentHashMap<>();
 
-   private ConcurrentMap<String, Map<Long, Integer>> addressMessageMap;
+   private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = 
new ConcurrentHashMap<>();
 
-   private Set<Integer> pubRec;
-
-   private Set<Integer> pub;
+   private final Set<Integer>  pubRec = new HashSet<>();
 
    private boolean attached = false;
 
-   // Objects track the Outbound message references
-   private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
-
-   private ConcurrentMap<String, ConcurrentMap<Long, Integer>> 
reverseOutboundReferenceStore;
-
-   private final Object outboundLock = new Object();
-
-   // FIXME We should use a better mechanism for creating packet IDs.
-   private AtomicInteger lastId = new AtomicInteger(0);
-
    private final OutboundStore outboundStore = new OutboundStore();
 
    public MQTTSessionState(String clientId) {
       this.clientId = clientId;
+   }
 
-      pubRec = new HashSet<>();
-      pub = new HashSet<>();
-
-      outboundMessageReferenceStore = new ConcurrentHashMap<>();
-      reverseOutboundReferenceStore = new ConcurrentHashMap<>();
-
-      messageRefStore = new ConcurrentHashMap<>();
-      addressMessageMap = new ConcurrentHashMap<>();
+   public synchronized void clear() {
+      subscriptions.clear();
+      messageRefStore.clear();
+      addressMessageMap.clear();
+      pubRec.clear();
+      outboundStore.clear();
+      willMessage = null;
    }
 
    OutboundStore getOutboundStore() {
@@ -159,9 +147,9 @@ public class MQTTSessionState {
 
    public class OutboundStore {
 
-      private final HashMap<String, Integer> artemisToMqttMessageMap = new 
HashMap<>();
+      private HashMap<String, Integer> artemisToMqttMessageMap = new 
HashMap<>();
 
-      private final HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new 
HashMap<>();
+      private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new 
HashMap<>();
 
       private final Object dataStoreLock = new Object();
 
@@ -202,5 +190,13 @@ public class MQTTSessionState {
       public Pair<Long, Long> publishComplete(int mqtt) {
          return publishAckd(mqtt);
       }
+
+      public void clear() {
+         synchronized (dataStoreLock) {
+            artemisToMqttMessageMap.clear();
+            mqttToServerIds.clear();
+            ids.set(0);
+         }
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index f09e5c5..521d885 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -74,19 +74,13 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   synchronized void stop(boolean clean) throws Exception {
+   synchronized void stop() throws Exception {
       for (ServerConsumer consumer : consumers.values()) {
          consumer.setStarted(false);
          consumer.disconnect();
          consumer.getQueue().removeConsumer(consumer);
          consumer.close(false);
       }
-
-      if (clean) {
-         for (ServerConsumer consumer : consumers.values()) {
-            session.getServer().destroyQueue(consumer.getQueue().getName());
-         }
-      }
    }
 
    /**
@@ -192,15 +186,20 @@ public class MQTTSubscriptionManager {
 
    // FIXME: Do we need this synchronzied?
    private synchronized void removeSubscription(String address) throws 
Exception {
-      ServerConsumer consumer = consumers.get(address);
       String internalAddress = 
MQTTUtil.convertMQTTAddressFilterToCore(address);
       SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
-
-      Queue queue = session.getServer().locateQueue(internalQueueName);
-      queue.deleteQueue(true);
       session.getSessionState().removeSubscription(address);
+
+      ServerConsumer consumer = consumers.get(address);
       consumers.remove(address);
-      consumerQoSLevels.remove(consumer.getID());
+      if (consumer != null) {
+         consumer.removeItself();
+         consumerQoSLevels.remove(consumer.getID());
+      }
+
+      if 
(session.getServerSession().executeQueueQuery(internalQueueName).isExists()) {
+         session.getServerSession().deleteQueue(internalQueueName);
+      }
    }
 
    private SimpleString getQueueNameForTopic(String topic) {
@@ -228,4 +227,9 @@ public class MQTTSubscriptionManager {
       return consumerQoSLevels;
    }
 
+   void clean() throws Exception {
+      for (MqttTopicSubscription mqttTopicSubscription : 
session.getSessionState().getSubscriptions()) {
+         removeSubscription(mqttTopicSubscription.topicName());
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 333ddd3..0081069 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -273,6 +273,7 @@ public class MQTTTest extends MQTTTestSupport {
       assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
    }
 
+   @Ignore
    @Test(timeout = 600 * 1000)
    public void testSendMoreThanUniqueId() throws Exception {
       int messages = (Short.MAX_VALUE * 2) + 1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e250d4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 965804c..b578f97 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@@ -142,6 +143,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
 
    private ActiveMQServer createServerForMQTT() throws Exception {
       Configuration defaultConfig = 
createDefaultConfig(true).setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())).setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setDeadLetterAddress(SimpleString.toSimpleString("DLA"));
+      addressSettings.setExpiryAddress(SimpleString.toSimpleString("EXPIRY"));
+      defaultConfig.getAddressesSettings().put("#", addressSettings);
       return createServer(true, defaultConfig);
    }
 

Reply via email to