ARTEMIS-1617 - Properly set autoCreated flag on address. The flag needs to be set when an address is auto-created while creating an MQTT subscription, so that the address can be removed later if auto-delete is configured.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3aef7caa Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3aef7caa Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3aef7caa Branch: refs/heads/master Commit: 3aef7caac6b63567df4705869ba9609c53c0a8ec Parents: 0d9a114 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Thu Jan 18 08:36:07 2018 -0500 Committer: Justin Bertram <jbert...@apache.org> Committed: Thu Jan 18 08:59:35 2018 -0600 ---------------------------------------------------------------------- .../protocol/mqtt/MQTTSubscriptionManager.java | 16 +++++++++------- .../tests/integration/mqtt/imported/MQTTTest.java | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aef7caa/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index ae6b56c..49ab5d9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -24,28 +24,29 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import io.netty.handler.codec.mqtt.MqttTopicSubscription; import 
org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.FilterConstants; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.CompositeAddress; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; + public class MQTTSubscriptionManager { - private MQTTSession session; + private final MQTTSession session; - private ConcurrentMap<Long, Integer> consumerQoSLevels; + private final ConcurrentMap<Long, Integer> consumerQoSLevels; - private ConcurrentMap<String, ServerConsumer> consumers; + private final ConcurrentMap<String, ServerConsumer> consumers; // We filter out Artemis management messages and notifications - private SimpleString managementFilter; + private final SimpleString managementFilter; public MQTTSubscriptionManager(MQTTSession session) { this.session = session; @@ -108,7 +109,8 @@ public class MQTTSubscriptionManager { if (!bindingQueryResult.isAutoCreateAddresses()) { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address)); } - addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false); + addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), + RoutingType.MULTICAST, true); } return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aef7caa/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index d5978b0..bfc83e0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -1946,4 +1947,20 @@ public class MQTTTest extends MQTTTestSupport { connection2.disconnect(); } } + + @Test + public void autoDestroyAddress() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoDeleteAddresses(true); + server.getAddressSettingsRepository().addMatch("foo.bar", addressSettings); + + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + subscriptionProvider.subscribe("foo/bar", AT_MOST_ONCE); + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar"))); + + subscriptionProvider.disconnect(); + + 
assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar"))); + } }