Repository: activemq Updated Branches: refs/heads/trunk 6aaf859d2 -> 67f151fe0
https://issues.apache.org/jira/browse/AMQ-5092 - apply patch from Dhiraj Bokde with thanks Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/67f151fe Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/67f151fe Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/67f151fe Branch: refs/heads/trunk Commit: 67f151fe0bca5f69a912fe0e707c528ae8c450b6 Parents: 6aaf859 Author: gtully <[email protected]> Authored: Wed Mar 12 13:00:38 2014 +0000 Committer: gtully <[email protected]> Committed: Wed Mar 12 13:00:38 2014 +0000 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 61 +++++- .../transport/mqtt/MQTTSubscription.java | 10 +- .../activemq/transport/mqtt/MQTTTest.java | 194 ++++++++++++++++++- 3 files changed, 258 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/67f151fe/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 19614a9..1f912b7 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -57,6 +57,7 @@ public class MQTTProtocolConverter { private final SessionId sessionId = new SessionId(connectionId, -1); private final ProducerId producerId = new ProducerId(sessionId, 1); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); @@ -66,6 +67,9 @@ public class MQTTProtocolConverter { private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE); private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); + private final Map<String, Short> activemqToPacketIds = new LRUCache<String, Short>(DEFAULT_CACHE_SIZE); + private final Map<Short, String> packetIdsToActivemq = new LRUCache<Short, String>(DEFAULT_CACHE_SIZE); + private final MQTTTransport mqttTransport; private final BrokerService brokerService; @@ -348,10 +352,15 @@ public class MQTTProtocolConverter { PUBLISH retainedCopy = new PUBLISH(); retainedCopy.topicName(msg.topicName()); retainedCopy.retain(msg.retain()); - retainedCopy.messageId(msg.messageId()); retainedCopy.payload(msg.payload()); // set QoS of retained message to maximum of subscription QoS retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos()); + switch (retainedCopy.qos()) { + case AT_LEAST_ONCE: + case EXACTLY_ONCE: + retainedCopy.messageId(getNextSequenceId()); + case AT_MOST_ONCE: + } getMQTTTransport().sendToMQTT(retainedCopy.encode()); } catch (IOException e) { LOG.warn("Couldn't send retained message " + msg, e); @@ -446,6 +455,12 @@ public class MQTTProtocolConverter { if (sub != null) { MessageAck ack = sub.createMessageAck(md); PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); + switch (publish.qos()) { + case AT_LEAST_ONCE: + case EXACTLY_ONCE: + publish.dup(publish.dup() ? true : md.getMessage().isRedelivered()); + case AT_MOST_ONCE: + } if (ack != null && sub.expectAck(publish)) { synchronized (consumerAcks) { consumerAcks.put(publish.messageId(), ack); @@ -480,6 +495,7 @@ public class MQTTProtocolConverter { void onMQTTPubAck(PUBACK command) { short messageId = command.messageId(); + ackPacketId(messageId); MessageAck ack; synchronized (consumerAcks) { ack = consumerAcks.remove(messageId); @@ -511,6 +527,7 @@ public class MQTTProtocolConverter { void onMQTTPubComp(PUBCOMP command) { short messageId = command.messageId(); + ackPacketId(messageId); MessageAck ack; synchronized (consumerAcks) { ack = consumerAcks.remove(messageId); @@ -524,7 +541,7 @@ public class MQTTProtocolConverter { ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); msg.setProducerId(producerId); - MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); + MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); msg.setMessageId(id); msg.setTimestamp(System.currentTimeMillis()); msg.setPriority((byte) Message.DEFAULT_PRIORITY); @@ -547,8 +564,7 @@ public class MQTTProtocolConverter { public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { PUBLISH result = new PUBLISH(); - short id = (short) message.getMessageId().getProducerSequenceId(); - result.messageId(id); + // packet id is set in MQTTSubscription QoS qoS; if (message.propertyExists(QOS_PROPERTY_NAME)) { int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); @@ -623,7 +639,7 @@ public class MQTTProtocolConverter { PUBLISH publish = new PUBLISH(); publish.topicName(connect.willTopic()); publish.qos(connect.willQos()); - publish.messageId((short) messageIdGenerator.getNextSequenceId()); + publish.messageId(getNextSequenceId()); publish.payload(connect.willMessage()); ActiveMQMessage message = convertMessage(publish); message.setProducerId(producerId); @@ -815,4 +831,39 @@ public class MQTTProtocolConverter { public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; } + + short setPacketId(MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish) { + // subscription key + final StringBuilder subscriptionKey = new StringBuilder(); + subscriptionKey.append(subscription.getConsumerInfo().getDestination().getPhysicalName()) + .append(':').append(message.getJMSMessageID()); + final String keyStr = subscriptionKey.toString(); + Short packetId; + synchronized (activemqToPacketIds) { + packetId = activemqToPacketIds.get(keyStr); + if (packetId == null) { + packetId = getNextSequenceId(); + activemqToPacketIds.put(keyStr, packetId); + packetIdsToActivemq.put(packetId, keyStr); + } else { + // mark publish as duplicate! + publish.dup(true); + } + } + publish.messageId(packetId); + return packetId; + } + + void ackPacketId(short packetId) { + synchronized (activemqToPacketIds) { + final String subscriptionKey = packetIdsToActivemq.remove(packetId); + if (subscriptionKey != null) { + activemqToPacketIds.remove(subscriptionKey); + } + } + } + + short getNextSequenceId() { + return (short) messageIdGenerator.getNextSequenceId(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/67f151fe/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java index 99af92a..0eed8f6 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java @@ -18,8 +18,8 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; import java.util.zip.DataFormatException; - import javax.jms.JMSException; + import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerInfo; @@ -53,6 +53,13 @@ class MQTTSubscription { if (publish.qos().ordinal() > this.qos.ordinal()) { publish.qos(this.qos); } + switch (publish.qos()) { + case AT_LEAST_ONCE: + case EXACTLY_ONCE: + // set packet id, and optionally dup flag + protocolConverter.setPacketId(this, message, publish); + case AT_MOST_ONCE: + } return publish; } @@ -71,4 +78,5 @@ class MQTTSubscription { public QoS qos() { return qos; } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/67f151fe/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 9ece80e..76d1597 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.mqtt; +import java.net.ProtocolException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -30,12 +31,14 @@ import javax.jms.Session; import javax.jms.TextMessage; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotEquals; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; +import org.fusesource.hawtbuf.Buffer; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -512,6 +515,195 @@ public class MQTTTest extends AbstractMQTTTest { } + @Test(timeout = 60 * 1000) + public void testUniqueMessageIds() throws Exception { + addMQTTConnector(); + brokerService.start(); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setKeepAlive((short)2); + mqtt.setCleanSession(true); + + final List<PUBLISH> publishList = new ArrayList<PUBLISH>(); + mqtt.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received:\n" + frame); + if (frame.messageType() == PUBLISH.TYPE) { + PUBLISH publish = new PUBLISH(); + try { + // copy the buffers before we decode + Buffer[] buffers = frame.buffers(); + Buffer[] copy = new Buffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + copy[i] = buffers[i].deepCopy(); + } + publish.decode(frame); + // reset frame buffers to deep copy + frame.buffers(copy); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + publishList.add(publish); + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent:\n" + frame); + } + }); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + // create overlapping subscriptions with different QoSs + QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE }; + final String TOPIC = "TopicA/"; + + // publish retained message + connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true); + + String[] subs = {TOPIC, "TopicA/#", "TopicA/+"}; + for (int i = 0; i < qoss.length; i++) { + connection.subscribe(new Topic[]{ new Topic(subs[i], qoss[i]) }); + } + + // publish non-retained message + connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + int received = 0; + + Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + do { + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + int waitCount = 0; + while (publishList.size() <= received && waitCount < 10) { + Thread.sleep(1000); + waitCount++; + } + msg = connection.receive(5000, TimeUnit.MILLISECONDS); + } while (msg != null && received++ < subs.length * 2); + assertEquals("Unexpected number of messages", subs.length * 2, received + 1); + + // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for AT_MOST_ONCE + for (int i = 0; i < publishList.size(); i++) { + for (int j = i + 1; j < publishList.size(); j++) { + final PUBLISH publish1 = publishList.get(i); + final PUBLISH publish2 = publishList.get(j); + boolean qos0 = false; + if (publish1.qos() == QoS.AT_MOST_ONCE) { + qos0 = true; + assertEquals(0, publish1.messageId()); + } + if (publish2.qos() == QoS.AT_MOST_ONCE) { + qos0 = true; + assertEquals(0, publish2.messageId()); + } + if (!qos0) { + assertNotEquals(publish1.messageId(), publish2.messageId()); + } + } + } + + connection.unsubscribe(subs); + connection.disconnect(); + } + + @Test(timeout = 600 * 1000) + public void testResendMessageId() throws Exception { + addMQTTConnector(); + brokerService.start(); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setKeepAlive((short)2); + mqtt.setCleanSession(true); + + final List<PUBLISH> publishList = new ArrayList<PUBLISH>(); + mqtt.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received:\n" + frame); + if (frame.messageType() == PUBLISH.TYPE) { + PUBLISH publish = new PUBLISH(); + try { + // copy the buffers before we decode + Buffer[] buffers = frame.buffers(); + Buffer[] copy = new Buffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + copy[i] = buffers[i].deepCopy(); + } + publish.decode(frame); + // reset frame buffers to deep copy + frame.buffers(copy); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + publishList.add(publish); + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent:\n" + frame); + } + }); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + // create overlapping subscriptions with different QoSs + final String TOPIC = "TopicA/"; + final String[] subs = { TOPIC, "+/"}; + connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); + + // publish non-retained message + connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + + Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + + // drop subs without acknowledging messages, then subscribe and receive again + connection.unsubscribe(subs); + connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); + // wait for all acks to be processed + Thread.sleep(1000); + + msg = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + msg = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + + // make sure we received duplicate message ids + for (int i = 0; i < publishList.size(); i++) { + boolean found = false; + for (int j = 0; j < publishList.size(); j++) { + if (i != j) { + if (publishList.get(i).messageId() == publishList.get(j).messageId()) { + // one of them is a duplicate + assertTrue(publishList.get(i).dup() || publishList.get(j).dup()); + found = true; + } + } + } + assertTrue("Dup Not found " + publishList.get(i), found); + } + + connection.unsubscribe(subs); + connection.disconnect(); + } + @Test(timeout=60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { addMQTTConnector(); @@ -691,7 +883,7 @@ public class MQTTTest extends AbstractMQTTTest { payload = message.getPayload(); String messageContent = new String(payload); LOG.info("Received message from topic: " + message.getTopic() + - " Message content: " + messageContent); + " Message content: " + messageContent); message.ack(); }
