Repository: activemq Updated Branches: refs/heads/master 19fd084a8 -> 6d20cba0e
https://issues.apache.org/jira/browse/AMQ-6253 - mqtt composite destinations support for virtual topic subscriptions Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6d20cba0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6d20cba0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6d20cba0 Branch: refs/heads/master Commit: 6d20cba0e4de8970da0236f2c772ef66c0f02661 Parents: 19fd084 Author: Dejan Bosanac <[email protected]> Authored: Fri Apr 15 10:58:19 2016 +0200 Committer: Dejan Bosanac <[email protected]> Committed: Fri Apr 15 11:00:39 2016 +0200 ---------------------------------------------------------------------- .../MQTTVirtualTopicSubscriptionStrategy.java | 19 +++++++++++-- .../activemq/transport/mqtt/MQTTTest.java | 29 ++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6d20cba0/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java index 434c248..f802991 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java @@ -182,10 +182,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti @Override public ActiveMQDestination onSend(String topicName) { + ActiveMQTopic topic = new ActiveMQTopic(topicName); + if (topic.isComposite()) { + ActiveMQDestination[] composites = topic.getCompositeDestinations(); + for (ActiveMQDestination composite : composites) { + composite.setPhysicalName(prefix(composite.getPhysicalName())); + } + ActiveMQTopic result = new ActiveMQTopic(); + result.setCompositeDestinations(composites); + return result; + } else { + return new ActiveMQTopic(prefix(topicName)); + } + } + + private String prefix(String topicName) { if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) { - return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName); + return VIRTUALTOPIC_PREFIX + topicName; } else { - return new ActiveMQTopic(topicName); + return topicName; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6d20cba0/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index fc401e9..a84cf21 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -380,6 +380,35 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 2 * 60 * 1000) + public void testMQTTCompositeDestinations() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(""); + mqtt.setCleanSession(true); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + Topic[] topics = {new Topic(utf8("a/1"), QoS.values()[AT_MOST_ONCE]), new Topic(utf8("a/2"), QoS.values()[AT_MOST_ONCE])}; + connection.subscribe(topics); + + String payload = "Test Message"; + String publishedTopic = "a/1,a/2"; + connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false); + + Message msg = connection.receive(1, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals("a/2", msg.getTopic()); + + msg = connection.receive(1, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals("a/1", msg.getTopic()); + + msg = connection.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + } + + @Test(timeout = 2 * 60 * 1000) public void testMQTTPathPatterns() throws Exception { MQTT mqtt = createMQTTConnection(); mqtt.setClientId("");
