Repository: activemq Updated Branches: refs/heads/master 96ce14b27 -> a0a23b99c
[AMQ-6859] MQTT - topic name of the message Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0a23b99 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0a23b99 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0a23b99 Branch: refs/heads/master Commit: a0a23b99ccaf13d68853a308240a10807df2c6d8 Parents: 96ce14b Author: Dejan Bosanac <de...@nighttale.net> Authored: Fri Nov 10 15:15:30 2017 +0100 Committer: Dejan Bosanac <de...@nighttale.net> Committed: Fri Nov 10 15:26:48 2017 +0100 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 12 +++-- .../activemq/transport/mqtt/PahoMQTTTest.java | 49 +++++++++++++------- 2 files changed, 40 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a0a23b99/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 23ca5fa..ca6b4cb 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -106,7 +106,7 @@ public class MQTTProtocolConverter { private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); - private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); + private final Map<ActiveMQDestination, String> mqttTopicMap = new LRUCache<ActiveMQDestination, String>(DEFAULT_CACHE_SIZE); private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); @@ -594,11 +594,15 @@ public class MQTTProtocolConverter { String topicName; synchronized (mqttTopicMap) { - topicName = mqttTopicMap.get(message.getJMSDestination()); + ActiveMQDestination destination = message.getDestination(); + if (destination.isPattern() && message.getOriginalDestination() != null) { + destination = message.getOriginalDestination(); + } + topicName = mqttTopicMap.get(destination); if (topicName == null) { - String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); + String amqTopicName = findSubscriptionStrategy().onSend(destination); topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); - mqttTopicMap.put(message.getJMSDestination(), topicName); + mqttTopicMap.put(destination, topicName); } } result.topicName(new UTF8Buffer(topicName)); http://git-wip-us.apache.org/repos/asf/activemq/blob/a0a23b99/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index fbf1156..f6ee310 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -160,7 +161,7 @@ public class PahoMQTTTest extends MQTTTestSupport { client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); // One delivery for topic ACCOUNT_PREFIX + "#" - String result = listener.messageQ.poll(20, TimeUnit.SECONDS); + String result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertTrue(client.getPendingDeliveryTokens().length == 0); assertEquals(expectedResult, result); @@ -168,10 +169,10 @@ public class PahoMQTTTest extends MQTTTestSupport { client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); // One delivery for topic ACCOUNT_PREFIX + "a/1/2" - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); // One delivery for topic ACCOUNT_PREFIX + "#" - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); @@ -183,7 +184,7 @@ public class PahoMQTTTest extends MQTTTestSupport { client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); // One delivery for topic ACCOUNT_PREFIX + "1/2/3" - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); @@ -208,13 +209,13 @@ public class PahoMQTTTest extends MQTTTestSupport { String expectedResult = "hello mqtt broker on hash"; client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); - String result = listener.messageQ.poll(20, TimeUnit.SECONDS); + String result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "hello mqtt broker on a different topic"; client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); @@ -229,18 +230,18 @@ public class PahoMQTTTest extends MQTTTestSupport { client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); // One message from topic subscription on ACCOUNT_PREFIX + "#" - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); // One message from topic subscription on ACCOUNT_PREFIX + "1/2/3" - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "hello mqtt broker on some other topic"; client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); - result = listener.messageQ.poll(20, TimeUnit.SECONDS); + result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue(); assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); @@ -252,14 +253,12 @@ public class PahoMQTTTest extends MQTTTestSupport { expectedResult = "this should not come back..."; client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); - result = listener.messageQ.poll(3, TimeUnit.SECONDS); - assertNull(result); + assertNull(listener.messageQ.poll(3, TimeUnit.SECONDS)); assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "this should not come back either..."; client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); - result = listener.messageQ.poll(3, TimeUnit.SECONDS); - assertNull(result); + assertNull(listener.messageQ.poll(3, TimeUnit.SECONDS)); assertTrue(client.getPendingDeliveryTokens().length == 0); client.disconnect(); @@ -351,11 +350,11 @@ public class PahoMQTTTest extends MQTTTestSupport { String message = "Message from client: " + clientId; client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, false); - String result = client1MqttCallback.messageQ.poll(10, TimeUnit.SECONDS); + String result = client1MqttCallback.messageQ.poll(10, TimeUnit.SECONDS).getValue(); assertEquals(message, result); assertEquals(1, client1MqttCallback.received.get()); - result = clientAdminMqttCallback.messageQ.poll(10, TimeUnit.SECONDS); + result = clientAdminMqttCallback.messageQ.poll(10, TimeUnit.SECONDS).getValue(); assertEquals(message, result); assertTrue(client1.isConnected()); @@ -384,6 +383,22 @@ public class PahoMQTTTest extends MQTTTestSupport { clientAdmin.close(); } + @Test(timeout = 300000) + public void testActiveMQWildCards1() throws Exception { + final DefaultListener listener = new DefaultListener(); + MqttClient client = createClient(false, "receive", listener); + final String ACCOUNT_PREFIX = "test/"; + client.subscribe(ACCOUNT_PREFIX+"a/#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + String expectedResult = "should get this 1"; + String topic = ACCOUNT_PREFIX+"a/b/1.2.3*4>"; + client.publish(topic, expectedResult.getBytes(), 0, false); + AbstractMap.SimpleEntry<String,String> entry = listener.messageQ.poll(20, TimeUnit.SECONDS); + assertEquals(topic, entry.getKey()); + assertEquals(expectedResult, entry.getValue()); + assertTrue(client.getPendingDeliveryTokens().length == 0); + } + protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(cleanSession); @@ -427,7 +442,7 @@ public class PahoMQTTTest extends MQTTTestSupport { static class DefaultListener implements MqttCallback { final AtomicInteger received = new AtomicInteger(); - final BlockingQueue<String> messageQ = new ArrayBlockingQueue<String>(10); + final BlockingQueue<AbstractMap.SimpleEntry<String, String>> messageQ = new ArrayBlockingQueue<AbstractMap.SimpleEntry<String, String>>(10); @Override public void connectionLost(Throwable cause) { @@ -437,7 +452,7 @@ public class PahoMQTTTest extends MQTTTestSupport { public void messageArrived(String topic, MqttMessage message) throws Exception { LOG.info("Received: {}", message); received.incrementAndGet(); - messageQ.put(new String(message.getPayload(), StandardCharsets.UTF_8)); + messageQ.put(new AbstractMap.SimpleEntry(topic, new String(message.getPayload(), StandardCharsets.UTF_8))); } @Override