Fixed AMQ-5160: durable subscription retroactive recovery
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c859676 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c859676 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c859676 Branch: refs/heads/trunk Commit: 6c859676b3995334e96c16c47653ec72fa70f729 Parents: 42ad103 Author: Dhiraj Bokde <[email protected]> Authored: Fri May 16 14:21:19 2014 -0700 Committer: Dejan Bosanac <[email protected]> Committed: Mon May 26 11:07:19 2014 +0200 ---------------------------------------------------------------------- .../broker/region/DurableTopicSubscription.java | 18 ++++------- .../broker/region/PrefetchSubscription.java | 3 +- .../transport/mqtt/MQTTProtocolConverter.java | 21 ++++++++++++ .../activemq/transport/mqtt/MQTTTest.java | 34 ++++++++++++++------ 4 files changed, 55 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index e61a608..4c19c62 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -120,9 +119,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us if 
(active.get() || keepDurableSubsActive) { Topic topic = (Topic) destination; topic.activate(context, this); - if (topic.isAlwaysRetroactive() || info.isRetroactive()) { - topic.recoverRetroactiveMessages(context, this); - } this.enqueueCounter += pending.size(); } else if (destination.getMessageStore() != null) { TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); @@ -172,12 +168,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us pending.setMaxAuditDepth(getMaxAuditDepth()); pending.setMaxProducersToAudit(getMaxProducersToAudit()); pending.start(); - // use recovery policy for retroactive topics and consumers - for (Destination destination : durableDestinations.values()) { - Topic topic = (Topic) destination; - if (topic.isAlwaysRetroactive() || info.isRetroactive()) { - topic.recoverRetroactiveMessages(context, this); - } + } + // use recovery policy every time sub is activated for retroactive topics and consumers + for (Destination destination : durableDestinations.values()) { + Topic topic = (Topic) destination; + if (topic.isAlwaysRetroactive() || info.isRetroactive()) { + topic.recoverRetroactiveMessages(context, this); } } } @@ -277,7 +273,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } @Override - protected void dispatchPending() throws IOException { + public void dispatchPending() throws IOException { if (isActive()) { super.dispatchPending(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index ff4c0aa..5ba3b53 100755 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -633,7 +633,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription { dispatched.removeAll(references); } - protected void dispatchPending() throws IOException { + // made public so it can be used in MQTTProtocolConverter + public void dispatchPending() throws IOException { synchronized(pendingLock) { try { int numberToDispatch = countBeforeFull(); http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 88e684e..56f7fbd 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -17,6 +17,8 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +32,7 @@ import javax.jms.Message; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.PrefetchSubscription; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; @@ -71,6 +74,8 @@ public class MQTTProtocolConverter { private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>(); private 
final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE); private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE); + private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>()); + private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); @@ -317,6 +322,8 @@ public class MQTTProtocolConverter { String[] split = name.split(":", 2); QoS qoS = QoS.valueOf(split[0]); onSubscribe(new Topic(split[1], qoS)); + // mark this durable subscription as restored by Broker + restoredSubs.add(split[1]); } } catch (IOException e) { LOG.warn("Could not restore the MQTT durable subs.", e); @@ -416,6 +423,12 @@ public class MQTTProtocolConverter { private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination, MQTTSubscription mqttSubscription) throws MQTTProtocolException { + // check whether the Topic has been recovered in restoreDurableSubs + // mark subscription available for recovery for duplicate subscription + if (restoredSubs.remove(destination.getPhysicalName())) { + return; + } + // get TopicRegion RegionBroker regionBroker; try { @@ -441,6 +454,11 @@ public class MQTTProtocolConverter { if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { try { ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); + if (subscription instanceof PrefetchSubscription) { + // request dispatch for prefetch subs + PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription; + prefetchSubscription.dispatchPending(); + } } catch (Exception e) { throw new MQTTProtocolException("Error recovering retained messages for " + dest.getName() + ": " + e.getMessage(), false, e); @@ -479,6 +497,9 @@ 
public class MQTTProtocolConverter { // check if the durable sub also needs to be removed if (subs.getConsumerInfo().getSubscriptionName() != null) { + // also remove it from restored durable subscriptions set + restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString())); + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); rsi.setConnectionId(connectionId); rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName()); http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 9c8c9b5..466e6a6 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -526,55 +526,71 @@ public class MQTTTest extends AbstractMQTTTest { } - @Test(timeout = 60 * 1000) + @Test(timeout = 120 * 1000) public void testRetainedMessage() throws Exception { addMQTTConnector(); brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setKeepAlive((short) 2); - mqtt.setCleanSession(true); final String RETAIN = "RETAIN"; final String TOPICA = "TopicA"; - final String[] clientIds = { null, "foo" }; + final String[] clientIds = { null, "foo", "durable" }; for (String clientId : clientIds) { mqtt.setClientId(clientId); - final BlockingConnection connection = mqtt.blockingConnection(); + mqtt.setCleanSession(!"durable".equals(clientId)); + + BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); // set retained message and check connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); + connection.subscribe(new Topic[]{new 
Topic(TOPICA, QoS.AT_LEAST_ONCE)}); Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull("No retained message for " + clientId, msg); assertEquals(RETAIN, new String(msg.getPayload())); msg.ack(); + assertNull(connection.receive(5000, TimeUnit.MILLISECONDS)); // test duplicate subscription - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); - msg = connection.receive(5000, TimeUnit.MILLISECONDS); + connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); + msg = connection.receive(15000, TimeUnit.MILLISECONDS); assertNotNull("No retained message on duplicate subscription for " + clientId, msg); assertEquals(RETAIN, new String(msg.getPayload())); msg.ack(); + assertNull(connection.receive(5000, TimeUnit.MILLISECONDS)); connection.unsubscribe(new String[]{"TopicA"}); // clear retained message and check that we don't receive it connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); + connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNull("Retained message not cleared for " + clientId, msg); connection.unsubscribe(new String[]{"TopicA"}); // set retained message again and check connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); + connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull("No reset retained message for " + clientId, msg); assertEquals(RETAIN, new String(msg.getPayload())); msg.ack(); + assertNull(connection.receive(5000, TimeUnit.MILLISECONDS)); + + // re-connect and check + connection.disconnect(); + connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); + msg = 
connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull("No reset retained message for " + clientId, msg); + assertEquals(RETAIN, new String(msg.getPayload())); + msg.ack(); + assertNull(connection.receive(5000, TimeUnit.MILLISECONDS)); + connection.unsubscribe(new String[]{"TopicA"}); connection.disconnect();
