Repository: activemq-artemis Updated Branches: refs/heads/master 1f5921b8a -> eea3e7470
ARTEMIS-1061 Ack MQTT PubRel management messages Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e33b7af5 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e33b7af5 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e33b7af5 Branch: refs/heads/master Commit: e33b7af5acf376ae7dd899a4e7d0170d1ed99b87 Parents: 1f5921b Author: Martyn Taylor <[email protected]> Authored: Wed Mar 22 21:27:56 2017 +0000 Committer: Clebert Suconic <[email protected]> Committed: Wed Mar 22 23:26:32 2017 -0400 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTPublishManager.java | 7 +++--- .../core/protocol/mqtt/MQTTSessionState.java | 6 +++++ .../artemis/core/protocol/mqtt/MQTTUtil.java | 2 ++ .../integration/mqtt/imported/MQTTTest.java | 26 +++++++++++++++++++- 4 files changed, 36 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 55fdfcc..8dfaf34 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -42,8 +42,6 @@ public class MQTTPublishManager { private static final Logger logger = Logger.getLogger(MQTTPublishManager.class); - private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; - private SimpleString managementAddress; private ServerConsumer managementConsumer; @@ -90,7 +88,7 @@ public class MQTTPublishManager { } private SimpleString createManagementAddress() { - return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); + return new SimpleString(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); } private void createManagementQueue() throws Exception { @@ -183,6 +181,7 @@ public class MQTTPublishManager { void sendPubRelMessage(Message message) { int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY); + session.getSessionState().getOutboundStore().publishReleasedSent(messageId, message.getMessageID()); session.getProtocolHandler().sendPubRel(messageId); } @@ -213,7 +212,7 @@ public class MQTTPublishManager { void handlePubComp(int messageId) throws Exception { Pair<Long, Long> ref = session.getState().getOutboundStore().publishComplete(messageId); if (ref != null) { - session.getServerSession().acknowledge(ref.getB(), ref.getA()); + session.getServerSession().acknowledge(managementConsumer.getID(), ref.getA()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 31452bf..971ddb7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -168,6 +168,12 @@ public class MQTTSessionState { return publishAckd(mqtt); } + public void publishReleasedSent(int mqttId, long serverMessageId) { + synchronized (dataStoreLock) { + mqttToServerIds.put(mqttId, new Pair<>(serverMessageId, 0L)); + } + } + public Pair<Long, Long> publishComplete(int mqtt) { return publishAckd(mqtt); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 613fef3..0e09fb0 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -63,6 +63,8 @@ public class MQTTUtil { public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain"; + public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; + public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000; public static String convertMQTTAddressFilterToCore(String filter, WildcardConfiguration wildcardConfiguration) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e33b7af5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 32062c0..6b58fa2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ConcurrentHashSet; @@ -189,6 +191,29 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 2 * 60 * 1000) + public void testManagementQueueMessagesAreAckd() throws Exception { + String clientId = "test.client.id"; + final MQTTClientProvider provider = getMQTTClientProvider(); + provider.setClientId(clientId); + initializeConnection(provider); + provider.subscribe("foo", EXACTLY_ONCE); + for (int i = 0; i < NUM_MESSAGES; i++) { + String payload = "Test Message: " + i; + provider.publish("foo", payload.getBytes(), EXACTLY_ONCE); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + assertEquals(payload, new String(message)); + } + + final Queue queue = server.locateQueue(new SimpleString(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + clientId)); + + Wait.waitFor(() -> queue.getMessageCount() == 0, 1000, 100); + + assertEquals(0, queue.getMessageCount()); + provider.disconnect(); + } + + @Test(timeout = 2 * 60 * 1000) public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception { final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); @@ -1065,7 +1090,6 @@ public class MQTTTest extends MQTTTestSupport { assertEquals("test message", new String(m.getPayload())); } - @Test(timeout = 60 * 1000) public void testCleanSession() throws Exception { final String CLIENTID = "cleansession";
