ARTEMIS-641 filter out management notifications in MQTT
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e341b54c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e341b54c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e341b54c

Branch: refs/heads/master
Commit: e341b54c49fd31636530f2d471cb68d8143bb62d
Parents: bed73f5
Author: Martyn Taylor <[email protected]>
Authored: Wed Jul 20 13:30:34 2016 +0100
Committer: jbertram <[email protected]>
Committed: Thu Jul 21 14:47:20 2016 -0500

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTSubscriptionManager.java | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e341b54c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 954a1bd..cbe64a6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.FilterConstants;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -37,11 +38,27 @@ public class MQTTSubscriptionManager {
 
    private MQTTLogger log = MQTTLogger.LOGGER;
 
+   // We filter out Artemis managment messages and notifications
+   private SimpleString managementFilter;
+
    public MQTTSubscriptionManager(MQTTSession session) {
       this.session = session;
 
       consumers = new ConcurrentHashMap<>();
       consumerQoSLevels = new ConcurrentHashMap<>();
+
+      // Create filter string to ignore management messages
+      StringBuilder builder = new StringBuilder();
+      builder.append("NOT ((");
+      builder.append(FilterConstants.ACTIVEMQ_ADDRESS);
+      builder.append(" = '");
+      builder.append(session.getServer().getConfiguration().getManagementAddress());
+      builder.append("') OR (");
+      builder.append(FilterConstants.ACTIVEMQ_ADDRESS);
+      builder.append(" = '");
+      builder.append(session.getServer().getConfiguration().getManagementNotificationAddress());
+      builder.append("'))");
+      managementFilter = new SimpleString(builder.toString());
    }
 
    synchronized void start() throws Exception {
@@ -85,8 +102,7 @@ public class MQTTSubscriptionManager {
     */
    private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
       long cid = session.getServer().getStorageManager().generateID();
-
-      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
+      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, managementFilter, false, true, -1);
       consumer.setStarted(true);
 
       consumers.put(topic, consumer);
