Repository: activemq
Updated Branches:
  refs/heads/trunk 28c565c26 -> 0db7e69b4
https://issues.apache.org/jira/browse/AMQ-5065 Patch applied. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0db7e69b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0db7e69b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0db7e69b Branch: refs/heads/trunk Commit: 0db7e69b4eddda219c1623b94636d07ee47a0648 Parents: 28c565c Author: Timothy Bish <[email protected]> Authored: Wed Feb 19 14:09:34 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Feb 19 14:09:34 2014 -0500 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 22 +++++--- .../activemq/transport/mqtt/MQTTTest.java | 56 +++++++++++++++++++- 2 files changed, 68 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0db7e69b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 5b8f8c7..c270566 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -23,10 +23,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.DataFormatException; import java.util.zip.Inflater; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.*; import org.apache.activemq.store.PersistenceAdapterSupport; @@ -338,23 +338,29 
@@ public class MQTTProtocolConverter { } catch (IOException e) { LOG.warn("Couldn't send SUBACK for " + command, e); } - } else { - LOG.warn("No topics defined for Subscription " + command); - } - //check retained messages - if (topics != null){ - for (Topic topic:topics){ + // check retained messages + for (int i = 0; i < topics.length; i++) { + final Topic topic = topics[i]; ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); for (PUBLISH msg : retainedMessages.getMessages(destination)) { if( msg.payload().length > 0 ) { try { - getMQTTTransport().sendToMQTT(msg.encode()); + PUBLISH retainedCopy = new PUBLISH(); + retainedCopy.topicName(msg.topicName()); + retainedCopy.retain(msg.retain()); + retainedCopy.messageId(msg.messageId()); + retainedCopy.payload(msg.payload()); + // set QoS of retained message to maximum of subscription QoS + retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos()); + getMQTTTransport().sendToMQTT(retainedCopy.encode()); } catch (IOException e) { LOG.warn("Couldn't send retained message " + msg, e); } } } } + } else { + LOG.warn("No topics defined for Subscription " + command); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/0db7e69b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 64a9b5f..73397de 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.transport.mqtt; +import java.net.ProtocolException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.TimeUnit; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -28,6 +28,8 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; + +import static org.junit.Assert.assertArrayEquals; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; @@ -41,10 +43,10 @@ import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.PUBLISH; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertArrayEquals; public class MQTTTest extends AbstractMQTTTest { @@ -324,6 +326,56 @@ public class MQTTTest extends AbstractMQTTTest { publisher.disconnect(); } + @Test(timeout = 60 * 1000) + public void testMQTTRetainQoS() throws Exception { + addMQTTConnector(); + brokerService.start(); + + String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" }; + for (int i = 0; i < topics.length; i++) { + final String topic = topics[i]; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setKeepAlive((short)2); + + final int[] actualQoS = {-1}; + mqtt.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + // validate the QoS + if (frame.messageType() == PUBLISH.TYPE) { + PUBLISH publish = new PUBLISH(); + try { + publish.decode(frame); + } catch (ProtocolException e) { + fail ("Failed decoding " + e.getMessage()); + } + actualQoS[0] = publish.qos().ordinal(); + } + } + }); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true); + connection.subscribe(new Topic[]{ new Topic(topic, QoS.valueOf(topic)) }); + + 
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(topic, new String(msg.getPayload())); + int waitCount = 0; + while (actualQoS[0] == -1 && waitCount < 10) { + Thread.sleep(1000); + waitCount++; + } + assertEquals(i, actualQoS[0]); + + connection.unsubscribe(new String[]{topic}); + connection.disconnect(); + } + + } @Test(timeout=60 * 1000) public void testSendMQTTReceiveJMS() throws Exception {
