Repository: activemq Updated Branches: refs/heads/trunk 2d9475c4f -> fc3d90e8b
https://issues.apache.org/jira/browse/AMQ-5377 - mqtt wildcard conversion Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc3d90e8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc3d90e8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc3d90e8 Branch: refs/heads/trunk Commit: fc3d90e8b740c1621ee1b51ac8d7ec6bec4c55fc Parents: 2d9475c Author: Dejan Bosanac <[email protected]> Authored: Wed Oct 1 12:18:32 2014 +0200 Committer: Dejan Bosanac <[email protected]> Committed: Wed Oct 1 12:18:32 2014 +0200 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 1 - .../transport/mqtt/MQTTProtocolSupport.java | 26 +++++++++++++++++++- .../activemq/transport/mqtt/MQTTTest.java | 20 +++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index d80b10a..c05c729 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -502,7 +502,6 @@ public class MQTTProtocolConverter { destination = activeMQDestinationMap.get(command.topicName()); if (destination == null) { String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); - try { destination = findSubscriptionStrategy().onSend(topicName); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java index a30ed50..90f8644 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java @@ -79,7 +79,31 @@ public class MQTTProtocolSupport { * @return a destination name formatted for MQTT. */ public static String convertActiveMQToMQTT(String destinationName) { - return destinationName.replace('.', '/'); + char[] chars = destinationName.toCharArray(); + for (int i = 0; i < chars.length; i++) { + switch(chars[i]) { + case '>': + chars[i] = '#'; + break; + case '#': + chars[i] = '>'; + break; + case '*': + chars[i] = '+'; + break; + case '+': + chars[i] = '*'; + break; + case '.': + chars[i] = '/'; + break; + case '/': + chars[i] = '.'; + break; + } + } + String rc = new String(chars); + return rc; } /** http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3b4062d..1586ff4 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.mqtt; +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -330,6 +331,25 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 2 * 60 * 1000) + public void testMQTTWildcard() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(""); + mqtt.setCleanSession(true); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + Topic[] topics = {new Topic(utf8("a/#"), QoS.values()[AT_MOST_ONCE])}; + connection.subscribe(topics); + String payload = "Test Message"; + String publishedTopic = "a/b/1.2.3*4>"; + connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false); + + Message msg = connection.receive(1, TimeUnit.SECONDS); + assertEquals("Topic changed", publishedTopic, msg.getTopic()); + } + + @Test(timeout = 2 * 60 * 1000) public void testMQTTPathPatterns() throws Exception { MQTT mqtt = createMQTTConnection(); mqtt.setClientId("");
