Partial fix for AMQ-5160: retained messages are now resolved through a subscription recovery policy, but they are not yet resent for duplicate subscriptions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bcb60a48 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bcb60a48 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bcb60a48 Branch: refs/heads/trunk Commit: bcb60a482cdc1cd2d2de9b4dba6a38ef831b2fa4 Parents: ba519d8 Author: Dhiraj Bokde <[email protected]> Authored: Wed May 7 19:05:36 2014 -0700 Committer: Dejan Bosanac <[email protected]> Committed: Mon May 26 11:07:18 2014 +0200 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Topic.java | 14 ++- ...tainedMessageSubscriptionRecoveryPolicy.java | 107 +++++++++++++++++++ .../transport/mqtt/MQTTProtocolConverter.java | 71 +++++------- .../activemq/transport/mqtt/MQTTTest.java | 86 ++++++++++++--- 4 files changed, 215 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 277ce05..4744af8 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -33,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; -import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; import 
org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.broker.util.InsertionCountList; @@ -91,7 +91,7 @@ public class Topic extends BaseDestination implements Task { subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); setAlwaysRetroactive(true); } else { - subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy(); + subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); } this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); } @@ -675,8 +675,14 @@ public class Topic extends BaseDestination implements Task { return subscriptionRecoveryPolicy; } - public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { - this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; + public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { + if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { + // allow users to combine retained message policy with other ActiveMQ policies + RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; + policy.setWrapped(recoveryPolicy); + } else { + this.subscriptionRecoveryPolicy = recoveryPolicy; + } } // Implementation methods http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java new 
file mode 100644 index 0000000..d350a5f --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.SubscriptionRecovery; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.DestinationFilter; + +/** + * This implementation of {@link org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy} will only keep the + * last non-zero length message with the {@link org.apache.activemq.command.ActiveMQMessage}.RETAIN_PROPERTY. 
+ * + * @org.apache.xbean.XBean + * + */ +public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { + + public static final String RETAIN_PROPERTY = "ActiveMQRetain"; + public static final String RETAINED_PROPERTY = "ActiveMQRetained"; + private volatile MessageReference retainedMessage; + private SubscriptionRecoveryPolicy wrapped; + + public RetainedMessageSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy wrapped) { + this.wrapped = wrapped; + } + + public boolean add(ConnectionContext context, MessageReference node) throws Exception { + final Message message = node.getMessage(); + final Object retainValue = message.getProperty(RETAIN_PROPERTY); + // retain property set to true + final boolean retain = retainValue != null && Boolean.parseBoolean(retainValue.toString()); + if (retain) { + if (message.getContent().getLength() > 0) { + // non zero length message content + retainedMessage = message.copy(); + retainedMessage.getMessage().setProperty(RETAINED_PROPERTY, true); + } else { + // clear retained message + retainedMessage = null; + } + // TODO should we remove the publisher's retain property?? + node.getMessage().removeProperty(RETAIN_PROPERTY); + } + return wrapped == null ? true : wrapped.add(context, node); + } + + public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception { + // Re-dispatch the last retained message seen. 
+ if (retainedMessage != null) { + sub.addRecoveredMessage(context, retainedMessage); + } + if (wrapped != null) { + wrapped.recover(context, topic, sub); + } + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + public Message[] browse(ActiveMQDestination destination) throws Exception { + List<Message> result = new ArrayList<Message>(); + if (retainedMessage != null) { + DestinationFilter filter = DestinationFilter.parseFilter(destination); + if (filter.matches(retainedMessage.getMessage().getDestination())) { + result.add(retainedMessage.getMessage()); + } + } + return result.toArray(new Message[result.size()]); + } + + public SubscriptionRecoveryPolicy copy() { + return new RetainedMessageSubscriptionRecoveryPolicy(wrapped); + } + + public void setBroker(Broker broker) { + } + + public void setWrapped(SubscriptionRecoveryPolicy wrapped) { + this.wrapped = wrapped; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 0e590f0..ebb9f45 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -28,6 +28,7 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; import org.apache.activemq.command.*; import org.apache.activemq.store.PersistenceAdapterSupport; import org.apache.activemq.util.ByteArrayOutputStream; @@ -80,13 +81,11 @@ public class MQTTProtocolConverter 
{ private long defaultKeepAlive; private int activeMQSubscriptionPrefetch=1; private final String QOS_PROPERTY_NAME = "QoSPropertyName"; - private final MQTTRetainedMessages retainedMessages; private final MQTTPacketIdGenerator packetIdGenerator; public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { this.mqttTransport = mqttTransport; this.brokerService = brokerService; - this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService); this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService); this.defaultKeepAlive = 0; } @@ -344,36 +343,6 @@ public class MQTTProtocolConverter { } catch (IOException e) { LOG.warn("Couldn't send SUBACK for " + command, e); } - // check retained messages - for (int i = 0; i < topics.length; i++) { - if (qos[i] == SUBSCRIBE_ERROR) { - // skip this topic if subscribe failed - continue; - } - final Topic topic = topics[i]; - ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); - for (PUBLISH msg : retainedMessages.getMessages(destination)) { - if( msg.payload().length > 0 ) { - try { - PUBLISH retainedCopy = new PUBLISH(); - retainedCopy.topicName(msg.topicName()); - retainedCopy.retain(msg.retain()); - retainedCopy.payload(msg.payload()); - // set QoS of retained message to maximum of subscription QoS - retainedCopy.qos(msg.qos().ordinal() > qos[i] ? 
QoS.values()[qos[i]] : msg.qos()); - switch (retainedCopy.qos()) { - case AT_LEAST_ONCE: - case EXACTLY_ONCE: - retainedCopy.messageId(packetIdGenerator.getNextSequenceId(getClientId())); - case AT_MOST_ONCE: - } - getMQTTTransport().sendToMQTT(retainedCopy.encode()); - } catch (IOException e) { - LOG.warn("Couldn't send retained message " + msg, e); - } - } - } - } } else { LOG.warn("No topics defined for Subscription " + command); } @@ -382,28 +351,33 @@ public class MQTTProtocolConverter { byte onSubscribe(final Topic topic) throws MQTTProtocolException { - if( mqttSubscriptionByTopic.containsKey(topic.name())) { - if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) { + final UTF8Buffer topicName = topic.name(); + final QoS topicQoS = topic.qos(); + if( mqttSubscriptionByTopic.containsKey(topicName)) { + if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) { // remove old subscription as the QoS has changed - onUnSubscribe(topic.name()); + onUnSubscribe(topicName); } else { - // duplicate SUBSCRIBE packet, nothing to do - return (byte) topic.qos().ordinal(); + // duplicate SUBSCRIBE packet + // TODO find all matching topics and resend retained messages + return (byte) topicQoS.ordinal(); } + onUnSubscribe(topicName); } - ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); + ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString())); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); + consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); // create durable subscriptions only when cleansession is false - if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { - 
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); + if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { + consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString()); } - MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); + MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); final byte[] qos = {-1}; sendToActiveMQ(consumerInfo, new ResponseHandler() { @@ -412,17 +386,17 @@ public class MQTTProtocolConverter { // validate subscription request if (response.isException()) { final Throwable throwable = ((ExceptionResponse) response).getException(); - LOG.warn("Error subscribing to " + topic.name(), throwable); + LOG.warn("Error subscribing to " + topicName, throwable); qos[0] = SUBSCRIBE_ERROR; } else { - qos[0] = (byte) topic.qos().ordinal(); + qos[0] = (byte) topicQoS.ordinal(); } } }); if (qos[0] != SUBSCRIBE_ERROR) { subscriptionsByConsumerId.put(id, mqttSubscription); - mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); + mqttSubscriptionByTopic.put(topicName, mqttSubscription); } return qos[0]; @@ -508,9 +482,6 @@ public class MQTTProtocolConverter { void onMQTTPublish(PUBLISH command) throws IOException, JMSException { checkConnected(); ActiveMQMessage message = convertMessage(command); - if (command.retain()){ - retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command); - } message.setProducerId(producerId); message.onSend(); sendToActiveMQ(message, createResponseHandler(command)); @@ -570,6 +541,9 @@ public class MQTTProtocolConverter { msg.setPriority((byte) Message.DEFAULT_PRIORITY); msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); + if (command.retain()) { + msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); + } ActiveMQTopic 
topic; synchronized (activeMQTopicMap) { @@ -597,6 +571,9 @@ public class MQTTProtocolConverter { qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; } result.qos(qoS); + if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) { + result.retain(true); + } UTF8Buffer topicName; synchronized (mqttTopicMap) { http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3143cfc..37016b8 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,16 +16,17 @@ */ package org.apache.activemq.transport.mqtt; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotEquals; - import java.net.ProtocolException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -34,15 +35,25 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotEquals; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.TransportConnector; +import 
org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.jaas.GroupPrincipal; -import org.apache.activemq.security.*; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.security.SimpleAuthorizationMap; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; @@ -368,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest { connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) }); Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); + assertNotNull("No message for " + topic, msg); assertEquals(RETAINED + topic, new String(msg.getPayload())); msg.ack(); @@ -390,16 +401,17 @@ public class MQTTTest extends AbstractMQTTTest { connection = mqtt.blockingConnection(); connection.connect(); - connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) }); + final byte[] qos = connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)}); + assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]); // test retained messages - Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); do { assertNotNull("RETAINED null " + wildcard, msg); assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); assertTrue("RETAINED 
matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches()); msg.ack(); - msg = connection.receive(1000, TimeUnit.MILLISECONDS); + msg = connection.receive(5000, TimeUnit.MILLISECONDS); } while (msg != null); // connection is borked after timeout in connection.receive() @@ -499,10 +511,10 @@ public class MQTTTest extends AbstractMQTTTest { QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE }; for (QoS qos : qoss) { - connection.subscribe(new Topic[] { new Topic("TopicA", qos) }); + connection.subscribe(new Topic[]{new Topic("TopicA", qos)}); final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); - assertNotNull(msg); + assertNotNull("No message for " + qos, msg); assertEquals(RETAIN, new String(msg.getPayload())); msg.ack(); int waitCount = 0; @@ -1340,6 +1352,56 @@ public class MQTTTest extends AbstractMQTTTest { assertNull("Shouldn't receive the message", msg); } + @Test(timeout = 60 * 1000) + public void testActiveMQRecoveryPolicy() throws Exception { + addMQTTConnector(); + + brokerService.start(); + + // test with ActiveMQ LastImageSubscriptionRecoveryPolicy + final PolicyMap policyMap = new PolicyMap(); + final PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy()); + policyMap.put(new ActiveMQTopic(">"), policyEntry); + brokerService.setDestinationPolicy(policyMap); + + MQTT mqtt = createMQTTConnection("pub-sub", true); + final int[] retain = new int[1]; + final int[] nonretain = new int[1]; + mqtt.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + if (frame.messageType() == PUBLISH.TYPE) { + LOG.info("Received message with retain=" + frame.retain()); + if (frame.retain()) { + retain[0]++; + } else { + nonretain[0]++; + } + } + } + }); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + final String RETAINED = "RETAINED"; + 
connection.publish("one", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true); + connection.publish("two", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true); + + final String NONRETAINED = "NONRETAINED"; + connection.publish("one", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false); + connection.publish("two", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false); + + connection.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)}); + for (int i = 0; i < 4; i++) { + final Message message = connection.receive(30, TimeUnit.SECONDS); + assertNotNull("Should receive 4 messages", message); + message.ack(); + } + assertEquals("Should receive 2 retained messages", 2, retain[0]); + assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]); + } + @Override protected String getProtocolScheme() { return "mqtt";
