Updated Branches: refs/heads/trunk 084d606d8 -> e2a7d6af5
More improvements for AMQ-5043. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e2a7d6af Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e2a7d6af Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e2a7d6af Branch: refs/heads/trunk Commit: e2a7d6af5a447330e3e180e681c9964332c36558 Parents: 084d606 Author: Hiram Chirino <[email protected]> Authored: Wed Feb 12 10:59:31 2014 -0500 Committer: Hiram Chirino <[email protected]> Committed: Wed Feb 12 10:59:40 2014 -0500 ---------------------------------------------------------------------- .../activemq/filter/DestinationMapNode.java | 2 +- .../transport/mqtt/MQTTProtocolConverter.java | 17 ++++++--- .../transport/mqtt/MQTTTransportFilter.java | 37 +++++++++++++++++--- 3 files changed, 45 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java index d52f9de..a2360a0 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java @@ -138,7 +138,7 @@ public class DestinationMapNode implements DestinationNode { values.clear(); values.add(value); } else { - getChildOrCreate(paths[idx]).add(paths, idx + 1, value); + getChildOrCreate(paths[idx]).set(paths, idx + 1, value); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 48c18ce..f7c3c1e 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -95,7 +95,6 @@ public class MQTTProtocolConverter { } void sendToActiveMQ(Command command, ResponseHandler handler) { - System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command); command.setCommandId(generateCommandId()); if (handler != null) { command.setResponseRequired(true); @@ -256,10 +255,18 @@ public class MQTTProtocolConverter { public void deleteDurableSubs(List<SubscriptionInfo> subs) { try { for (SubscriptionInfo sub : subs) { - TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination()); - store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName()); + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(connectionId); + rsi.setSubscriptionName(sub.getSubcriptionName()); + rsi.setClientId(sub.getClientId()); + sendToActiveMQ(rsi, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // ignore failures.. + } + }); } - } catch (IOException e) { + } catch (Throwable e) { LOG.warn("Could not delete the MQTT durable subs.", e); } } @@ -477,7 +484,7 @@ public class MQTTProtocolConverter { msg.setMessageId(id); msg.setTimestamp(System.currentTimeMillis()); msg.setPriority((byte) Message.DEFAULT_PRIORITY); - msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); + msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); ActiveMQTopic topic; http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index 1dcf3dc..54f40e7 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; +import java.net.ProtocolException; import java.security.cert.X509Certificate; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,7 +32,7 @@ import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; -import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,11 +74,11 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor @Override public void onCommand(Object command) { try { + MQTTFrame frame = (MQTTFrame) command; if (trace) { - TRACE.trace("Received: \n" + command); + TRACE.trace("Received: " + toString(frame)); } - - protocolConverter.onMQTTCommand((MQTTFrame) command); + protocolConverter.onMQTTCommand(frame); } catch (IOException e) { handleException(e); } catch (JMSException e) { @@ -97,7 +98,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor public void sendToMQTT(MQTTFrame command) throws IOException { if( !stopped.get() ) { if (trace) { - TRACE.trace("Sending: \n" + command); + TRACE.trace("Sending : " + toString(command)); } Transport n = next; if (n != null) { @@ -106,6 +107,32 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor } } + static private String toString(MQTTFrame frame) { + if( frame == null ) + return null; + try { + switch (frame.messageType()) { + case PINGREQ.TYPE: return new PINGREQ().decode(frame).toString(); + case PINGRESP.TYPE: return new PINGRESP().decode(frame).toString(); + case CONNECT.TYPE: return new CONNECT().decode(frame).toString(); + case DISCONNECT.TYPE: return new DISCONNECT().decode(frame).toString(); + case SUBSCRIBE.TYPE: return new SUBSCRIBE().decode(frame).toString(); + case UNSUBSCRIBE.TYPE: return new UNSUBSCRIBE().decode(frame).toString(); + case PUBLISH.TYPE: return new PUBLISH().decode(frame).toString(); + case PUBACK.TYPE: return new PUBACK().decode(frame).toString(); + case PUBREC.TYPE: return new PUBREC().decode(frame).toString(); + case PUBREL.TYPE: return new PUBREL().decode(frame).toString(); + case PUBCOMP.TYPE: return new PUBCOMP().decode(frame).toString(); + case CONNACK.TYPE: return new CONNACK().decode(frame).toString(); + case SUBACK.TYPE: return new SUBACK().decode(frame).toString(); + default: return frame.toString(); + } + } catch (Throwable e) { + e.printStackTrace(); + return frame.toString(); + } + } + @Override public void stop() throws Exception { if( stopped.compareAndSet(false, true) ) {
