Repository: activemq
Updated Branches:
  refs/heads/master 96ce14b27 -> a0a23b99c


[AMQ-6859] MQTT - topic name of the message


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0a23b99
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0a23b99
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0a23b99

Branch: refs/heads/master
Commit: a0a23b99ccaf13d68853a308240a10807df2c6d8
Parents: 96ce14b
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Nov 10 15:15:30 2017 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Nov 10 15:26:48 2017 +0100

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 12 +++--
 .../activemq/transport/mqtt/PahoMQTTTest.java   | 49 +++++++++++++-------
 2 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a0a23b99/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 23ca5fa..ca6b4cb 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -106,7 +106,7 @@ public class MQTTProtocolConverter {
 
     private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = 
new ConcurrentHashMap<Integer, ResponseHandler>();
     private final Map<String, ActiveMQDestination> activeMQDestinationMap = 
new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
-    private final Map<Destination, String> mqttTopicMap = new 
LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
+    private final Map<ActiveMQDestination, String> mqttTopicMap = new 
LRUCache<ActiveMQDestination, String>(DEFAULT_CACHE_SIZE);
 
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, 
MessageAck>(DEFAULT_CACHE_SIZE);
     private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, 
PUBREC>(DEFAULT_CACHE_SIZE);
@@ -594,11 +594,15 @@ public class MQTTProtocolConverter {
 
         String topicName;
         synchronized (mqttTopicMap) {
-            topicName = mqttTopicMap.get(message.getJMSDestination());
+            ActiveMQDestination destination = message.getDestination();
+            if (destination.isPattern() && message.getOriginalDestination() != 
null) {
+                destination = message.getOriginalDestination();
+            }
+            topicName = mqttTopicMap.get(destination);
             if (topicName == null) {
-                String amqTopicName = 
findSubscriptionStrategy().onSend(message.getDestination());
+                String amqTopicName = 
findSubscriptionStrategy().onSend(destination);
                 topicName = 
MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
-                mqttTopicMap.put(message.getJMSDestination(), topicName);
+                mqttTopicMap.put(destination, topicName);
             }
         }
         result.topicName(new UTF8Buffer(topicName));

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0a23b99/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index fbf1156..f6ee310 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -160,7 +161,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client.publish(ACCOUNT_PREFIX + "1/2/3/4", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
 
         // One delivery for topic  ACCOUNT_PREFIX + "#"
-        String result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        String result = listener.messageQ.poll(20, 
TimeUnit.SECONDS).getValue();
         assertTrue(client.getPendingDeliveryTokens().length == 0);
         assertEquals(expectedResult, result);
 
@@ -168,10 +169,10 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client.publish(ACCOUNT_PREFIX + "a/1/2", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
 
         // One delivery for topic  ACCOUNT_PREFIX + "a/1/2"
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
         // One delivery for topic  ACCOUNT_PREFIX + "#"
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
@@ -183,7 +184,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client.publish(ACCOUNT_PREFIX + "1/2/3", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
 
         // One delivery for topic  ACCOUNT_PREFIX + "1/2/3"
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
@@ -208,13 +209,13 @@ public class PahoMQTTTest extends MQTTTestSupport {
         String expectedResult = "hello mqtt broker on hash";
         client.publish(ACCOUNT_PREFIX + "a/b/c", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
 
-        String result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        String result = listener.messageQ.poll(20, 
TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "hello mqtt broker on a different topic";
         client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
@@ -229,18 +230,18 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client.publish(ACCOUNT_PREFIX + "1/2/3", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
 
         // One message from topic subscription on ACCOUNT_PREFIX + "#"
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
 
         // One message from topic subscription on ACCOUNT_PREFIX + "1/2/3"
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
 
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "hello mqtt broker on some other topic";
         client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
-        result = listener.messageQ.poll(20, TimeUnit.SECONDS);
+        result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
         assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
@@ -252,14 +253,12 @@ public class PahoMQTTTest extends MQTTTestSupport {
 
         expectedResult = "this should not come back...";
         client.publish(ACCOUNT_PREFIX + "1/2/3/4", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
-        result = listener.messageQ.poll(3, TimeUnit.SECONDS);
-        assertNull(result);
+        assertNull(listener.messageQ.poll(3, TimeUnit.SECONDS));
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "this should not come back either...";
         client.publish(ACCOUNT_PREFIX + "a/b/c", 
expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
-        result = listener.messageQ.poll(3, TimeUnit.SECONDS);
-        assertNull(result);
+        assertNull(listener.messageQ.poll(3, TimeUnit.SECONDS));
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         client.disconnect();
@@ -351,11 +350,11 @@ public class PahoMQTTTest extends MQTTTestSupport {
         String message = "Message from client: " + clientId;
         client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, 
false);
 
-        String result = client1MqttCallback.messageQ.poll(10, 
TimeUnit.SECONDS);
+        String result = client1MqttCallback.messageQ.poll(10, 
TimeUnit.SECONDS).getValue();
         assertEquals(message, result);
         assertEquals(1, client1MqttCallback.received.get());
 
-        result = clientAdminMqttCallback.messageQ.poll(10, TimeUnit.SECONDS);
+        result = clientAdminMqttCallback.messageQ.poll(10, 
TimeUnit.SECONDS).getValue();
         assertEquals(message, result);
 
         assertTrue(client1.isConnected());
@@ -384,6 +383,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
         clientAdmin.close();
     }
 
+    @Test(timeout = 300000)
+    public void testActiveMQWildCards1() throws Exception {
+        final DefaultListener listener = new DefaultListener();
+        MqttClient client = createClient(false, "receive", listener);
+        final String ACCOUNT_PREFIX = "test/";
+        client.subscribe(ACCOUNT_PREFIX+"a/#");
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+        String expectedResult = "should get this 1";
+        String topic = ACCOUNT_PREFIX+"a/b/1.2.3*4>";
+        client.publish(topic, expectedResult.getBytes(), 0, false);
+        AbstractMap.SimpleEntry<String,String> entry = 
listener.messageQ.poll(20, TimeUnit.SECONDS);
+        assertEquals(topic, entry.getKey());
+        assertEquals(expectedResult, entry.getValue());
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+    }
+
     protected MqttClient createClient(boolean cleanSession, String clientId, 
MqttCallback listener) throws Exception {
         MqttConnectOptions options = new MqttConnectOptions();
         options.setCleanSession(cleanSession);
@@ -427,7 +442,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
     static class DefaultListener implements MqttCallback {
 
         final AtomicInteger received = new AtomicInteger();
-        final BlockingQueue<String> messageQ = new 
ArrayBlockingQueue<String>(10);
+        final BlockingQueue<AbstractMap.SimpleEntry<String, String>> messageQ 
= new ArrayBlockingQueue<AbstractMap.SimpleEntry<String, String>>(10);
 
         @Override
         public void connectionLost(Throwable cause) {
@@ -437,7 +452,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
             LOG.info("Received: {}", message);
             received.incrementAndGet();
-            messageQ.put(new String(message.getPayload(), 
StandardCharsets.UTF_8));
+            messageQ.put(new AbstractMap.SimpleEntry(topic, new 
String(message.getPayload(), StandardCharsets.UTF_8)));
         }
 
         @Override

Reply via email to