Updated Branches: refs/heads/trunk de5838660 -> bc4f4e92a
Fixes AMQ-4896 - MQTT does not properly restore durable subs with the Paho client. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bc4f4e92 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bc4f4e92 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bc4f4e92 Branch: refs/heads/trunk Commit: bc4f4e92a6e38032c91748764e5ef4f03b5d4140 Parents: de58386 Author: Hiram Chirino <[email protected]> Authored: Mon Nov 18 12:11:34 2013 -0500 Committer: Hiram Chirino <[email protected]> Committed: Mon Nov 18 12:11:34 2013 -0500 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 3 + .../store/PersistenceAdapterSupport.java | 47 ++++++++++ .../activemq/transport/ws/MQTTSocket.java | 25 ++++- .../transport/mqtt/MQTTNIOTransportFactory.java | 6 +- .../transport/mqtt/MQTTProtocolConverter.java | 98 ++++++++++++-------- .../transport/mqtt/MQTTSslTransportFactory.java | 6 +- .../transport/mqtt/MQTTTransportFactory.java | 6 +- .../transport/mqtt/MQTTTransportFilter.java | 5 +- 8 files changed, 139 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 1ce75a5..3a9a405 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -137,6 +137,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.stopTaskRunnerFactory = stopTaskRunnerFactory; this.transport = transport; final BrokerService brokerService = this.broker.getBrokerService(); + if( this.transport instanceof BrokerServiceAware ) { + ((BrokerServiceAware)this.transport).setBrokerService(brokerService); + } this.transport.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java new file mode 100644 index 0000000..aca4574 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SubscriptionInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Used to implement common PersistenceAdapter methods. + */ +public class PersistenceAdapterSupport { + + static public List<SubscriptionInfo> listSubscriptions(PersistenceAdapter pa, String clientId) throws IOException { + ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); + for (ActiveMQDestination destination : pa.getDestinations()) { + if( destination.isTopic() ) { + TopicMessageStore store = pa.createTopicMessageStore((ActiveMQTopic) destination); + for (SubscriptionInfo sub : store.getAllSubscriptions()) { + if(clientId==sub.getClientId() || clientId.equals(sub.getClientId()) ) { + rc.add(sub); + } + } + } + } + return rc; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java index 2e112ab..047c459 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport.ws; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.Command; import org.apache.activemq.transport.TransportSupport; import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; @@ -35,13 +37,14 @@ import java.io.IOException; import java.security.cert.X509Certificate; import java.util.concurrent.CountDownLatch; -public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport { +public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); Connection outbound; - MQTTProtocolConverter protocolConverter = new MQTTProtocolConverter(this, null); + MQTTProtocolConverter protocolConverter = null; MQTTWireFormat wireFormat = new MQTTWireFormat(); private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private BrokerService brokerService; @Override public void onMessage(byte[] bytes, int offset, int length) { @@ -56,12 +59,19 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM try { MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); - protocolConverter.onMQTTCommand(frame); + getProtocolConverter().onMQTTCommand(frame); } catch (Exception e) { onException(IOExceptionSupport.create(e)); } } + private MQTTProtocolConverter getProtocolConverter() { + if( protocolConverter == null ) { + protocolConverter = new MQTTProtocolConverter(this, brokerService); + } + return protocolConverter; + } + @Override public void onOpen(Connection connection) { this.outbound = connection; @@ -70,7 +80,7 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM @Override public void onClose(int closeCode, String message) { try { - protocolConverter.onMQTTCommand(new DISCONNECT().encode()); + getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); } catch (Exception e) { LOG.warn("Failed to close WebSocket", e); } @@ -101,7 +111,7 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM @Override public void oneway(Object command) throws IOException { try { - protocolConverter.onActiveMQCommand((Command)command); + getProtocolConverter().onActiveMQCommand((Command) command); } catch (Exception e) { onException(IOExceptionSupport.create(e)); } @@ -132,4 +142,9 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM public MQTTWireFormat getWireFormat() { return wireFormat; } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java index 52fa228..96f7747 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java @@ -42,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat; */ public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware { - private BrokerContext brokerContext = null; + private BrokerService brokerService = null; protected String getDefaultWireFormatType() { return "mqtt"; @@ -77,13 +77,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new MQTTTransportFilter(transport, format, brokerContext); + transport = new MQTTTransportFilter(transport, format, brokerService); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } public void setBrokerService(BrokerService brokerService) { - this.brokerContext = brokerService.getBrokerContext(); + this.brokerService = brokerService; } protected Transport createInactivityMonitor(Transport transport, WireFormat format) { http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index ac598e7..d4c05eb 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -17,6 +17,8 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,29 +30,10 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.activemq.broker.BrokerContext; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMapMessage; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionError; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionId; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.*; +import org.apache.activemq.store.PersistenceAdapterSupport; +import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; @@ -102,6 +85,7 @@ public class MQTTProtocolConverter { private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); private final MQTTTransport mqttTransport; + private final BrokerService brokerService; private final Object commnadIdMutex = new Object(); private int lastCommandId; @@ -113,8 +97,9 @@ public class MQTTProtocolConverter { private int activeMQSubscriptionPrefetch=1; private final String QOS_PROPERTY_NAME = "QoSPropertyName"; - public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) { + public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { this.mqttTransport = mqttTransport; + this.brokerService = brokerService; this.defaultKeepAlive = 0; } @@ -269,12 +254,43 @@ public class MQTTProtocolConverter { connected.set(true); getMQTTTransport().sendToMQTT(ack.encode()); + List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId()); + if( connect.cleanSession() ) { + deleteDurableSubs(subs); + } else { + restoreDurableSubs(subs); + } } }); } }); } + public void deleteDurableSubs(List<SubscriptionInfo> subs) { + try { + for (SubscriptionInfo sub : subs) { + TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination()); + store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName()); + } + } catch (IOException e) { + LOG.warn("Could not delete the MQTT durable subs.", e); + } + } + + public void restoreDurableSubs(List<SubscriptionInfo> subs) { + try { + SUBSCRIBE command = new SUBSCRIBE(); + for (SubscriptionInfo sub : subs) { + String name = sub.getSubcriptionName(); + String[] split = name.split(":", 2); + QoS qoS = QoS.valueOf(split[0]); + onSubscribe(new Topic(split[1], qoS)); + } + } catch (IOException e) { + LOG.warn("Could not restore the MQTT durable subs.", e); + } + } + void onMQTTDisconnect() throws MQTTProtocolException { if (connected.get()) { connected.set(false); @@ -290,7 +306,7 @@ public class MQTTProtocolConverter { if (topics != null) { byte[] qos = new byte[topics.length]; for (int i = 0; i < topics.length; i++) { - qos[i] = (byte) onSubscribe(command, topics[i]).ordinal(); + qos[i] = (byte) onSubscribe(topics[i]).ordinal(); } SUBACK ack = new SUBACK(); ack.messageId(command.messageId()); @@ -305,25 +321,25 @@ public class MQTTProtocolConverter { } } - QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException { - ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); + QoS onSubscribe(Topic topic) throws MQTTProtocolException { + if( !mqttSubscriptionByTopic.containsKey(topic.name()) ) { + ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); - ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); - ConsumerInfo consumerInfo = new ConsumerInfo(id); - consumerInfo.setDestination(destination); - consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); - consumerInfo.setDispatchAsync(true); - if (!connect.cleanSession() && (connect.clientId() != null)) { - //by default subscribers are persistent - consumerInfo.setSubscriptionName( - connect.clientId().toString() + topic.name().toString()); - } - MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); + ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + ConsumerInfo consumerInfo = new ConsumerInfo(id); + consumerInfo.setDestination(destination); + consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); + consumerInfo.setDispatchAsync(true); + if (!connect.cleanSession() && (connect.clientId() != null)) { + consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); + } + MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); - subscriptionsByConsumerId.put(id, mqttSubscription); - mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); + subscriptionsByConsumerId.put(id, mqttSubscription); + mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); - sendToActiveMQ(consumerInfo, null); + sendToActiveMQ(consumerInfo, null); + } return topic.qos(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java index 1bb12f2..a0d32b1 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java @@ -33,7 +33,7 @@ import org.apache.activemq.wireformat.WireFormat; */ public class MQTTSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { - private BrokerContext brokerContext = null; + private BrokerService brokerService = null; protected String getDefaultWireFormatType() { return "mqtt"; @@ -42,7 +42,7 @@ public class MQTTSslTransportFactory extends SslTransportFactory implements Brok @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new MQTTTransportFilter(transport, format, brokerContext); + transport = new MQTTTransportFilter(transport, format, brokerService); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } @@ -61,7 +61,7 @@ public class MQTTSslTransportFactory extends SslTransportFactory implements Brok } public void setBrokerService(BrokerService brokerService) { - this.brokerContext = brokerService.getBrokerContext(); + this.brokerService = brokerService; } protected Transport createInactivityMonitor(Transport transport, WireFormat format) { http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java index 7b4696a..da23ba5 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java @@ -40,7 +40,7 @@ import javax.net.ServerSocketFactory; */ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware { - private BrokerContext brokerContext = null; + private BrokerService brokerService = null; protected String getDefaultWireFormatType() { return "mqtt"; @@ -54,13 +54,13 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new MQTTTransportFilter(transport, format, brokerContext); + transport = new MQTTTransportFilter(transport, format, brokerService); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } public void setBrokerService(BrokerService brokerService) { - this.brokerContext = brokerService.getBrokerContext(); + this.brokerService = brokerService; } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index 805fd3f..1dcf3dc 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; @@ -50,9 +51,9 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor private boolean trace; - public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { + public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) { super(next); - this.protocolConverter = new MQTTProtocolConverter(this, brokerContext); + this.protocolConverter = new MQTTProtocolConverter(this, brokerService); if (wireFormat instanceof MQTTWireFormat) { this.wireFormat = (MQTTWireFormat) wireFormat;
