Repository: activemq-artemis
Updated Branches:
  refs/heads/master 1f5921b8a -> eea3e7470


ARTEMIS-1061 Ack MQTT PubRel management messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e33b7af5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e33b7af5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e33b7af5

Branch: refs/heads/master
Commit: e33b7af5acf376ae7dd899a4e7d0170d1ed99b87
Parents: 1f5921b
Author: Martyn Taylor <[email protected]>
Authored: Wed Mar 22 21:27:56 2017 +0000
Committer: Clebert Suconic <[email protected]>
Committed: Wed Mar 22 23:26:32 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java  |  7 +++---
 .../core/protocol/mqtt/MQTTSessionState.java    |  6 +++++
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  2 ++
 .../integration/mqtt/imported/MQTTTest.java     | 26 +++++++++++++++++++-
 4 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 55fdfcc..8dfaf34 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -42,8 +42,6 @@ public class MQTTPublishManager {
 
    private static final Logger logger = 
Logger.getLogger(MQTTPublishManager.class);
 
-   private static final String MANAGEMENT_QUEUE_PREFIX = 
"$sys.mqtt.queue.qos2.";
-
    private SimpleString managementAddress;
 
    private ServerConsumer managementConsumer;
@@ -90,7 +88,7 @@ public class MQTTPublishManager {
    }
 
    private SimpleString createManagementAddress() {
-      return new SimpleString(MANAGEMENT_QUEUE_PREFIX + 
session.getSessionState().getClientId());
+      return new SimpleString(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
session.getSessionState().getClientId());
    }
 
    private void createManagementQueue() throws Exception {
@@ -183,6 +181,7 @@ public class MQTTPublishManager {
 
    void sendPubRelMessage(Message message) {
       int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
+      
session.getSessionState().getOutboundStore().publishReleasedSent(messageId, 
message.getMessageID());
       session.getProtocolHandler().sendPubRel(messageId);
    }
 
@@ -213,7 +212,7 @@ public class MQTTPublishManager {
    void handlePubComp(int messageId) throws Exception {
       Pair<Long, Long> ref = 
session.getState().getOutboundStore().publishComplete(messageId);
       if (ref != null) {
-         session.getServerSession().acknowledge(ref.getB(), ref.getA());
+         session.getServerSession().acknowledge(managementConsumer.getID(), 
ref.getA());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 31452bf..971ddb7 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -168,6 +168,12 @@ public class MQTTSessionState {
          return publishAckd(mqtt);
       }
 
+      public void publishReleasedSent(int mqttId, long serverMessageId) {
+         synchronized (dataStoreLock) {
+            mqttToServerIds.put(mqttId, new Pair<>(serverMessageId, 0L));
+         }
+      }
+
       public Pair<Long, Long> publishComplete(int mqtt) {
          return publishAckd(mqtt);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 613fef3..0e09fb0 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -63,6 +63,8 @@ public class MQTTUtil {
 
    public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
 
+   public static final String MANAGEMENT_QUEUE_PREFIX = 
"$sys.mqtt.queue.qos2.";
+
    public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
 
    public static String convertMQTTAddressFilterToCore(String filter, 
WildcardConfiguration wildcardConfiguration) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 32062c0..6b58fa2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
@@ -189,6 +191,29 @@ public class MQTTTest extends MQTTTestSupport {
    }
 
    @Test(timeout = 2 * 60 * 1000)
+   public void testManagementQueueMessagesAreAckd() throws Exception {
+      String clientId = "test.client.id";
+      final MQTTClientProvider provider = getMQTTClientProvider();
+      provider.setClientId(clientId);
+      initializeConnection(provider);
+      provider.subscribe("foo", EXACTLY_ONCE);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         String payload = "Test Message: " + i;
+         provider.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+         byte[] message = provider.receive(5000);
+         assertNotNull("Should get a message", message);
+         assertEquals(payload, new String(message));
+      }
+
+      final Queue queue = server.locateQueue(new 
SimpleString(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + clientId));
+
+      Wait.waitFor(() -> queue.getMessageCount() == 0, 1000, 100);
+
+      assertEquals(0, queue.getMessageCount());
+      provider.disconnect();
+   }
+
+   @Test(timeout = 2 * 60 * 1000)
    public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
       final MQTTClientProvider provider = getMQTTClientProvider();
       initializeConnection(provider);
@@ -1065,7 +1090,6 @@ public class MQTTTest extends MQTTTestSupport {
       assertEquals("test message", new String(m.getPayload()));
    }
 
-
    @Test(timeout = 60 * 1000)
    public void testCleanSession() throws Exception {
       final String CLIENTID = "cleansession";

Reply via email to