MQTT Handle ANYCAST addresses
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0772b547 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0772b547 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0772b547 Branch: refs/heads/ARTEMIS-780 Commit: 0772b5478774fef25534a7c9f060c65decf959f6 Parents: be04eac Author: Martyn Taylor <[email protected]> Authored: Tue Nov 1 12:28:06 2016 +0000 Committer: Clebert Suconic <[email protected]> Committed: Mon Nov 7 11:28:07 2016 -0500 ---------------------------------------------------------------------- .../artemis/api/core/ActiveMQExceptionType.java | 6 ++++ ...ActiveMQUnexpectedRoutingTypeForAddress.java | 31 ++++++++++++++++++++ .../protocol/mqtt/MQTTSubscriptionManager.java | 17 ++++++++--- .../core/server/ActiveMQMessageBundle.java | 5 ++++ .../integration/mqtt/imported/MQTTTest.java | 24 +++++++++++++++ 5 files changed, 79 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 0221562..309a8c4 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -219,6 +219,12 @@ public enum ActiveMQExceptionType { public ActiveMQException createException(String msg) { return new ActiveMQQueueMaxConsumerLimitReached(msg); } + }, + UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS(215) { + @Override + public ActiveMQException 
createException(String msg) { + return new ActiveMQUnexpectedRoutingTypeForAddress(msg); + } }; private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java new file mode 100644 index 0000000..1bd7ecd --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because an address did not have the expected routing type. 
+ */ +public final class ActiveMQUnexpectedRoutingTypeForAddress extends ActiveMQException { + + public ActiveMQUnexpectedRoutingTypeForAddress() { + super(ActiveMQExceptionType.UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS); + } + + public ActiveMQUnexpectedRoutingTypeForAddress(String msg) { + super(ActiveMQExceptionType.UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS, msg); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index d894910..a264e88 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -25,8 +25,10 @@ import java.util.concurrent.ConcurrentMap; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class MQTTSubscriptionManager { @@ -61,7 +63,8 @@ public class MQTTSubscriptionManager { synchronized void start() throws Exception { for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) { - Queue q = createQueueForSubscription(subscription.topicName(), 
subscription.qualityOfService().value()); + String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()); + Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value()); createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value()); } } @@ -84,8 +87,8 @@ public class MQTTSubscriptionManager { /** * Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name. */ - private Queue createQueueForSubscription(String topic, int qos) throws Exception { - String address = MQTTUtil.convertMQTTAddressFilterToCore(topic); + private Queue createQueueForSubscription(String address, int qos) throws Exception { + SimpleString queue = getQueueNameForTopic(address); Queue q = session.getServer().locateQueue(queue); @@ -113,9 +116,15 @@ public class MQTTSubscriptionManager { int qos = subscription.qualityOfService().value(); String topic = subscription.topicName(); + String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); + AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress)); + if (addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) { + throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType()); + } + session.getSessionState().addSubscription(subscription); - Queue q = createQueueForSubscription(topic, qos); + Queue q = createQueueForSubscription(coreAddress, qos); if (s == null) { createConsumerForSubscriptionQueue(q, topic, qos); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 769d183..9475461 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -35,11 +35,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException; +import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; @@ -381,4 +383,7 @@ public interface ActiveMQMessageBundle { @Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT) ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName); + + @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 6406955..dd0098a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1639,4 +1639,28 @@ public class MQTTTest extends MQTTTestSupport { assertNotNull(peerDisconnectedException); assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); } + + @Test(timeout = 60 * 1000) + public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception { + Exception peerDisconnectedException = null; + try { + SimpleString coreAddress = new SimpleString("foo.bar"); + Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + + AddressInfo addressInfo = new AddressInfo(coreAddress); + addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); + getServer().createOrUpdateAddressInfo(addressInfo); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("test-mqtt"); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(mqttSubscription); + } catch (EOFException e) { + peerDisconnectedException = e; + } + assertNotNull(peerDisconnectedException); + assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + } }
