ARTEMIS-780 Added ability to define 2 Routing Types on a single address
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7a51491c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7a51491c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7a51491c Branch: refs/heads/master Commit: 7a51491c3235d5bcbc0407f38b7b080b926eaf00 Parents: 0861be1 Author: Martyn Taylor <[email protected]> Authored: Fri Nov 25 13:06:21 2016 +0000 Committer: Martyn Taylor <[email protected]> Committed: Fri Dec 9 18:43:15 2016 +0000 ---------------------------------------------------------------------- .../cli/commands/address/CreateAddress.java | 20 +- .../config/ActiveMQDefaultConfiguration.java | 7 + .../artemis/api/core/client/ClientSession.java | 201 ++++++++++++- .../core/management/ActiveMQServerControl.java | 42 +-- .../api/core/management/AddressControl.java | 9 +- .../core/client/impl/ClientSessionImpl.java | 290 +++++++++++++++++-- .../core/impl/ActiveMQSessionContext.java | 41 ++- .../core/protocol/core/impl/PacketDecoder.java | 12 + .../core/protocol/core/impl/PacketImpl.java | 4 + .../impl/wireformat/CreateAddressMessage.java | 39 ++- .../impl/wireformat/CreateQueueMessage_V2.java | 6 +- .../impl/wireformat/CreateQueueMessage_V3.java | 134 +++++++++ .../wireformat/CreateSharedQueueMessage.java | 16 +- .../wireformat/CreateSharedQueueMessage_V2.java | 134 +++++++++ .../artemis/core/server/RoutingType.java | 44 +++ .../spi/core/remoting/SessionContext.java | 35 ++- .../jms/client/ActiveMQMessageProducer.java | 9 +- .../artemis/jms/client/ActiveMQSession.java | 17 +- .../jms/server/impl/JMSServerManagerImpl.java | 8 +- .../artemis/junit/ActiveMQConsumerResource.java | 6 + .../artemis/junit/EmbeddedActiveMQResource.java | 6 +- .../protocol/mqtt/MQTTSubscriptionManager.java | 7 +- .../protocol/openwire/OpenWireConnection.java | 2 +- .../core/protocol/openwire/amq/AMQConsumer.java | 7 +- .../core/protocol/openwire/amq/AMQSession.java 
| 2 +- .../core/protocol/stomp/StompConnection.java | 22 +- .../core/protocol/stomp/StompSession.java | 6 +- .../stomp/VersionedStompFrameHandler.java | 16 +- .../artemis/ra/ActiveMQRAMessageProducer.java | 4 +- .../artemis/rest/test/FindDestinationTest.java | 5 +- .../activemq/artemis/rest/test/RawAckTest.java | 8 +- .../core/config/CoreAddressConfiguration.java | 88 +----- .../core/config/CoreQueueConfiguration.java | 23 +- .../artemis/core/config/impl/Validators.java | 10 + .../deployers/impl/FileConfigurationParser.java | 49 ++-- .../impl/ActiveMQServerControlImpl.java | 34 +-- .../management/impl/AddressControlImpl.java | 5 +- .../core/persistence/AddressBindingInfo.java | 9 +- .../journal/AbstractJournalStorageManager.java | 4 +- .../codec/PersistentAddressBindingEncoding.java | 68 ++--- .../core/postoffice/impl/LocalQueueBinding.java | 25 +- .../postoffice/impl/SimpleAddressManager.java | 7 +- .../core/ServerSessionPacketHandler.java | 39 ++- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../core/server/ActiveMQMessageBundle.java | 12 +- .../artemis/core/server/ActiveMQServer.java | 86 +++--- .../activemq/artemis/core/server/Queue.java | 8 + .../artemis/core/server/QueueConfig.java | 35 ++- .../artemis/core/server/ServerSession.java | 52 +++- .../cluster/impl/ClusterConnectionImpl.java | 3 +- .../core/server/impl/ActiveMQServerImpl.java | 134 +++++---- .../artemis/core/server/impl/AddressInfo.java | 123 ++++---- .../core/server/impl/LastValueQueue.java | 4 +- .../server/impl/PostOfficeJournalLoader.java | 6 +- .../core/server/impl/QueueFactoryImpl.java | 7 +- .../artemis/core/server/impl/QueueImpl.java | 26 +- .../core/server/impl/ServerSessionImpl.java | 79 ++++- .../resources/schema/artemis-configuration.xsd | 45 +-- .../core/config/impl/FileConfigurationTest.java | 37 ++- .../core/message/impl/MessagePropertyTest.java | 6 +- .../impl/ScheduledDeliveryHandlerTest.java | 11 + .../resources/ConfigurationTest-full-config.xml | 22 +- 
.../addressing/AddressConfigTest.java | 9 +- .../integration/addressing/AddressingTest.java | 53 +--- .../integration/cli/AddressCommandTest.java | 9 +- .../integration/client/HangConsumerTest.java | 10 +- .../tests/integration/client/ProducerTest.java | 3 +- .../AnycastRoutingWithClusterTest.java | 10 +- .../cluster/distribution/ClusterTestBase.java | 8 +- .../failover/AsynchronousFailoverTest.java | 2 +- .../cluster/failover/BackupSyncJournalTest.java | 3 +- .../cluster/failover/FailoverTest.java | 49 ++-- .../ReplicatedMultipleServerFailoverTest.java | 3 +- .../MultiThreadRandomReattachTestBase.java | 28 +- .../cluster/reattach/OrderReattachTest.java | 3 +- .../crossprotocol/OpenWireToAMQPTest.java | 3 +- .../tests/integration/divert/DivertTest.java | 79 ++--- .../jms/client/TopicCleanupTest.java | 2 +- .../integration/jms/consumer/ConsumerTest.java | 2 +- .../ActiveMQServerControlUsingCoreTest.java | 36 ++- .../integration/mqtt/imported/MQTTTest.java | 9 +- .../integration/openwire/BasicOpenWireTest.java | 9 +- .../openwire/amq/JmsTopicRedeliverTest.java | 2 +- .../integration/security/LDAPSecurityTest.java | 5 +- .../integration/server/ExpiryRunnerTest.java | 11 +- .../integration/server/PredefinedQueueTest.java | 3 +- .../tests/integration/stomp/StompTest.java | 51 ++-- .../tests/integration/stomp/StompTestBase.java | 10 +- .../integration/stomp/v11/StompV11Test.java | 9 +- .../integration/stomp/v12/StompV12Test.java | 9 +- .../jms/tests/message/MessageHeaderTest.java | 239 ++++++++++++++- .../activemq/artemis/common/AbstractAdmin.java | 4 +- .../jms/conform/message/MessageDefaultTest.java | 2 +- .../message/headers/MessageHeaderTest.java | 4 +- .../unit/core/postoffice/impl/FakeQueue.java | 11 + 95 files changed, 2085 insertions(+), 823 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java index 86aafaf..ac1a9a9 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java @@ -17,12 +17,16 @@ package org.apache.activemq.artemis.cli.commands.address; +import java.util.HashSet; +import java.util.Set; + import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.cli.commands.AbstractAction; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.core.server.RoutingType; @Command(name = "create", description = "create an address") public class CreateAddress extends AbstractAction { @@ -30,8 +34,8 @@ public class CreateAddress extends AbstractAction { @Option(name = "--name", description = "The name of this address") String name; - @Option(name = "--routingType", description = "The routing type of the address, options are 'anycast' or 'multicast', defaults to 1 = 'multicast'") - String routingType = "multicast"; + @Option(name = "--routingTypes", description = "The routing types supported by this address, options are 'anycast' or 'multicast', enter comma separated list, defaults to 'multicast' only") + Set<RoutingType> routingTypes = new HashSet<>(); @Option(name = "--defaultMaxConsumers", description = "Sets the default max consumers for any queues created 
under this address, default = -1 (no limit)") int defaultMaxConsumers = -1; @@ -50,7 +54,7 @@ public class CreateAddress extends AbstractAction { performCoreManagement(new ManagementCallback<ClientMessage>() { @Override public void setUpInvocation(ClientMessage message) throws Exception { - ManagementHelper.putOperationInvocation(message, "broker", "createAddress", getName(), routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers); + ManagementHelper.putOperationInvocation(message, "broker", "createAddress", getName(), routingTypes, defaultDeleteOnNoConsumers, defaultMaxConsumers); } @Override @@ -74,12 +78,14 @@ public class CreateAddress extends AbstractAction { return name; } - public String getRoutingType() { - return routingType; + public Set<RoutingType> getRoutingTypes() { + return routingTypes; } - public void setRoutingType(String routingType) { - this.routingType = routingType; + public void setRoutingTypes(String routingTypes) { + for (String s : routingTypes.split(",")) { + this.routingTypes.add(RoutingType.valueOf(s.trim())); + } } public int getDefaultMaxConsumers() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 6fb5580..06b420a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.config; import org.apache.activemq.artemis.ArtemisConstants; import 
org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; /** * Default values of ActiveMQ Artemis configuration parameters. @@ -441,6 +442,8 @@ public final class ActiveMQDefaultConfiguration { public static final boolean DEFAULT_DELETE_QUEUE_ON_NO_CONSUMERS = false; + public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST; + public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig."; public static String DEFAULT_NETWORK_CHECK_LIST = null; @@ -1205,6 +1208,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_INTERNAL_NAMING_PREFIX; } + public static RoutingType getDefaultRoutingType() { + return DEFAULT_ROUTING_TYPE; + } + public static String getDefaultSystemPropertyPrefix() { return DEFAULT_SYSTEM_PROPERTY_PREFIX; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index 72b1a11..a414f95 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.client; import javax.transaction.xa.XAResource; import java.util.List; +import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; /** * A ClientSession is a single-thread object required for producing and consuming messages. 
@@ -198,7 +200,22 @@ public interface ClientSession extends XAResource, AutoCloseable { */ int getVersion(); - void createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException; + /** + * Create Address with a single initial routing type + * @param address + * @param autoCreated + * @throws ActiveMQException + */ + void createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws ActiveMQException; + + /** + * Create Address with a single initial routing type + * @param address + * @param routingType + * @param autoCreated + * @throws ActiveMQException + */ + void createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws ActiveMQException; // Queue Operations ---------------------------------------------- @@ -210,6 +227,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param durable whether the queue is durable or not * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(SimpleString address, SimpleString queueName, boolean durable) throws ActiveMQException; /** @@ -222,6 +240,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param durable if the queue is durable * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createSharedQueue(SimpleString address, SimpleString queueName, boolean durable) throws ActiveMQException; /** @@ -235,6 +254,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param durable if the queue is durable * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createSharedQueue(SimpleString address, SimpleString queueName, SimpleString filter, @@ -248,6 +268,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param durable whether the queue is durable or not * 
@throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(String address, String queueName, boolean durable) throws ActiveMQException; /** @@ -257,6 +278,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param queueName the name of the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(String address, String queueName) throws ActiveMQException; /** @@ -266,6 +288,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param queueName the name of the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(SimpleString address, SimpleString queueName) throws ActiveMQException; /** @@ -277,6 +300,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param durable whether the queue is durable or not * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(SimpleString address, SimpleString queueName, SimpleString filter, @@ -291,6 +315,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param filter only messages which match this filter will be put in the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(String address, String queueName, String filter, boolean durable) throws ActiveMQException; /** @@ -303,6 +328,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param autoCreated whether to mark this queue as autoCreated or not * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(SimpleString address, SimpleString queueName, SimpleString filter, @@ -319,6 +345,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param autoCreated whether to mark this queue as autoCreated or not * @throws 
ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createQueue(String address, String queueName, String filter, boolean durable, boolean autoCreated) throws ActiveMQException; /** @@ -328,6 +355,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param queueName the name of the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createTemporaryQueue(SimpleString address, SimpleString queueName) throws ActiveMQException; /** @@ -337,6 +365,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param queueName the name of the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createTemporaryQueue(String address, String queueName) throws ActiveMQException; /** @@ -347,6 +376,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param filter only messages which match this filter will be put in the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createTemporaryQueue(SimpleString address, SimpleString queueName, SimpleString filter) throws ActiveMQException; @@ -359,8 +389,177 @@ public interface ClientSession extends XAResource, AutoCloseable { * @param filter only messages which match this filter will be put in the queue * @throws ActiveMQException in an exception occurs while creating the queue */ + @Deprecated void createTemporaryQueue(String address, String queueName, String filter) throws ActiveMQException; + /** Deprecate **/ + + + /** + * Creates a <em>non-temporary</em> queue. 
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException; + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted + * <p> + * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param durable if the queue is durable + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException; + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. 
When the last consumer is closed the queue will be deleted + * <p> + * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter whether the queue is durable or not + * @param durable if the queue is durable + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(String address, RoutingType routingType, String queueName, boolean durable) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em> queue <em>non-durable</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(String address, RoutingType routingType, String queueName) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em> queue <em>non-durable</em> queue. 
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em>queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em> queue. 
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @param autoCreated whether to mark this queue as autoCreated or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable, + boolean autoCreated) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em>queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @param autoCreated whether to mark this queue as autoCreated or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable, boolean autoCreated) throws ActiveMQException; + + /** + * Creates a <em>temporary</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(SimpleString address, RoutingType routingType, SimpleString queueName) throws ActiveMQException; + + /** + * Creates a <em>temporary</em> queue. 
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(String address, RoutingType routingType, String queueName) throws ActiveMQException; + + /** + * Creates a <em>temporary</em> queue with a filter. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter) throws ActiveMQException; + + /** + * Creates a <em>temporary</em> queue with a filter. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(String address, RoutingType routingType, String queueName, String filter) throws ActiveMQException; + /** * Deletes the queue. 
* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 33584bf..43e7a4d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -18,8 +18,10 @@ package org.apache.activemq.artemis.api.core.management; import javax.management.MBeanOperationInfo; import java.util.Map; +import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; +import org.apache.activemq.artemis.core.server.RoutingType; /** * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers. 
@@ -434,18 +436,9 @@ public interface ActiveMQServerControl { // Operations ---------------------------------------------------- - @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) - void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingType", desc = "the routing type of the address either 0 for multicast or 1 for anycast") int routingType, - @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, - @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception; - - - @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) + @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType, - @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, - @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception; + @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception; @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception; @@ -464,6 +457,14 @@ public interface ActiveMQServerControl { void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = 
"Name of the queue") String name) throws Exception; + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, + @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; /** * Create a queue. @@ -500,25 +501,6 @@ public interface ActiveMQServerControl { @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception; /** - * Create a queue. - * <br> - * If {@code address} is {@code null} it will be defaulted to {@code name}. - * <br> - * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. 
- * - * @param address address to bind the queue to - * @param name name of the queue - * @param durable whether the queue is durable - */ - @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) - void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, - @Parameter(name = "name", desc = "Name of the queue") String name, - @Parameter(name = "filter", desc = "Filter of the queue") String filter, - @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, - @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, - @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, - @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; - /** * Deploy a durable queue. * <br> * If {@code address} is {@code null} it will be defaulted to {@code name}. 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 5e7d600..c48ef88 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -18,6 +18,9 @@ package org.apache.activemq.artemis.api.core.management; import javax.management.MBeanOperationInfo; import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.core.server.RoutingType; /** * An AddressControl is used to manage an address. @@ -31,10 +34,10 @@ public interface AddressControl { String getAddress(); /* - * The routing type of this address, either multicast (topic subscriptions) or anycast (queue semantics). + * Whether multicast routing is enabled for this address * */ - @Attribute(desc = "The routing type of this address") - String getRoutingType(); + @Attribute(desc = "Get the delivery modes enabled on this address") + Set<RoutingType> getDeliveryModes(); /** * Returns the roles (name and permissions) associated with this address. 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 145ca99..1ed825b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -43,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -237,14 +239,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void createQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { - internalCreateQueue(address, queueName, null, false, false, false); + createQueue(address, 
ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName); } @Override public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws ActiveMQException { - internalCreateQueue(address, queueName, null, durable, false, false); + createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, durable); } @Override @@ -258,7 +260,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi public void createSharedQueue(SimpleString address, SimpleString queueName, boolean durable) throws ActiveMQException { - createSharedQueue(address, queueName, null, durable); + createSharedQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, null, durable); } @Override @@ -266,28 +268,26 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi SimpleString queueName, SimpleString filterString, boolean durable) throws ActiveMQException { + createSharedQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable); + } + @Override + public void createAddress(final SimpleString address, Set<RoutingType> routingTypes, boolean autoCreated) throws ActiveMQException { checkClosed(); startCall(); try { - sessionContext.createSharedQueue(address, queueName, filterString, durable); + sessionContext.createAddress(address, routingTypes, autoCreated); } finally { endCall(); } - } @Override - public void createAddress(final SimpleString address, final boolean multicast, boolean autoCreated) throws ActiveMQException { - checkClosed(); - - startCall(); - try { - sessionContext.createAddress(address, multicast, autoCreated); - } finally { - endCall(); - } + public void createAddress(final SimpleString address, RoutingType routingType, boolean autoCreated) throws ActiveMQException { + Set<RoutingType> routingTypes = new HashSet<>(); + routingTypes.add(routingType); + createAddress(address, routingTypes, 
autoCreated); } @Override @@ -295,7 +295,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final SimpleString queueName, final SimpleString filterString, final boolean durable) throws ActiveMQException { - internalCreateQueue(address, queueName, filterString, durable, false, false); + createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, + durable); } @Override @@ -303,7 +304,10 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final String queueName, final String filterString, final boolean durable) throws ActiveMQException { - createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), durable); + createQueue(SimpleString.toSimpleString(address), + SimpleString.toSimpleString(queueName), + SimpleString.toSimpleString(filterString), + durable); } @Override @@ -312,7 +316,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final SimpleString filterString, final boolean durable, final boolean autoCreated) throws ActiveMQException { - internalCreateQueue(address, queueName, filterString, durable, false, autoCreated); + createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, + durable, + autoCreated); } @Override @@ -326,29 +332,258 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { - internalCreateQueue(address, queueName, null, false, true, false); + createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName); } @Override public void createTemporaryQueue(final String address, final String queueName) throws ActiveMQException { - internalCreateQueue(SimpleString.toSimpleString(address), 
SimpleString.toSimpleString(queueName), null, false, true, false); + createTemporaryQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName)); } @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName, final SimpleString filter) throws ActiveMQException { - internalCreateQueue(address, queueName, filter, false, true, false); + createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filter); } @Override public void createTemporaryQueue(final String address, final String queueName, final String filter) throws ActiveMQException { - internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true, false); + createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filter); + } + + + /** New Queue API **/ + + + @Override + public void createQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean autoCreated) throws ActiveMQException { + internalCreateQueue(address, + queueName, routingType, + filterString, + durable, + false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + autoCreated); + } + + @Override + public void createQueue(final String address, final RoutingType routingType, final String queueName, final String filterString, + final boolean durable, + final boolean autoCreated) throws ActiveMQException { + createQueue(SimpleString.toSimpleString(address), /* forward the caller's routing type instead of falling back to the default */ routingType, + SimpleString.toSimpleString(queueName), + SimpleString.toSimpleString(filterString), + durable, + autoCreated); } @Override + public void createTemporaryQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString queueName) throws ActiveMQException { +
internalCreateQueue(address, + queueName, routingType, + null, + false, + true, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + @Override + public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName) throws ActiveMQException { + createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName)); + } + + @Override + public void createTemporaryQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString queueName, + final SimpleString filter) throws ActiveMQException { + internalCreateQueue(address, + queueName, routingType, + filter, + false, + true, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + @Override + public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName, final String filter) throws ActiveMQException { + createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), /* the filter must be forwarded, not dropped */ SimpleString.toSimpleString(filter)); + } + + /** + * Creates a <em>non-temporary</em> queue.
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + @Override + public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException { + internalCreateQueue(address, + queueName, routingType, + null, + durable, + false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted + * <p> + * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param durable if the queue is durable + * @throws ActiveMQException in an exception occurs while creating the queue + */ + @Override + public void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException { + createSharedQueue(address, routingType, queueName, null, durable); + } + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. 
When the last consumer is closed the queue will be deleted + * <p> + * Notice: you will get an exception if the address or the filter does not match an already existing queue + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable if the queue is durable + * @throws ActiveMQException if an exception occurs while creating the queue + */ + @Override + public void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable) throws ActiveMQException { + checkClosed(); + + startCall(); + try { + sessionContext.createSharedQueue(address, queueName, routingType, filter, durable); + } finally { + endCall(); + } + } + + /** + * Creates a <em>non-temporary</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException if an exception occurs while creating the queue + */ + @Override + public void createQueue(String address, RoutingType routingType, String queueName, boolean durable) throws ActiveMQException { + createQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), durable); + } + + /** + * Creates a <em>non-temporary</em>, <em>non-durable</em> queue.
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @throws ActiveMQException if an exception occurs while creating the queue + */ + @Override + public void createQueue(String address, RoutingType routingType, String queueName) throws ActiveMQException { + internalCreateQueue(SimpleString.toSimpleString(address), + SimpleString.toSimpleString(queueName), routingType, + null, + /* durable */ false, + /* temporary */ false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + /** + * Creates a <em>non-temporary</em>, <em>non-durable</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @throws ActiveMQException if an exception occurs while creating the queue + */ + @Override + public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName) throws ActiveMQException { + internalCreateQueue(address, + queueName, + routingType, + null, + /* durable */ false, + /* temporary */ false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + /** + * Creates a <em>non-temporary</em> queue.
+ * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException if an exception occurs while creating the queue + */ + @Override + public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable) throws ActiveMQException { + internalCreateQueue(address, + queueName, + routingType, + filter, + durable, + /* temporary: this API only creates non-temporary queues, durable or not */ false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + /** + * Creates a <em>non-temporary</em> queue. + * + * @param address the queue will be bound to this address + * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @throws ActiveMQException if an exception occurs while creating the queue + */ + @Override + public void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable) throws ActiveMQException { + createQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), + durable); + } + + + @Override public void deleteQueue(final SimpleString queueName) throws ActiveMQException { checkClosed(); @@ -1567,9 +1802,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private void internalCreateQueue(final SimpleString address, final SimpleString queueName, + final RoutingType routingType, final SimpleString filterString, final boolean durable, final boolean temp,
+ final int maxConsumers, + final boolean deleteOnNoConsumers, final boolean autoCreated) throws ActiveMQException { checkClosed(); @@ -1579,7 +1817,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { - sessionContext.createQueue(address, queueName, filterString, durable, temp, autoCreated); + sessionContext.createQueue(address, + routingType, + queueName, + filterString, + durable, + temp, + /* honour the caller-supplied limit rather than re-reading the default */ maxConsumers, + deleteOnNoConsumers, + autoCreated); } finally { endCall(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index cbbe2b7..29426dd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -24,8 +24,10 @@ import java.security.PrivilegedAction; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -52,9 +54,9 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage; @@ -100,6 +102,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -240,9 +243,18 @@ public class ActiveMQSessionContext extends SessionContext { @Override public void createSharedQueue(SimpleString address, SimpleString queueName, + RoutingType routingType, SimpleString filterString, boolean durable) throws ActiveMQException { - 
sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE); + sessionChannel.sendBlocking(new CreateSharedQueueMessage_V2(address, queueName, routingType, filterString, durable, true), PacketImpl.NULL_RESPONSE); + } + + @Override + public void createSharedQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable) throws ActiveMQException { + createSharedQueue(address, queueName, null, filterString, durable); } @Override @@ -585,19 +597,35 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public void createAddress(SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException { - CreateAddressMessage request = new CreateAddressMessage(address, multicast, autoCreated, true); + public void createAddress(SimpleString address, + Set<RoutingType> routingTypes, + final boolean autoCreated) throws ActiveMQException { + CreateAddressMessage request = new CreateAddressMessage(address, routingTypes, autoCreated, true); sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); } + @Deprecated + @Override + public void createQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable, + boolean temp, + boolean autoCreated) throws ActiveMQException { + createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temp, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated); + } + @Override public void createQueue(SimpleString address, + RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreated) throws ActiveMQException { - CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, 
filterString, durable, temp, autoCreated, true); + CreateQueueMessage request = new CreateQueueMessage_V3(address, queueName, routingType, filterString, durable, temp, maxConsumers, deleteOnNoConsumers, autoCreated, true); sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); } @@ -682,6 +710,7 @@ public class ActiveMQSessionContext extends SessionContext { // they are defined in broker.xml // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover if (!queueInfo.isDurable()) { + // TODO (mtaylor) QueueInfo needs updating to include new parameters, this method should pass in del mode CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false); sendPacketWithoutLock(sessionChannel, createQueueRequest); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index de1edbc..dbd7091 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -30,9 +30,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTop import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; +import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; @@ -93,7 +95,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CRE import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER; @@ -251,10 +255,18 @@ public abstract class PacketDecoder implements Serializable { packet = new CreateQueueMessage_V2(); break; } + case CREATE_QUEUE_V3: { + packet = new CreateQueueMessage_V3(); + break; + } case CREATE_SHARED_QUEUE: { packet = new CreateSharedQueueMessage(); break; } + case 
CREATE_SHARED_QUEUE_V2: { + packet = new CreateSharedQueueMessage_V2(); + break; + } case DELETE_QUEUE: { packet = new SessionDeleteQueueMessage(); break; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index abc1eef..e252623 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -253,6 +253,10 @@ public class PacketImpl implements Packet { public static final byte CREATE_QUEUE_V2 = -12; + public static final byte CREATE_QUEUE_V3 = -13; + + public static final byte CREATE_SHARED_QUEUE_V2 = -14; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java index 10c7ff3..9b18e48 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java +++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java @@ -16,28 +16,32 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import java.util.HashSet; +import java.util.Set; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.server.RoutingType; public class CreateAddressMessage extends PacketImpl { private SimpleString address; - private boolean multicast; + private Set<RoutingType> routingTypes; private boolean autoCreated; private boolean requiresResponse; public CreateAddressMessage(final SimpleString address, - final boolean multicast, + Set<RoutingType> routingTypes, final boolean autoCreated, final boolean requiresResponse) { this(); this.address = address; - this.multicast = multicast; + this.routingTypes = routingTypes; this.autoCreated = autoCreated; this.requiresResponse = requiresResponse; } @@ -52,7 +56,7 @@ public class CreateAddressMessage extends PacketImpl { public String toString() { StringBuffer buff = new StringBuffer(getParentString()); buff.append(", address=" + address); - buff.append(", multicast=" + multicast); + buff.append(", routingTypes=" + routingTypes); buff.append(", autoCreated=" + autoCreated); buff.append("]"); return buff.toString(); @@ -62,10 +66,6 @@ public class CreateAddressMessage extends PacketImpl { return address; } - public boolean isMulticast() { - return multicast; - } - public boolean isRequiresResponse() { return requiresResponse; } @@ -78,10 +78,21 @@ public class CreateAddressMessage extends PacketImpl { this.address = address; } + public Set<RoutingType> getRoutingTypes() { + return routingTypes; + } + + public void setRoutingTypes(Set<RoutingType> routingTypes) { + this.routingTypes = routingTypes; + } + @Override public void encodeRest(final 
ActiveMQBuffer buffer) { buffer.writeSimpleString(address); - buffer.writeBoolean(multicast); + buffer.writeInt(routingTypes.size()); + for (RoutingType routingType : routingTypes) { + buffer.writeByte(routingType.getType()); + } buffer.writeBoolean(requiresResponse); buffer.writeBoolean(autoCreated); } @@ -89,7 +100,11 @@ public class CreateAddressMessage extends PacketImpl { @Override public void decodeRest(final ActiveMQBuffer buffer) { address = buffer.readSimpleString(); - multicast = buffer.readBoolean(); + int routingTypeSetSize = buffer.readInt(); + routingTypes = new HashSet<>(routingTypeSetSize); + for (int i = 0; i < routingTypeSetSize; i++) { + routingTypes.add(RoutingType.getType(buffer.readByte())); + } requiresResponse = buffer.readBoolean(); autoCreated = buffer.readBoolean(); } @@ -99,7 +114,7 @@ public class CreateAddressMessage extends PacketImpl { final int prime = 31; int result = super.hashCode(); result = prime * result + ((address == null) ? 0 : address.hashCode()); - result = prime * result + (multicast ? 1231 : 1237); + result = prime * result + (routingTypes.hashCode()); result = prime * result + (autoCreated ? 1231 : 1237); result = prime * result + (requiresResponse ? 
1231 : 1237); return result; @@ -119,7 +134,7 @@ public class CreateAddressMessage extends PacketImpl { return false; } else if (!address.equals(other.address)) return false; - if (multicast != other.multicast) + if (routingTypes == null ? other.routingTypes != null : !routingTypes.equals(other.routingTypes)) return false; if (autoCreated != other.autoCreated) return false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java index 13a4a58..610646e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java @@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; public class CreateQueueMessage_V2 extends CreateQueueMessage { - private boolean autoCreated; + protected boolean autoCreated; public CreateQueueMessage_V2(final SimpleString address, final SimpleString queueName, @@ -45,6 +45,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { super(CREATE_QUEUE_V2); } + public CreateQueueMessage_V2(byte packet) { + super(packet); + } + // Public -------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java ---------------------------------------------------------------------- diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
new file mode 100644
index 0000000..fb5c9ef
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+
+/**
+ * Create-queue packet (type {@code CREATE_QUEUE_V3}) that adds a routing type, a maximum
+ * consumer count and a delete-on-no-consumers flag to the fields of its V2 parent.
+ */
+public class CreateQueueMessage_V3 extends CreateQueueMessage_V2 {
+
+   private RoutingType routingType;
+
+   private int maxConsumers;
+
+   private boolean deleteOnNoConsumers;
+
+   public CreateQueueMessage_V3(final SimpleString address,
+                                final SimpleString queueName,
+                                final RoutingType routingType,
+                                final SimpleString filterString,
+                                final boolean durable,
+                                final boolean temporary,
+                                final int maxConsumers,
+                                final boolean deleteOnNoConsumers,
+                                final boolean autoCreated,
+                                final boolean requiresResponse) {
+      this();
+
+      this.address = address;
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.durable = durable;
+      this.temporary = temporary;
+      this.autoCreated = autoCreated;
+      this.requiresResponse = requiresResponse;
+      this.routingType = routingType;
+      this.maxConsumers = maxConsumers;
+      this.deleteOnNoConsumers = deleteOnNoConsumers;
+   }
+
+   public CreateQueueMessage_V3() {
+      super(CREATE_QUEUE_V3);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString() {
+      StringBuilder buff = new StringBuilder(super.getParentString());
+      buff.append(", routingType=" + routingType);
+      buff.append(", maxConsumers=" + maxConsumers);
+      buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   public RoutingType getRoutingType() {
+      return routingType;
+   }
+
+   public void setRoutingType(RoutingType routingType) {
+      this.routingType = routingType;
+   }
+
+   public int getMaxConsumers() {
+      return maxConsumers;
+   }
+
+   public void setMaxConsumers(int maxConsumers) {
+      this.maxConsumers = maxConsumers;
+   }
+
+   public boolean isDeleteOnNoConsumers() {
+      return deleteOnNoConsumers;
+   }
+
+   public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
+      this.deleteOnNoConsumers = deleteOnNoConsumers;
+   }
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      // V3 fields follow whatever the parent writes; decodeRest reads them in the same order.
+      super.encodeRest(buffer);
+      buffer.writeByte(routingType.getType());
+      buffer.writeInt(maxConsumers);
+      buffer.writeBoolean(deleteOnNoConsumers);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      routingType = RoutingType.getType(buffer.readByte());
+      maxConsumers = buffer.readInt();
+      deleteOnNoConsumers = buffer.readBoolean();
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      // A null routing type (default constructor, or an unknown byte on decode) hashes as 0.
+      result = prime * result + ((routingType == null) ? 0 : routingType.getType());
+      result = prime * result + (maxConsumers);
+      result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof CreateQueueMessage_V3))
+         return false;
+      CreateQueueMessage_V3 other = (CreateQueueMessage_V3) obj;
+      // Compare every field that hashCode mixes in, so equals and hashCode stay consistent.
+      if (autoCreated != other.autoCreated)
+         return false;
+      if (routingType != other.routingType)
+         return false;
+      if (maxConsumers != other.maxConsumers)
+         return false;
+      if (deleteOnNoConsumers != other.deleteOnNoConsumers)
+         return false;
+      return true;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
index f896102..af25ae9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
@@ -22,15 +22,15 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class CreateSharedQueueMessage extends PacketImpl {
 
-   private SimpleString address;
+   protected SimpleString address;
 
-   private SimpleString queueName;
+   protected SimpleString queueName;
 
-   private SimpleString filterString;
+   protected SimpleString filterString;
 
-   private boolean durable;
+   protected boolean durable;
 
-   private boolean requiresResponse;
+   protected boolean requiresResponse;
 
    public CreateSharedQueueMessage(final SimpleString address,
                                    final SimpleString queueName,
@@ -47,7 +47,11 @@ public class CreateSharedQueueMessage extends PacketImpl {
    }
 
    public CreateSharedQueueMessage() {
-      super(CREATE_SHARED_QUEUE);
+      this(CREATE_SHARED_QUEUE);
+   }
+
+   public CreateSharedQueueMessage(byte packetType) {
+      super(packetType);
    }
 
    // Public --------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
new file mode 100644
index 0000000..7c45ca7
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.server.RoutingType;
+
+/**
+ * Create-shared-queue packet (type {@code CREATE_SHARED_QUEUE_V2}) that carries a routing
+ * type in addition to the fields inherited from {@link CreateSharedQueueMessage}.
+ */
+public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
+
+   private RoutingType routingType;
+
+   public CreateSharedQueueMessage_V2(final SimpleString address,
+                                      final SimpleString queueName,
+                                      final RoutingType routingType,
+                                      final SimpleString filterString,
+                                      final boolean durable,
+                                      final boolean requiresResponse) {
+      this();
+
+      this.address = address;
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.durable = durable;
+      this.requiresResponse = requiresResponse;
+      this.routingType = routingType;
+   }
+
+   public CreateSharedQueueMessage_V2() {
+      super(CREATE_SHARED_QUEUE_V2);
+   }
+
+   public RoutingType getRoutingType() {
+      return routingType;
+   }
+
+   public void setRoutingType(RoutingType routingType) {
+      this.routingType = routingType;
+   }
+
+   @Override
+   public String toString() {
+      StringBuilder buff = new StringBuilder(getParentString());
+      buff.append(", address=" + address);
+      buff.append(", queueName=" + queueName);
+      buff.append(", filterString=" + filterString);
+      buff.append(", durable=" + durable);
+      buff.append(", routingType=" + routingType);
+      buff.append(", requiresResponse=" + requiresResponse);
+      buff.append("]");
+      return buff.toString();
+   }
+
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      // Field order is the wire layout; decodeRest must read in exactly this order.
+      buffer.writeSimpleString(address);
+      buffer.writeSimpleString(queueName);
+      buffer.writeNullableSimpleString(filterString);
+      buffer.writeBoolean(durable);
+      buffer.writeByte(routingType.getType());
+      buffer.writeBoolean(requiresResponse);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      address = buffer.readSimpleString();
+      queueName = buffer.readSimpleString();
+      filterString = buffer.readNullableSimpleString();
+      durable = buffer.readBoolean();
+      routingType = RoutingType.getType(buffer.readByte());
+      requiresResponse = buffer.readBoolean();
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((address == null) ? 0 : address.hashCode());
+      result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
+      result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
+      result = prime * result + (durable ? 1231 : 1237);
+      // A null routing type (default constructor, or an unknown byte on decode) hashes as 0.
+      result = prime * result + ((routingType == null) ? 0 : routingType.getType());
+      result = prime * result + (requiresResponse ? 1231 : 1237);
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof CreateSharedQueueMessage_V2))
+         return false;
+      CreateSharedQueueMessage_V2 other = (CreateSharedQueueMessage_V2) obj;
+      if (address == null) {
+         if (other.address != null)
+            return false;
+      } else if (!address.equals(other.address))
+         return false;
+      if (filterString == null) {
+         if (other.filterString != null)
+            return false;
+      } else if (!filterString.equals(other.filterString))
+         return false;
+      if (queueName == null) {
+         if (other.queueName != null)
+            return false;
+      } else if (!queueName.equals(other.queueName))
+         return false;
+      if (durable != other.durable)
+         return false;
+      if (routingType != other.routingType)
+         return false;
+      if (requiresResponse != other.requiresResponse)
+         return false;
+      return true;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
new file mode 100644
index 0000000..2f17335
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/**
+ * Routing types, each mapped to a one-byte code ({@code MULTICAST} = 0,
+ * {@code ANYCAST} = 1) that can be converted back with {@link #getType(byte)}.
+ */
+public enum RoutingType {
+
+   MULTICAST, ANYCAST;
+
+   /**
+    * @return the byte code of this routing type (0 for MULTICAST, 1 for ANYCAST)
+    */
+   public byte getType() {
+      switch (this) {
+         case MULTICAST:
+            return 0;
+         case ANYCAST:
+            return 1;
+         default:
+            return -1;
+      }
+   }
+
+   /**
+    * @param type a byte code as returned by {@link #getType()}
+    * @return the matching routing type, or {@code null} when the code is not 0 or 1
+    */
+   public static RoutingType getType(byte type) {
+      switch (type) {
+         case 0:
+            return MULTICAST;
+         case 1:
+            return ANYCAST;
+         default:
+            return null;
+      }
+   }
+}
