http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 01f0b08..79635d9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.spi.core.remoting; import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; import java.util.HashMap; +import java.util.Set; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -64,7 +66,7 @@ public abstract class SessionContext { public abstract int getReconnectID(); /** - * it will eather reattach or reconnect, preferably reattaching it. + * it will either reattach or reconnect, preferably reattaching it. * * @param newConnection * @return true if it was possible to reattach @@ -159,20 +161,49 @@ public abstract class SessionContext { public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); + /** + * Creates a shared queue using the routing type set by the Address. If the Address supports more than one type of delivery + * then the default delivery mode (MULTICAST) is used. + * + * @param address + * @param queueName + * @param routingType + * @param filterString + * @param durable + * @throws ActiveMQException + */ public abstract void createSharedQueue(SimpleString address, SimpleString queueName, + RoutingType routingType, SimpleString filterString, boolean durable) throws ActiveMQException; + public abstract void createSharedQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable) throws ActiveMQException; + public abstract void deleteQueue(SimpleString queueName) throws ActiveMQException; - public abstract void createAddress(SimpleString address, boolean multicast, boolean autoCreated) throws ActiveMQException; + public abstract void createAddress(SimpleString address, Set<RoutingType> routingTypes, boolean autoCreated) throws ActiveMQException; + + + @Deprecated + public abstract void createQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable, + boolean temp, + boolean autoCreated) throws ActiveMQException; public abstract void createQueue(SimpleString address, + RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreated) throws ActiveMQException; public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) throws ActiveMQException;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 3d3fa66..47d9ff2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -405,15 +406,15 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To if (!query.isExists()) { if (destination.isQueue() && query.isAutoCreateJmsQueues()) { - clientSession.createAddress(address, false, true); + clientSession.createAddress(address, RoutingType.ANYCAST, true); if (destination.isTemporary()) { // TODO is it right to use the address for the queue name here? - clientSession.createTemporaryQueue(address, address); + clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address); } else { - clientSession.createQueue(address, address, null, true, true); + clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true); } } else if (!destination.isQueue() && query.isAutoCreateJmsTopics()) { - clientSession.createAddress(address, true, true); + clientSession.createAddress(address, RoutingType.MULTICAST, true); } else if ((destination.isQueue() && !query.isAutoCreateJmsQueues()) || (!destination.isQueue() && !query.isAutoCreateJmsTopics())) { throw new InvalidDestinationException("Destination " + address + " does not exist"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 7fc0fb2..fe2a1a0 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery; import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.activemq.artemis.utils.SelectorTranslator; @@ -302,10 +303,10 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (!response.isExists()) { if (jbd.isQueue() && response.isAutoCreateJmsQueues()) { // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) - session.createAddress(jbd.getSimpleAddress(), false, true); - session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true, true); + session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); + session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true); } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { - session.createAddress(jbd.getSimpleAddress(), true, true); + session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); } else { throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); } @@ -579,7 +580,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (durability == ConsumerDurability.DURABLE) { try { - session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + session.createSharedQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true); } catch (ActiveMQQueueExistsException ignored) { // We ignore this because querying and then creating the queue wouldn't be idempotent // we could also add a parameter to ignore existence what would require a bigger work around to avoid @@ -646,7 +647,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { */ if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateJmsQueues()) { - session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), null, true, true); + session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true); } else { throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); } @@ -660,7 +661,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (!response.isExists()) { if (response.isAutoCreateJmsTopics()) { - session.createAddress(dest.getSimpleAddress(), true, true); + session.createAddress(dest.getSimpleAddress(), RoutingType.MULTICAST, true); } else { throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist"); } @@ -677,7 +678,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { queueName = new SimpleString(UUID.randomUUID().toString()); - session.createTemporaryQueue(dest.getSimpleAddress(), queueName, coreFilterString); + session.createTemporaryQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString); consumer = session.createConsumer(queueName, null, false); @@ -699,7 +700,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { QueueQuery subResponse = session.queueQuery(queueName); if (!subResponse.isExists()) { - session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + session.createQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true); } else { // Already exists if (subResponse.getConsumerCount() > 0) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index 74fde63..01746e2 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -56,7 +56,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.management.Notification; @@ -1070,9 +1070,9 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString); } - server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).setRoutingType(AddressInfo.RoutingType.ANYCAST).setDefaultMaxQueueConsumers(-1)); + server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).addRoutingType(RoutingType.ANYCAST)); - Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated); + server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), RoutingType.ANYCAST, SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated); queues.put(queueName, activeMQQueue); @@ -1106,7 +1106,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no // subscriptions - core has no notion of a topic // server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated); - server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()))); + server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST)); topics.put(topicName, activeMQTopic); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java ---------------------------------------------------------------------- diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java index c4a59ce..8b09827 100644 --- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java +++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java @@ -16,11 +16,16 @@ */ package org.apache.activemq.artemis.junit; +import java.util.Collections; +import java.util.HashSet; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; /** * A JUnit Rule that embeds an ActiveMQ Artemis ClientConsumer into a test. @@ -85,6 +90,7 @@ public class ActiveMQConsumerResource extends AbstractActiveMQClientResource { try { if (!session.queueQuery(queueName).isExists() && autoCreateQueue) { log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), queueName.toString(), queueName.toString()); + session.createAddress(queueName, RoutingType.MULTICAST, true); session.createQueue(queueName, queueName); } consumer = session.createConsumer(queueName, browseOnly); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java ---------------------------------------------------------------------- diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java index 77c74ce..5b386e8 100644 --- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java +++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.junit.rules.ExternalResource; import org.slf4j.Logger; @@ -360,7 +362,7 @@ public class EmbeddedActiveMQResource extends ExternalResource { boolean temporary = false; Queue queue = null; try { - queue = server.getActiveMQServer().createQueue(address, name, filter, isUseDurableQueue(), temporary); + queue = server.getActiveMQServer().createQueue(address, RoutingType.MULTICAST, name, filter, isUseDurableQueue(), temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true); } catch (Exception ex) { throw new EmbeddedActiveMQResourceException(String.format("Failed to create queue: queueName = %s, name = %s", address.toString(), name.toString()), ex); } @@ -379,7 +381,7 @@ public class EmbeddedActiveMQResource extends ExternalResource { public void createSharedQueue(SimpleString address, SimpleString name, SimpleString user) { SimpleString filter = null; try { - server.getActiveMQServer().createSharedQueue(address, name, filter, user, isUseDurableQueue()); + server.getActiveMQServer().createSharedQueue(address, RoutingType.MULTICAST, name, filter, user, isUseDurableQueue()); } catch (Exception ex) { throw new EmbeddedActiveMQResourceException(String.format("Failed to create shared queue: queueName = %s, name = %s, user = %s", address.toString(), name.toString(), user.toString()), ex); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 1c87f29..b3542d3 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -26,6 +26,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -93,7 +94,7 @@ public class MQTTSubscriptionManager { Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false, true); + q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true); } else { if (q.isDeleteOnNoConsumers()) { throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); @@ -122,8 +123,8 @@ public class MQTTSubscriptionManager { String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress)); - if (addressInfo != null && addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) { - throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType()); + if (addressInfo != null && !addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) { + throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), RoutingType.MULTICAST, addressInfo.getRoutingTypes()); } session.getSessionState().addSubscription(subscription); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 1acf48a..ff9ac70 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -721,7 +721,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; server.getSecurityStore().check(qName, checkType, this); server.checkQueueCreationLimit(getUsername()); - server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), true, false); + server.createQueue(qName, null, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), true, false); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index d5d65a9..1803cc8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -97,7 +98,7 @@ public class AMQConsumer { } else { SimpleString queueName = new SimpleString(openwireDestination.getPhysicalName()); try { - session.getCoreServer().createQueue(queueName, queueName, null, true, false); + session.getCoreServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); } catch (ActiveMQQueueExistsException e) { // ignore } @@ -151,10 +152,10 @@ public class AMQConsumer { session.getCoreSession().deleteQueue(queueName); // Create the new one - session.getCoreSession().createQueue(address, queueName, selector, false, true); + session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, false, true); } } else { - session.getCoreSession().createQueue(address, queueName, selector, false, true); + session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, false, true); } } else { queueName = new SimpleString(UUID.randomUUID().toString()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index b2a2068..67870b6 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -172,7 +172,7 @@ public class AMQSession implements SessionCallback { if (!queueBinding.isExists()) { if (isAutoCreate) { - server.createQueue(queueName, queueName, null, true, isTemporary); + server.createQueue(queueName, null, queueName, null, true, isTemporary); connection.addKnownDestination(queueName); } else { hasQueue = false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 3b0991d..3f148f3 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -38,9 +38,9 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -260,23 +260,23 @@ public final class StompConnection implements RemotingConnection { } } - public boolean autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException { + public boolean autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException { boolean result = false; ServerSession session = getSession().getSession(); try { if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue); - if (routingType != null && routingType.equals(AddressInfo.RoutingType.MULTICAST) && addressSettings.isAutoCreateAddresses()) { - session.createAddress(SimpleString.toSimpleString(queue), true, true); + if (routingType != null && routingType == RoutingType.MULTICAST && addressSettings.isAutoCreateAddresses()) { + session.createAddress(SimpleString.toSimpleString(queue), RoutingType.MULTICAST, true); result = true; } else { if (addressSettings.isAutoCreateAddresses()) { - session.createAddress(SimpleString.toSimpleString(queue), false, true); + session.createAddress(SimpleString.toSimpleString(queue), RoutingType.ANYCAST, true); result = true; } if (addressSettings.isAutoCreateQueues()) { - session.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, false, true, null, null, true); + session.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), RoutingType.ANYCAST, null, false, true, true); result = true; } } @@ -290,10 +290,10 @@ public final class StompConnection implements RemotingConnection { return result; } - public void checkRoutingSemantics(String destination, AddressInfo.RoutingType routingType) throws ActiveMQStompException { - AddressInfo.RoutingType actualRoutingTypeOfAddress = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType(); - if (routingType != null && !routingType.equals(actualRoutingTypeOfAddress)) { - throw BUNDLE.illegalSemantics(routingType.toString(), actualRoutingTypeOfAddress.toString()); + public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException { + Set<RoutingType> actualDeliveryModesOfAddres = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes(); + if (routingType != null && !actualDeliveryModesOfAddres.contains(routingType)) { + throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddres.toString()); } } @@ -654,7 +654,7 @@ public final class StompConnection implements RemotingConnection { String id, String durableSubscriptionName, boolean noLocal, - AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException { + RoutingType subscriptionType) throws ActiveMQStompException { autoCreateDestinationIfPossible(destination, subscriptionType); checkDestination(destination); checkRoutingSemantics(destination, subscriptionType); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index d207544..24db587 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; @@ -34,12 +35,12 @@ import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -286,7 +287,8 @@ public class StompSession implements SessionCallback { receiveCredits = -1; } - if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) { + Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes(); + if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) { // subscribes to a topic pubSub = true; if (durableSubscriptionName != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 580bade..cdd9e50 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -27,8 +27,8 @@ import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -169,7 +169,7 @@ public abstract class VersionedStompFrameHandler { try { connection.validate(); String destination = getDestination(frame); - AddressInfo.RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION)); + RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION)); connection.autoCreateDestinationIfPossible(destination, routingType); connection.checkDestination(destination); connection.checkRoutingSemantics(destination, routingType); @@ -247,7 +247,7 @@ public abstract class VersionedStompFrameHandler { if (durableSubscriptionName == null) { durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); } - AddressInfo.RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION)); + RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION)); boolean noLocal = false; if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { @@ -344,16 +344,16 @@ public abstract class VersionedStompFrameHandler { connection.destroy(); } - private AddressInfo.RoutingType getRoutingType(String typeHeader, String destination) { + private RoutingType getRoutingType(String typeHeader, String destination) { // null is valid to return here so we know when the user didn't provide any routing info - AddressInfo.RoutingType routingType = null; + RoutingType routingType = null; if (typeHeader != null) { - routingType = AddressInfo.RoutingType.valueOf(typeHeader); + routingType = RoutingType.valueOf(typeHeader); } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) { if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { - routingType = AddressInfo.RoutingType.MULTICAST; + routingType = RoutingType.MULTICAST; } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { - routingType = AddressInfo.RoutingType.ANYCAST; + routingType = RoutingType.ANYCAST; } } return routingType; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMessageProducer.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMessageProducer.java index 74c11cc..447c824 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMessageProducer.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMessageProducer.java @@ -224,7 +224,7 @@ public class ActiveMQRAMessageProducer implements MessageProducer { @Override public int getDeliveryMode() throws JMSException { if (ActiveMQRAMessageProducer.trace) { - ActiveMQRALogger.LOGGER.trace("getDeliveryMode()"); + ActiveMQRALogger.LOGGER.trace("getRoutingType()"); } return producer.getDeliveryMode(); @@ -314,7 +314,7 @@ public class ActiveMQRAMessageProducer implements MessageProducer { @Override public void setDeliveryMode(final int deliveryMode) throws JMSException { if (ActiveMQRAMessageProducer.trace) { - ActiveMQRALogger.LOGGER.trace("setDeliveryMode(" + deliveryMode + ")"); + ActiveMQRALogger.LOGGER.trace("setRoutingType(" + deliveryMode + ")"); } producer.setDeliveryMode(deliveryMode); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java index f6eaf17..db23f56 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.rest.test; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; import org.jboss.resteasy.client.ClientRequest; import org.jboss.resteasy.client.ClientResponse; import org.jboss.resteasy.spi.Link; @@ -29,7 +30,7 @@ public class FindDestinationTest extends MessageTestBase { @Test public void testFindQueue() throws Exception { String testName = "testFindQueue"; - server.getActiveMQServer().createQueue(new SimpleString(testName), new SimpleString(testName), null, false, false); + server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName)); @@ -59,7 +60,7 @@ public class FindDestinationTest extends MessageTestBase { @Test public void testFindTopic() throws Exception { - server.getActiveMQServer().createQueue(new SimpleString("testTopic"), new SimpleString("testTopic"), null, false, false); + server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic")); ClientResponse<?> response = request.head(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/RawAckTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/RawAckTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/RawAckTest.java index 3b6f598..6743a4b 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/RawAckTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/RawAckTest.java @@ -34,6 +34,8 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -64,9 +66,11 @@ public class RawAckTest { sessionFactory = serverLocator.createSessionFactory(); consumerSessionFactory = serverLocator.createSessionFactory(); - activeMQServer.createQueue(new SimpleString("testQueue"), new SimpleString("testQueue"), null, false, false); + SimpleString addr = SimpleString.toSimpleString("testQueue"); + activeMQServer.createAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST)); + activeMQServer.createQueue(addr, RoutingType.MULTICAST, addr, null, false, false); session = sessionFactory.createSession(true, true); - producer = session.createProducer("testQueue"); + producer = session.createProducer(addr); session.start(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java index 6327f79..60d2d02 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java @@ -18,20 +18,17 @@ package org.apache.activemq.artemis.core.config; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.RoutingType; public class CoreAddressConfiguration implements Serializable { private String name = null; - private AddressInfo.RoutingType routingType = null; - - private Integer defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); - - private Boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); + private Set<RoutingType> routingTypes = new HashSet<>(); private List<CoreQueueConfiguration> queueConfigurations = new ArrayList<>(); @@ -47,12 +44,12 @@ public class CoreAddressConfiguration implements Serializable { return this; } - public AddressInfo.RoutingType getRoutingType() { - return routingType; + public Set<RoutingType> getRoutingTypes() { + return routingTypes; } - public CoreAddressConfiguration setRoutingType(AddressInfo.RoutingType routingType) { - this.routingType = routingType; + public CoreAddressConfiguration addDeliveryMode(RoutingType routingType) { + routingTypes.add(routingType); return this; } @@ -69,73 +66,4 @@ public class CoreAddressConfiguration implements Serializable { public List<CoreQueueConfiguration> getQueueConfigurations() { return queueConfigurations; } - - public Boolean getDefaultDeleteOnNoConsumers() { - return defaultDeleteOnNoConsumers; - } - - public CoreAddressConfiguration setDefaultDeleteOnNoConsumers(Boolean defaultDeleteOnNoConsumers) { - this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; - return this; - } - - public Integer getDefaultMaxConsumers() { - return defaultMaxConsumers; - } - - public CoreAddressConfiguration setDefaultMaxConsumers(Integer defaultMaxConsumers) { - this.defaultMaxConsumers = defaultMaxConsumers; - return this; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + ((routingType == null) ? 0 : routingType.hashCode()); - result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode()); - result = prime * result + ((defaultMaxConsumers == null) ? 0 : defaultMaxConsumers.hashCode()); - result = prime * result + ((defaultDeleteOnNoConsumers == null) ? 0 : defaultDeleteOnNoConsumers.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - CoreAddressConfiguration other = (CoreAddressConfiguration) obj; - if (name == null) { - if (other.name != null) - return false; - } else if (!name.equals(other.name)) - return false; - if (routingType == null) { - if (other.routingType != null) - return false; - } else if (!routingType.equals(other.routingType)) - return false; - if (queueConfigurations == null) { - if (other.queueConfigurations != null) - return false; - } else if (!queueConfigurations.equals(other.queueConfigurations)) - return false; - if (defaultMaxConsumers == null) { - if (other.defaultMaxConsumers != null) - return false; - } else if (!defaultMaxConsumers.equals(other.defaultMaxConsumers)) - return false; - if (defaultDeleteOnNoConsumers == null) { - if (other.defaultDeleteOnNoConsumers != null) - return false; - } else if (!defaultDeleteOnNoConsumers.equals(other.defaultDeleteOnNoConsumers)) { - return false; - } - - return true; - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java index 79b2fd2..350765d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java @@ -18,6 +18,9 @@ package org.apache.activemq.artemis.core.config; import java.io.Serializable; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.server.RoutingType; + public class CoreQueueConfiguration implements Serializable { private static final long serialVersionUID = 650404974977490254L; @@ -30,9 +33,11 @@ public class CoreQueueConfiguration implements Serializable { private boolean durable = true; - private Integer maxConsumers = null; + private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + + private Boolean deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); - private Boolean deleteOnNoConsumers = null; + private RoutingType routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType(); public CoreQueueConfiguration() { } @@ -53,8 +58,6 @@ public class CoreQueueConfiguration implements Serializable { return durable; } - - /** * @param address the address to set */ @@ -103,14 +106,22 @@ public class CoreQueueConfiguration implements Serializable { return this; } - public Boolean getDeleteOnNoConsumers() { + public boolean getDeleteOnNoConsumers() { return deleteOnNoConsumers; } - public Integer getMaxConsumers() { + public int getMaxConsumers() { return maxConsumers; } + public RoutingType getRoutingType() { + return routingType; + } + + public void setRoutingType(RoutingType routingType) { + this.routingType = routingType; + } + @Override public int hashCode() { final int prime = 31; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 98bced3..bc57978 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -163,4 +163,14 @@ public final class Validators { } } }; + + public static final Validator MAX_QUEUE_CONSUMERS = new Validator() { + @Override + public void validate(String name, Object value) { + int val = (Integer) value; + if (val < -1) { + throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumers(name, val); + } + } + }; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index ea42a40..38a237b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -60,11 +60,11 @@ import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; @@ -625,15 +625,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { NodeList elements = e.getElementsByTagName("queues"); if (elements.getLength() != 0) { Element node = (Element) elements.item(0); - config.setQueueConfigurations(parseQueueConfigurations(node)); + config.setQueueConfigurations(parseQueueConfigurations(node, ActiveMQDefaultConfiguration.DEFAULT_ROUTING_TYPE)); } } - private List<CoreQueueConfiguration> parseQueueConfigurations(final Element node) { + private List<CoreQueueConfiguration> parseQueueConfigurations(final Element node, RoutingType routingType) { List<CoreQueueConfiguration> queueConfigurations = new ArrayList<>(); NodeList list = node.getElementsByTagName("queue"); for (int i = 0; i < list.getLength(); i++) { CoreQueueConfiguration queueConfig = parseQueueConfiguration(list.item(i)); + queueConfig.setRoutingType(routingType); queueConfigurations.add(queueConfig); } return queueConfigurations; @@ -903,14 +904,15 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { String address = null; String filterString = null; boolean durable = true; - Integer maxConsumers = null; - Boolean deleteOnNoConsumers = null; + int maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + boolean deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); NamedNodeMap attributes = node.getAttributes(); for (int i = 0; i < attributes.getLength(); i++) { Node item = attributes.item(i); if (item.getNodeName().equals("max-consumers")) { maxConsumers = Integer.parseInt(item.getNodeValue()); + Validators.MAX_QUEUE_CONSUMERS.validate(name, maxConsumers); } else if (item.getNodeName().equals("delete-on-no-consumers")) { deleteOnNoConsumers = Boolean.parseBoolean(item.getNodeValue()); } @@ -929,40 +931,33 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } - return new CoreQueueConfiguration() - .setAddress(address) - .setName(name) - .setFilterString(filterString) - .setDurable(durable) - .setMaxConsumers(maxConsumers) - .setDeleteOnNoConsumers(deleteOnNoConsumers); + return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setDeleteOnNoConsumers(deleteOnNoConsumers); } protected CoreAddressConfiguration parseAddressConfiguration(final Node node) { - String name = getAttributeValue(node, "name"); - String routingType = getAttributeValue(node, "type"); - CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration(); - addressConfiguration.setName(name) - .setRoutingType(AddressInfo.RoutingType.valueOf(routingType.toUpperCase())); + String name = getAttributeValue(node, "name"); + addressConfiguration.setName(name); + + List<CoreQueueConfiguration> queueConfigurations = new ArrayList<>(); NodeList children = node.getChildNodes(); for (int j = 0; j < children.getLength(); j++) { Node child = children.item(j); - if (child.getNodeName().equals("queues")) { - addressConfiguration.setQueueConfigurations(parseQueueConfigurations((Element) child)); + if (child.getNodeName().equals("multicast")) { + addressConfiguration.addDeliveryMode(RoutingType.MULTICAST); + queueConfigurations.addAll(parseQueueConfigurations((Element) child, RoutingType.MULTICAST)); + } else if (child.getNodeName().equals("anycast")) { + addressConfiguration.addDeliveryMode(RoutingType.ANYCAST); + queueConfigurations.addAll(parseQueueConfigurations((Element) child, RoutingType.ANYCAST)); } } - for (CoreQueueConfiguration coreQueueConfiguration : addressConfiguration.getQueueConfigurations()) { - coreQueueConfiguration.setAddress(addressConfiguration.getName()); - if (coreQueueConfiguration.getMaxConsumers() == null) { - coreQueueConfiguration.setMaxConsumers(addressConfiguration.getDefaultMaxConsumers()); - } - if (coreQueueConfiguration.getDeleteOnNoConsumers() == null) { - coreQueueConfiguration.setDeleteOnNoConsumers(addressConfiguration.getDefaultDeleteOnNoConsumers()); - } + for (CoreQueueConfiguration coreQueueConfiguration : queueConfigurations) { + coreQueueConfiguration.setAddress(name); } + + addressConfiguration.setQueueConfigurations(queueConfigurations); return addressConfiguration; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index f524062..185d5c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -42,6 +42,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -76,6 +77,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -560,27 +562,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public void createAddress(String name, int routingType, boolean defaultDeleteOnNoConsumers, int defaultMaxConsumers) throws Exception { + public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception { checkStarted(); clearIO(); try { - server.createAddressInfo(new AddressInfo(new SimpleString(name), AddressInfo.RoutingType.getType((byte) routingType), defaultDeleteOnNoConsumers, defaultMaxConsumers)); + server.createAddressInfo(new AddressInfo(new SimpleString(name), routingTypes)); } finally { blockOnIO(); } } @Override - public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType, - @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, - @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception { - AddressInfo.RoutingType rt = AddressInfo.RoutingType.valueOf(routingType.toUpperCase()); - createAddress(name, rt.ordinal(), defaultDeleteOnNoConsumers, defaultMaxConsumers); - } - - @Override public void deleteAddress(String name) throws Exception { checkStarted(); @@ -592,18 +586,20 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Deprecated @Override public void deployQueue(final String address, final String name, final String filterString) throws Exception { checkStarted(); clearIO(); try { - server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false); + server.deployQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), new SimpleString(filterString), true, false); } finally { blockOnIO(); } } + @Deprecated @Override public void deployQueue(final String address, final String name, @@ -614,19 +610,20 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); clearIO(); try { - server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false); + server.deployQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); } } + @Deprecated @Override public void createQueue(final String address, final String name) throws Exception { checkStarted(); clearIO(); try { - server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, true, false); + server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), null, true, false); } finally { blockOnIO(); } @@ -638,7 +635,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, durable, false); + server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), null, durable, false); } finally { blockOnIO(); } @@ -646,12 +643,13 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType, @Parameter(name = "name", desc = "Name of the queue") String name, @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, - @Parameter(name = "autoCreateAddress", desc = "Create an address with default values if one does not exist") boolean autoCreateAddress) throws Exception { + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception { checkStarted(); clearIO(); @@ -662,7 +660,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + server.createQueue(SimpleString.toSimpleString(address), routingType, new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } finally { blockOnIO(); } @@ -682,7 +680,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false); + server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 23b8e32..06c21a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -96,8 +97,8 @@ public class AddressControlImpl extends AbstractControl implements AddressContro } @Override - public String getRoutingType() { - return addressInfo.getRoutingType().toString(); + public Set<RoutingType> getDeliveryModes() { + return addressInfo.getRoutingTypes(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java index 2240ccd..7de8aa1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.core.persistence; +import java.util.Set; + import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.RoutingType; public interface AddressBindingInfo { @@ -25,8 +27,5 @@ public interface AddressBindingInfo { SimpleString getName(); - AddressInfo.RoutingType getRoutingType(); - - int getDefaultMaxConsumers(); - + Set<RoutingType> getRoutingTypes(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index b109864..ee03aa9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1269,9 +1269,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), - addressInfo.getRoutingType(), - addressInfo.getDefaultMaxQueueConsumers(), - addressInfo.isDefaultDeleteOnNoConsumers(), + addressInfo.getRoutingTypes(), addressInfo.isAutoCreated()); readLock(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index e47a210..c3aa9de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -16,11 +16,14 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal.codec; +import java.util.HashSet; +import java.util.Set; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.utils.DataConstants; public class PersistentAddressBindingEncoding implements EncodingSupport, AddressBindingInfo { @@ -29,42 +32,31 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres public SimpleString name; - public int defaultMaxConsumers; - - public boolean defaultDeleteOnNoConsumers; - public boolean autoCreated; - public AddressInfo.RoutingType routingType; + public Set<RoutingType> routingTypes; public PersistentAddressBindingEncoding() { + routingTypes = new HashSet<>(); } @Override public String toString() { - return "PersistentAddressBindingEncoding [id=" + id + - ", name=" + - name + - ", routingType=" + - routingType + - ", defaultMaxConsumers=" + - defaultMaxConsumers + - ", defaultDeleteOnNoConsumers=" + - defaultDeleteOnNoConsumers + - ", autoCreated=" + - autoCreated + - "]"; + StringBuilder sb = new StringBuilder("PersistentAddressBindingEncoding [id=" + id); + sb.append(", name=" + name); + sb.append(", routingTypes={"); + for (RoutingType routingType : routingTypes) { + sb.append(routingType.toString() + ","); + } + sb.append(", autoCreated=" + autoCreated + "]"); + return sb.toString(); } public PersistentAddressBindingEncoding(final SimpleString name, - final AddressInfo.RoutingType routingType, - final int defaultMaxConsumers, - final boolean defaultDeleteOnNoConsumers, + final Set<RoutingType> routingTypes, final boolean autoCreated) { this.name = name; - this.routingType = routingType; - this.defaultMaxConsumers = defaultMaxConsumers; - this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; + this.routingTypes = routingTypes; this.autoCreated = autoCreated; } @@ -83,35 +75,35 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres } @Override - public AddressInfo.RoutingType getRoutingType() { - return routingType; - } - - @Override - public int getDefaultMaxConsumers() { - return defaultMaxConsumers; + public Set<RoutingType> getRoutingTypes() { + return routingTypes; } @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); - routingType = AddressInfo.RoutingType.getType(buffer.readByte()); - defaultMaxConsumers = buffer.readInt(); - defaultDeleteOnNoConsumers = buffer.readBoolean(); + int size = buffer.readInt(); + for (int i = 0; i < size; i++) { + routingTypes.add(RoutingType.getType(buffer.readByte())); + } autoCreated = buffer.readBoolean(); } @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(name); - buffer.writeByte(routingType.getType()); - buffer.writeInt(defaultMaxConsumers); - buffer.writeBoolean(defaultDeleteOnNoConsumers); + buffer.writeInt(routingTypes.size()); + for (RoutingType d : routingTypes) { + buffer.writeByte(d.getType()); + } buffer.writeBoolean(autoCreated); } @Override public int getEncodeSize() { - return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN; + return SimpleString.sizeofString(name) + + DataConstants.SIZE_INT + + (DataConstants.SIZE_BYTE * routingTypes.size()) + + DataConstants.SIZE_BOOLEAN; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 2921388..30e3768 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -21,33 +21,33 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.Bindable; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class LocalQueueBinding implements QueueBinding { - private final AddressInfo address; + private final SimpleString address; private final Queue queue; private final Filter filter; - private final SimpleString name; - private final SimpleString clusterName; - public LocalQueueBinding(final AddressInfo address, final Queue queue, final SimpleString nodeID) { + private SimpleString name; + + public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID) { this.address = address; this.queue = queue; - filter = queue.getFilter(); + this.name = queue.getName(); - name = queue.getName(); + filter = queue.getFilter(); - clusterName = name.concat(nodeID); + clusterName = queue.getName().concat(nodeID); } @Override @@ -62,7 +62,7 @@ public class LocalQueueBinding implements QueueBinding { @Override public SimpleString getAddress() { - return address.getName(); + return address; } @Override @@ -77,12 +77,15 @@ public class LocalQueueBinding implements QueueBinding { @Override public SimpleString getRoutingName() { - return (address.getRoutingType() == AddressInfo.RoutingType.MULTICAST) ? name : address.getName(); + if (queue.getRoutingType() == RoutingType.ANYCAST) { + return address; + } + return name; } @Override public SimpleString getUniqueName() { - return name; + return queue.getName(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 6ed2564..5e810d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; @@ -194,9 +195,9 @@ public class SimpleAddressManager implements AddressManager { private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) { synchronized (from) { - from.setRoutingType(to.getRoutingType()); - from.setDefaultMaxQueueConsumers(to.getDefaultMaxQueueConsumers()); - from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers()); + for (RoutingType routingType : to.getRoutingTypes()) { + from.addRoutingType(routingType); + } return from; } }
