Fixed AMQ-5160, fixed race condition for retained messages
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/86440903 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/86440903 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/86440903 Branch: refs/heads/trunk Commit: 8644090377eeef09a5afb2e46594c4fa4c311aae Parents: c915b19 Author: Dhiraj Bokde <[email protected]> Authored: Tue May 13 12:16:44 2014 -0700 Committer: Dejan Bosanac <[email protected]> Committed: Mon May 26 11:07:19 2014 +0200 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/86440903/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 71a6fcf..88e684e 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -386,6 +386,10 @@ public class MQTTProtocolConverter { } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); + // optimistic add to local maps first to be able to handle commands in onActiveMQCommand + subscriptionsByConsumerId.put(id, mqttSubscription); + mqttSubscriptionByTopic.put(topicName, mqttSubscription); + final byte[] qos = {-1}; sendToActiveMQ(consumerInfo, new ResponseHandler() { @Override @@ -401,9 +405,10 @@ public class MQTTProtocolConverter { } }); - if (qos[0] != SUBSCRIBE_ERROR) { - subscriptionsByConsumerId.put(id, mqttSubscription); - mqttSubscriptionByTopic.put(topicName, mqttSubscription); + if (qos[0] == SUBSCRIBE_ERROR) { + // remove from local maps if subscribe failed + subscriptionsByConsumerId.remove(id); + mqttSubscriptionByTopic.remove(topicName); } return qos[0]; @@ -431,7 +436,7 @@ public class MQTTProtocolConverter { final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination); for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) { - // recover retroactive messages for matching subscriptions + // recover retroactive messages for matching subscription for (Subscription subscription : dest.getConsumers()) { if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { try { @@ -440,6 +445,7 @@ public class MQTTProtocolConverter { throw new MQTTProtocolException("Error recovering retained messages for " + dest.getName() + ": " + e.getMessage(), false, e); } + break; } } } @@ -483,7 +489,7 @@ public class MQTTProtocolConverter { } /** - * Dispatch a ActiveMQ command + * Dispatch an ActiveMQ command */ public void onActiveMQCommand(Command command) throws Exception { if (command.isResponse()) {
