Updated Branches: refs/heads/trunk 99d533c06 -> 973580603
Attempts to fix many of the compatibility issues with MQTT highlighted by AMQ-5043. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97358060 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97358060 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97358060 Branch: refs/heads/trunk Commit: 973580603097f5e620e4d7f375dbbbbbb3581c84 Parents: 99d533c Author: Hiram Chirino <[email protected]> Authored: Tue Feb 11 17:52:57 2014 -0500 Committer: Hiram Chirino <[email protected]> Committed: Tue Feb 11 17:53:24 2014 -0500 ---------------------------------------------------------------------- .../activemq/filter/DestinationMapNode.java | 9 +++ activemq-mqtt/pom.xml | 6 ++ .../transport/mqtt/MQTTProtocolConverter.java | 65 ++++++++++++++------ .../transport/mqtt/MQTTRetainedMessages.java | 29 ++++++--- .../activemq/transport/mqtt/IDERunner.java | 39 ++++++++++++ 5 files changed, 123 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java index f9ca156..d52f9de 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java @@ -133,6 +133,15 @@ public class DestinationMapNode implements DestinationNode { } } + public void set(String[] paths, int idx, Object value) { + if (idx >= paths.length) { + values.clear(); + values.add(value); + } else { + getChildOrCreate(paths[idx]).add(paths, idx + 1, value); + } + } + public void remove(String[] paths, int idx, Object value) { if (idx >= paths.length) { values.remove(value); http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml index 022fc7c..6b125e8 100755 --- a/activemq-mqtt/pom.xml +++ b/activemq-mqtt/pom.xml @@ -135,6 +135,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 9c6fa12..48c18ce 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -95,6 +95,7 @@ public class MQTTProtocolConverter { } void sendToActiveMQ(Command command, ResponseHandler handler) { + System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command); command.setCommandId(generateCommandId()); if (handler != null) { command.setResponseRequired(true); @@ -308,15 +309,14 @@ public class MQTTProtocolConverter { //check retained messages if (topics != null){ for (Topic topic:topics){ - Buffer buffer = retainedMessages.getMessage(topic.name().toString()); - if (buffer != null){ - PUBLISH msg = new PUBLISH(); - msg.payload(buffer); - msg.topicName(topic.name()); - try { - getMQTTTransport().sendToMQTT(msg.encode()); - } catch (IOException e) { - LOG.warn("Couldn't send retained message " + msg, e); + ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); + for (PUBLISH msg : retainedMessages.getMessages(destination)) { + if( msg.payload().length > 0 ) { + try { + getMQTTTransport().sendToMQTT(msg.encode()); + } catch (IOException e) { + LOG.warn("Couldn't send retained message " + msg, e); + } } } } @@ -333,7 +333,7 @@ public class MQTTProtocolConverter { consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); consumerInfo.setDispatchAsync(true); - if (!connect.cleanSession() && (connect.clientId() != null)) { + if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); @@ -418,10 +418,10 @@ public class MQTTProtocolConverter { void onMQTTPublish(PUBLISH command) throws IOException, JMSException { checkConnected(); + ActiveMQMessage message = convertMessage(command); if (command.retain()){ - retainedMessages.addMessage(command.topicName().toString(),command.payload()); + retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command); } - ActiveMQMessage message = convertMessage(command); message.setProducerId(producerId); message.onSend(); sendToActiveMQ(message, createResponseHandler(command)); @@ -484,7 +484,7 @@ public class MQTTProtocolConverter { synchronized (activeMQTopicMap) { topic = activeMQTopicMap.get(command.topicName()); if (topic == null) { - String topicName = command.topicName().toString().replaceAll("/", "."); + String topicName = convertMQTTToActiveMQ(command.topicName().toString()); topic = new ActiveMQTopic(topicName); activeMQTopicMap.put(command.topicName(), topic); } @@ -563,17 +563,21 @@ public class MQTTProtocolConverter { return mqttTransport; } + boolean willSent = false; public void onTransportError() { if (connect != null) { - if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) { + if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) { + willSent = true; try { PUBLISH publish = new PUBLISH(); publish.topicName(connect.willTopic()); publish.qos(connect.willQos()); + publish.messageId((short) messageIdGenerator.getNextSequenceId()); publish.payload(connect.willMessage()); ActiveMQMessage message = convertMessage(publish); message.setProducerId(producerId); message.onSend(); + sendToActiveMQ(message, null); } catch (Exception e) { LOG.warn("Failed to publish Will Message " + connect.willMessage()); @@ -703,10 +707,35 @@ public class MQTTProtocolConverter { } private String convertMQTTToActiveMQ(String name) { - String result = name.replace('#', '>'); - result = result.replace('+', '*'); - result = result.replace('/', '.'); - return result; + char[] chars = name.toCharArray(); + for (int i = 0; i < chars.length; i++) { + switch(chars[i]) { + + case '#': + chars[i] = '>'; + break; + case '>': + chars[i] = '#'; + break; + + case '+': + chars[i] = '*'; + break; + case '*': + chars[i] = '+'; + break; + + case '/': + chars[i] = '.'; + break; + case '.': + chars[i] = '/'; + break; + + } + } + String rc = new String(chars); + return rc; } public long getDefaultKeepAlive() { http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java index e502dce..250366d 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java @@ -18,36 +18,51 @@ package org.apache.activemq.transport.mqtt; import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.LRUCache; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationMapNode; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.codec.PUBLISH; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + public class MQTTRetainedMessages extends ServiceSupport { private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class); private static final Object LOCK = new Object(); - private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000); + + DestinationMapNode retainedMessages = new DestinationMapNode(null); private MQTTRetainedMessages(){ } @Override protected void doStop(ServiceStopper stopper) throws Exception { - cache.clear(); + synchronized (this) { + retainedMessages = new DestinationMapNode(null); + } } @Override protected void doStart() throws Exception { } - public void addMessage(String destination,Buffer payload){ - cache.put(destination,payload); + public void addMessage(ActiveMQTopic dest, PUBLISH publish){ + synchronized (this) { + retainedMessages.set(dest.getDestinationPaths(), 0, publish); + } } - public Buffer getMessage(String destination){ - return cache.get(destination); + public Set<PUBLISH> getMessages(ActiveMQTopic topic){ + Set answer = new HashSet(); + synchronized (this) { + retainedMessages.appendMatchingValues(answer, topic.getDestinationPaths(), 0); + } + return (Set<PUBLISH>)answer; } public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){ http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java new file mode 100644 index 0000000..48e34c4 --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; + +import java.io.File; + +/** + * A little helper class for testing a broker in your IDE. + */ +public class IDERunner { + + public static void main(String[]args) throws Exception { + BrokerService bs = new BrokerService(); + bs.addConnector("mqtt://0.0.0.0:1883?trace=true"); + KahaDBStore store = new KahaDBStore(); + store.setDirectory(new File("target/activemq-data/kahadb")); + bs.setPersistenceAdapter(store); + bs.deleteAllMessages(); + bs.start(); + bs.waitUntilStopped(); + } +}
