http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 184462b..5c43683 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -33,7 +33,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQEx import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; @@ -78,7 +80,9 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import 
org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; @@ -87,7 +91,9 @@ import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; @@ -227,7 +233,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { case CREATE_ADDRESS: { CreateAddressMessage request = (CreateAddressMessage) packet; requiresResponse = request.isRequiresResponse(); - session.createAddress(request.getAddress(), request.isMulticast(), request.isAutoCreated()); + session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated()); if (requiresResponse) { response = new NullResponseMessage(); } @@ -236,7 +242,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { case CREATE_QUEUE: { CreateQueueMessage request = (CreateQueueMessage) packet; requiresResponse = request.isRequiresResponse(); - 
session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable()); + session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); if (requiresResponse) { response = new NullResponseMessage(); } @@ -245,7 +251,25 @@ public class ServerSessionPacketHandler implements ChannelHandler { case CREATE_QUEUE_V2: { CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet; requiresResponse = request.isRequiresResponse(); - session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable(), null, null, request.isAutoCreated()); + session.createQueue(request.getAddress(), + request.getQueueName(), + RoutingType.MULTICAST, + request.getFilterString(), + request.isTemporary(), + request.isDurable(), + Queue.MAX_CONSUMERS_UNLIMITED, + false, + request.isAutoCreated()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case CREATE_QUEUE_V3: { + CreateQueueMessage_V3 request = (CreateQueueMessage_V3) packet; + requiresResponse = request.isRequiresResponse(); + session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isDeleteOnNoConsumers(), + request.isAutoCreated()); if (requiresResponse) { response = new NullResponseMessage(); } @@ -260,6 +284,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { } break; } + case CREATE_SHARED_QUEUE_V2: { + CreateSharedQueueMessage_V2 request = (CreateSharedQueueMessage_V2) packet; + requiresResponse = request.isRequiresResponse(); + session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.isDurable(), request.getFilterString()); + if (requiresResponse) { + 
response = new NullResponseMessage(); + } + break; + } case DELETE_QUEUE: { requiresResponse = true; SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 64e496a..39a6ac7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -233,7 +233,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { private void handleCreateQueue(final CreateQueueMessage request) { try { - server.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary()); + server.createQueue(request.getAddress(), null, request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary()); } catch (Exception e) { ActiveMQServerLogger.LOGGER.failedToHandleCreateQueue(e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 5d39df0..1c20ba5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -17,6 +17,7 @@ package 
org.apache.activemq.artemis.core.server; import java.io.File; +import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; @@ -45,7 +46,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.security.CheckType; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; @@ -389,7 +389,7 @@ public interface ActiveMQMessageBundle { ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName); @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType); + ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, RoutingType expectedRoutingType, Set<RoutingType> supportedRoutingTypes); @Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. 
Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT) ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue); @@ -402,4 +402,12 @@ public interface ActiveMQMessageBundle { @Message(id = 119205, value = "Address {0} has bindings", format = Message.Format.MESSAGE_FORMAT) ActiveMQDeleteAddressException addressHasBindings(SimpleString address); + + @Message(id = 119206, value = "Queue {0} has invalid max consumer setting: {1}", format = Message.Format.MESSAGE_FORMAT) + IllegalArgumentException invalidMaxConsumers(String queueName, int value); + + @Message(id = 119207, value = "Can not create queue with delivery mode: {0}, Supported delivery modes for address: {1} are {2}", format = Message.Format.MESSAGE_FORMAT) + IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType, + String address, + Set<RoutingType> supportedRoutingTypes); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 143b12e..74292c6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -264,6 +264,15 @@ public interface ActiveMQServer extends ActiveMQComponent { */ boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException; + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + boolean 
autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception; + /** * Creates a transient queue. A queue that will exist as long as there are consumers. * The queue will be deleted as soon as all the consumers are removed. @@ -277,72 +286,54 @@ public interface ActiveMQServer extends ActiveMQComponent { * @throws org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filterString} * @throws NullPointerException if {@code address} is {@code null} */ - void createSharedQueue(final SimpleString address, - final SimpleString name, - final SimpleString filterString, + void createSharedQueue(final SimpleString address, final RoutingType routingType, final SimpleString name, final SimpleString filterString, final SimpleString user, boolean durable) throws Exception; - Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception; - Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filterString, + @Deprecated + Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception; + + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temporary, - Integer maxConsumers, - Boolean deleteOnNoConsumers, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception; - Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary) 
throws Exception; - Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, - Integer maxConsumers, - Boolean deleteOnNoConsumers, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception; - Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated) throws Exception; - Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, - SimpleString user, - boolean durable, - boolean temporary, - boolean autoCreated, - Integer maxConsumers, - Boolean deleteOnNoConsumers, - boolean autoCreateAddress) throws Exception; + @Deprecated + Queue deployQueue(String address, String queue, String filter, boolean durable, boolean temporary) throws Exception; - Queue deployQueue(SimpleString address, - SimpleString queueName, - SimpleString filterString, + @Deprecated + Queue deployQueue(SimpleString address, SimpleString queue, SimpleString filter, boolean durable, boolean temporary) throws Exception; + + Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString resourceName, SimpleString filterString, boolean durable, boolean temporary) throws Exception; - Queue deployQueue(SimpleString address, - SimpleString queueName, - SimpleString filterString, + Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temporary, boolean autoCreated) throws Exception; @@ -353,14 +344,12 @@ public interface ActiveMQServer extends ActiveMQComponent { QueueQueryResult queueQuery(SimpleString name) throws Exception; - 
Queue deployQueue(SimpleString address, - SimpleString queueName, - SimpleString filterString, + Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temporary, boolean autoCreated, - Integer maxConsumers, - Boolean deleteOnNoConsumers, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception; void destroyQueue(SimpleString queueName) throws Exception; @@ -416,6 +405,7 @@ public interface ActiveMQServer extends ActiveMQComponent { Queue createQueue(SimpleString addressName, SimpleString queueName, + RoutingType routingType, SimpleString filterString, SimpleString user, boolean durable, @@ -423,14 +413,14 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean ignoreIfExists, boolean transientQueue, boolean autoCreated, - Integer maxConsumers, - Boolean deleteOnNoConsumers, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception; /* - * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will - * replace any factories with the same protocol - * */ + * add a ProtocolManagerFactory to be used. 
Note if @see Configuration#isResolveProtocols is tur then this factory will + * replace any factories with the same protocol + * */ void addProtocolManagerFactory(ProtocolManagerFactory factory); /* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 2b845d5..cf044f1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -32,6 +32,8 @@ import org.apache.activemq.artemis.utils.ReferenceCounter; public interface Queue extends Bindable { + int MAX_CONSUMERS_UNLIMITED = -1; + SimpleString getName(); long getID(); @@ -40,6 +42,10 @@ public interface Queue extends Bindable { PageSubscription getPageSubscription(); + RoutingType getRoutingType(); + + void setRoutingType(RoutingType routingType); + boolean isDurable(); boolean isTemporary(); @@ -233,6 +239,7 @@ public interface Queue extends Bindable { /** * if the pause was persisted + * * @return */ boolean isPersistedPause(); @@ -283,4 +290,5 @@ public interface Queue extends Bindable { SimpleString getUser(); void decDelivering(int size); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index 81834be..3435ca0 100644 --- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.FilterUtils; @@ -33,7 +34,8 @@ public final class QueueConfig { private final boolean durable; private final boolean temporary; private final boolean autoCreated; - private final Integer maxConsumers; + private final RoutingType routingType; + private final int maxConsumers; private final boolean deleteOnNoConsumers; public static final class Builder { @@ -47,7 +49,8 @@ public final class QueueConfig { private boolean durable; private boolean temporary; private boolean autoCreated; - private Integer maxConsumers; + private RoutingType routingType; + private int maxConsumers; private boolean deleteOnNoConsumers; private Builder(final long id, final SimpleString name) { @@ -64,8 +67,9 @@ public final class QueueConfig { this.durable = true; this.temporary = false; this.autoCreated = true; - this.maxConsumers = -1; - this.deleteOnNoConsumers = false; + this.routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType(); + this.maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + this.deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); validateState(); } @@ -112,7 +116,7 @@ public final class QueueConfig { return this; } - public Builder maxConsumers(final Integer maxConsumers) { + public Builder maxConsumers(final int maxConsumers) { this.maxConsumers = maxConsumers; return this; } @@ -122,6 +126,11 @@ public final class QueueConfig { return this; } + public Builder deliveryMode(RoutingType routingType) { + this.routingType = 
routingType; + return this; + } + /** * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}. * <br> @@ -143,7 +152,7 @@ public final class QueueConfig { } else { pageSubscription = null; } - return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers); + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, deleteOnNoConsumers); } } @@ -185,7 +194,8 @@ public final class QueueConfig { final boolean durable, final boolean temporary, final boolean autoCreated, - final Integer maxConsumers, + final RoutingType routingType, + final int maxConsumers, final boolean deleteOnNoConsumers) { this.id = id; this.address = address; @@ -196,6 +206,7 @@ public final class QueueConfig { this.durable = durable; this.temporary = temporary; this.autoCreated = autoCreated; + this.routingType = routingType; this.deleteOnNoConsumers = deleteOnNoConsumers; this.maxConsumers = maxConsumers; } @@ -240,10 +251,14 @@ public final class QueueConfig { return deleteOnNoConsumers; } - public Integer maxConsumers() { + public int maxConsumers() { return maxConsumers; } + public RoutingType deliveryMode() { + return routingType; + } + @Override public boolean equals(Object o) { if (this == o) @@ -269,6 +284,8 @@ public final class QueueConfig { return false; if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null) return false; + if (routingType != that.routingType) + return false; if (maxConsumers != that.maxConsumers) return false; if (deleteOnNoConsumers != that.deleteOnNoConsumers) @@ -288,6 +305,7 @@ public final class QueueConfig { result = 31 * result + (durable ? 1 : 0); result = 31 * result + (temporary ? 1 : 0); result = 31 * result + (autoCreated ? 
1 : 0); + result = 31 * result + routingType.getType(); result = 31 * result + maxConsumers; result = 31 * result + (deleteOnNoConsumers ? 1 : 0); return result; @@ -305,6 +323,7 @@ public final class QueueConfig { + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + + ", routingType=" + routingType + ", maxConsumers=" + maxConsumers + ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}'; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 23426ca..badadf4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -104,11 +104,48 @@ public interface ServerSession extends SecurityAuth { Queue createQueue(SimpleString address, SimpleString name, + RoutingType routingType, SimpleString filterString, boolean temporary, boolean durable) throws Exception; - AddressInfo createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws Exception; + /** Create queue with default delivery mode + * + * @param address + * @param name + * @param filterString + * @param temporary + * @param durable + * @return + * @throws Exception + */ + Queue createQueue(SimpleString address, + SimpleString name, + SimpleString filterString, + boolean temporary, + boolean durable) throws Exception; + + Queue createQueue(SimpleString address, + SimpleString name, + RoutingType routingType, + SimpleString filterString, + boolean temporary, + boolean durable, + int maxConsumers, + boolean deleteOnNoConsumers, + boolean 
autoCreated) throws Exception; + + Queue createQueue(SimpleString address, + SimpleString name, + RoutingType routingType, + SimpleString filterString, + boolean temporary, + boolean durable, + boolean autoCreated) throws Exception; + + AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception; + + AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception; void deleteQueue(SimpleString name) throws Exception; @@ -186,14 +223,11 @@ public interface ServerSession extends SecurityAuth { boolean isClosed(); - Queue createQueue(SimpleString address, - SimpleString name, - SimpleString filterString, - boolean temporary, - boolean durable, - Integer maxConsumers, - Boolean deleteOnNoConsumers, - final Boolean autoCreated) throws Exception; + void createSharedQueue(SimpleString address, + SimpleString name, + final RoutingType routingType, + boolean durable, + SimpleString filterString) throws Exception; void createSharedQueue(SimpleString address, SimpleString name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 5ee94f0..2ae2329 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import 
org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; @@ -719,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } else { // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never // actually routed to at that address though - queue = server.createQueue(queueName, queueName, null, true, false); + queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false); } // There are a few things that will behave differently when it's an internal queue http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 9253e74..d9c2199 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -110,6 +110,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; 
import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -1462,11 +1463,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public Queue createQueue(final SimpleString address, + final RoutingType routingType, final SimpleString queueName, final SimpleString filterString, final boolean durable, final boolean temporary) throws Exception { - return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false); + return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true); } @Override @@ -1474,51 +1476,61 @@ public class ActiveMQServerImpl implements ActiveMQServer { final SimpleString queueName, final SimpleString filterString, final boolean durable, + final boolean temporary) throws Exception { + return createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temporary); + } + + @Override + public Queue createQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, final boolean temporary, - final Integer maxConsumers, - final Boolean deleteOnNoConsumers, + final int maxConsumers, + final boolean deleteOnNoConsumers, final boolean autoCreateAddress) throws Exception { - return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + return createQueue(address, queueName, routingType, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override public Queue createQueue(final SimpleString address, + final RoutingType 
routingType, final SimpleString queueName, final SimpleString filterString, final SimpleString user, final boolean durable, final boolean temporary) throws Exception { - return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false); + return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true); } @Override - public Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, + public Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, - Integer maxConsumers, - Boolean deleteOnNoConsumers, + int maxConsumers, + boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception { - return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override public Queue createQueue(final SimpleString address, + final RoutingType routingType, final SimpleString queueName, final SimpleString filterString, final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { - return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated); + return createQueue(address, routingType, queueName, filterString, user, durable, temporary, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated); } @Override - public Queue createQueue(SimpleString address, - SimpleString queueName, - SimpleString filter, + public 
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, @@ -1526,20 +1538,27 @@ public class ActiveMQServerImpl implements ActiveMQServer { Integer maxConsumers, Boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception { - return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override - public void createSharedQueue(final SimpleString address, - final SimpleString name, - final SimpleString filterString, + public void createSharedQueue(final SimpleString address, RoutingType routingType, final SimpleString name, final SimpleString filterString, final SimpleString user, boolean durable) throws Exception { //force the old contract about address if (address == null) { throw new NullPointerException("address can't be null!"); } - final Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false); + + if (routingType == null) { + AddressInfo addressInfo = getAddressInfo(address); + routingType = addressInfo.getRoutingTypes().size() == 1 ? 
addressInfo.getRoutingType() : ActiveMQDefaultConfiguration.getDefaultRoutingType(); + if (routingType == null) { + // TODO (mtaylor) throw exception Can not determine routing type info from address + } + } + + final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, false); if (!queue.getAddress().equals(address)) { throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); @@ -1578,34 +1597,55 @@ public class ActiveMQServerImpl implements ActiveMQServer { final SimpleString filterString, final boolean durable, final boolean temporary) throws Exception { - return deployQueue(address, resourceName, filterString, durable, temporary, false); + return deployQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), resourceName, filterString, durable, temporary, false); + } + + @Override + public Queue deployQueue(final String address, + final String resourceName, + final String filterString, + final boolean durable, + final boolean temporary) throws Exception { + return deployQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(resourceName), SimpleString.toSimpleString(filterString), durable, temporary); + } + + @Override + public Queue deployQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString resourceName, + final SimpleString filterString, + final boolean durable, + final boolean temporary) throws Exception { + return deployQueue(address, routingType, resourceName, filterString, durable, temporary, false); } @Override public Queue deployQueue(final SimpleString address, + final RoutingType routingType, final SimpleString queueName, final SimpleString filterString, final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { - return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null, true); + return deployQueue(address, routingType, queueName, filterString, 
durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true); } @Override public Queue deployQueue(final SimpleString address, + final RoutingType routingType, final SimpleString queueName, final SimpleString filterString, final boolean durable, final boolean temporary, final boolean autoCreated, - final Integer maxConsumers, - final Boolean deleteOnNoConsumers, + final int maxConsumers, + final boolean deleteOnNoConsumers, final boolean autoCreateAddress) throws Exception { // TODO: fix logging here as this could be for a topic or queue ActiveMQServerLogger.LOGGER.deployQueue(queueName); - return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override @@ -2209,11 +2249,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployAddressesFromConfiguration() throws Exception { for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) { - AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName())); - info.setRoutingType(config.getRoutingType()); - info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers()); - info.setDefaultMaxQueueConsumers(config.getDefaultMaxConsumers()); - + AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); createOrUpdateAddressInfo(info); deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); } @@ -2221,7 +2257,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { for 
(CoreQueueConfiguration config : queues) { - deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true); + deployQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true); } } @@ -2378,21 +2414,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { return postOffice.getAddressInfo(address); } - private Queue createQueue(final SimpleString addressName, - final SimpleString queueName, - final SimpleString filterString, - final SimpleString user, - final boolean durable, - final boolean temporary, - final boolean ignoreIfExists, - final boolean transientQueue, - final boolean autoCreated) throws Exception { - return createQueue(addressName, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, null, null, true); - } - @Override public Queue createQueue(final SimpleString addressName, final SimpleString queueName, + final RoutingType routingType, final SimpleString filterString, final SimpleString user, final boolean durable, @@ -2400,8 +2425,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean ignoreIfExists, final boolean transientQueue, final boolean autoCreated, - final Integer maxConsumers, - final Boolean deleteOnNoConsumers, + final int maxConsumers, + final boolean deleteOnNoConsumers, final boolean autoCreateAddress) throws Exception { final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); @@ -2426,27 +2451,32 @@ public class ActiveMQServerImpl implements ActiveMQServer { } AddressInfo defaultAddressInfo = new AddressInfo(addressName); + 
defaultAddressInfo.addRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType()); AddressInfo info = postOffice.getAddressInfo(addressName); if (info == null) { if (autoCreateAddress) { - info = defaultAddressInfo; + postOffice.addAddressInfo(defaultAddressInfo); + info = postOffice.getAddressInfo(addressName); } else { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); } } - final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; - final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxQueueConsumers() : maxConsumers; + final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deliveryMode(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build(); - final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deleteOnNoConsumers(isDeleteOnNoConsumers).maxConsumers(noMaxConsumers).build(); final Queue queue = queueFactory.createQueueWith(queueConfig); boolean addressAlreadyExists = true; - if (postOffice.getAddressInfo(queue.getAddress()) == null) { - postOffice.addAddressInfo(new AddressInfo(queue.getAddress()).setRoutingType(AddressInfo.RoutingType.MULTICAST).setDefaultMaxQueueConsumers(maxConsumers == null ? 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers)); + AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress()); + if (addressInfo == null) { + postOffice.addAddressInfo(new AddressInfo(queue.getAddress())); addressAlreadyExists = false; + } else { + if (!addressInfo.getRoutingTypes().contains(routingType)) { + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes()); + } } if (transientQueue) { @@ -2455,7 +2485,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName())); } - final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); + final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); if (queue.isDurable()) { storageManager.addQueueBinding(txID, localQueueBinding); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 9653a4e..64d6dd5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -16,8 +16,12 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import org.apache.activemq.artemis.api.core.SimpleString; +import 
org.apache.activemq.artemis.core.server.RoutingType; public class AddressInfo { @@ -25,52 +29,36 @@ public class AddressInfo { private final SimpleString name; - private RoutingType routingType = RoutingType.MULTICAST; - - private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); - - private int defaultMaxQueueConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); - private boolean autoCreated = false; private boolean deletable = false; + private Set<RoutingType> routingTypes; + public AddressInfo(SimpleString name) { this.name = name; + routingTypes = new HashSet<>(); } - public AddressInfo(SimpleString name, RoutingType routingType, boolean defaultDeleteOnNoConsumers, int defaultMaxConsumers) { - this(name); - this.routingType = routingType; - this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; - this.defaultMaxQueueConsumers = defaultMaxConsumers; - } - - public RoutingType getRoutingType() { - return routingType; - } - - public AddressInfo setRoutingType(RoutingType routingType) { - this.routingType = routingType; - return this; - } - - public boolean isDefaultDeleteOnNoConsumers() { - return defaultDeleteOnNoConsumers; - } - - public AddressInfo setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) { - this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; - return this; - } - - public int getDefaultMaxQueueConsumers() { - return defaultMaxQueueConsumers; + /** + * Creates an AddressInfo object with a Set of routing types + * @param name + * @param routingTypes + */ + public AddressInfo(SimpleString name, Set<RoutingType> routingTypes) { + this.name = name; + this.routingTypes = routingTypes; } - public AddressInfo setDefaultMaxQueueConsumers(int defaultMaxQueueConsumers) { - this.defaultMaxQueueConsumers = defaultMaxQueueConsumers; - return this; + /** + * Creates an AddressInfo object with a single RoutingType associated with it. 
+ * @param name + * @param routingType + */ + public AddressInfo(SimpleString name, RoutingType routingType) { + this.name = name; + this.routingTypes = new HashSet<>(); + routingTypes.add(routingType); } public boolean isAutoCreated() { @@ -94,42 +82,47 @@ public class AddressInfo { return id; } + public Set<RoutingType> getRoutingTypes() { + return routingTypes; + } + + public AddressInfo setRoutingTypes(Set<RoutingType> routingTypes) { + this.routingTypes = routingTypes; + return this; + } + + public AddressInfo addRoutingType(RoutingType routingType) { + if (routingTypes == null) { + routingTypes = new HashSet<>(); + } + routingTypes.add(routingType); + return this; + } + + public RoutingType getRoutingType() { + /* We want to use a Set to guarantee that at most one entry each for ANYCAST and MULTICAST can be added to routing types. + There are cases where we also want to get any routing type (when a queue doesn't specify its routing type, for + example). For this reason we return the first element in the Set. + */ + // TODO There must be a better way of doing this. This creates an iterator on each lookup.
+ for (RoutingType routingType : routingTypes) { + return routingType; + } + return null; + } + @Override public String toString() { StringBuffer buff = new StringBuffer(); buff.append("Address [name=" + name); buff.append(", id=" + id); - buff.append(", routingType=" + routingType); - buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers); - buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers); + buff.append(", routingTypes={"); + for (RoutingType routingType : routingTypes) { + buff.append(routingType.toString() + ","); + } buff.append(", autoCreated=" + autoCreated); buff.append("]"); return buff.toString(); } - public enum RoutingType { - MULTICAST, ANYCAST; - - public byte getType() { - switch (this) { - case MULTICAST: - return 0; - case ANYCAST: - return 1; - default: - return -1; - } - } - - public static RoutingType getType(byte type) { - switch (type) { - case 0: - return MULTICAST; - case 1: - return ANYCAST; - default: - return null; - } - } - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index a4fa5dc..8896aa4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import 
org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerMessage; @@ -56,6 +57,7 @@ public class LastValueQueue extends QueueImpl { final boolean durable, final boolean temporary, final boolean autoCreated, + final RoutingType routingType, final Integer maxConsumers, final Boolean deleteOnNoConsumers, final ScheduledExecutorService scheduledExecutor, @@ -63,7 +65,7 @@ public class LastValueQueue extends QueueImpl { final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, final Executor executor) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); new Exception("LastValueQeue " + this).toString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index eb31737..20ef545 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -160,7 +160,7 @@ public class 
PostOfficeJournalLoader implements JournalLoader { } } - final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); + final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); queues.put(queue.getID(), queue); postOffice.addBinding(binding); @@ -178,9 +178,7 @@ public class PostOfficeJournalLoader implements JournalLoader { // TODO: figure out what else to set here AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()) - .setRoutingType(addressBindingInfo.getRoutingType()) - .setDefaultMaxQueueConsumers(addressBindingInfo.getDefaultMaxConsumers()); - + .setRoutingTypes(addressBindingInfo.getRoutingTypes()); postOffice.addAddressInfo(addressInfo); managementService.registerAddress(addressInfo); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index bcc7c79..46beee7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -75,9 +76,9 @@ public class QueueFactoryImpl implements QueueFactory { final AddressSettings 
addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } return queue; } @@ -101,7 +102,7 @@ public class QueueFactoryImpl implements QueueFactory { Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, 
executorFactory.getExecutor()); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } else { queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a2be58b..4a06ce1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -247,6 +248,8 @@ public class QueueImpl implements Queue { private final AtomicInteger noConsumers = new AtomicInteger(0); + private RoutingType routingType; 
+ /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -343,7 +346,7 @@ public class QueueImpl implements Queue { final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, final Executor executor) { - this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); } public QueueImpl(final long id, @@ -355,6 +358,7 @@ public class QueueImpl implements Queue { final boolean durable, final boolean temporary, final boolean autoCreated, + final RoutingType routingType, final Integer maxConsumers, final Boolean deleteOnNoConsumers, final ScheduledExecutorService scheduledExecutor, @@ -369,6 +373,8 @@ public class QueueImpl implements Queue { this.addressInfo = postOffice == null ? null : postOffice.getAddressInfo(address); + this.routingType = routingType; + this.name = name; this.filter = filter; @@ -381,9 +387,9 @@ public class QueueImpl implements Queue { this.autoCreated = autoCreated; - this.maxConsumers = maxConsumers == null ? (addressInfo == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : addressInfo.getDefaultMaxQueueConsumers()) : maxConsumers; + this.maxConsumers = maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers; - this.deleteOnNoConsumers = deleteOnNoConsumers == null ? (addressInfo == null ? ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers() : addressInfo.isDefaultDeleteOnNoConsumers()) : deleteOnNoConsumers; + this.deleteOnNoConsumers = deleteOnNoConsumers == null ? 
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers() : deleteOnNoConsumers; this.postOffice = postOffice; @@ -502,6 +508,18 @@ public class QueueImpl implements Queue { } @Override + public RoutingType getRoutingType() { + return routingType; + } + + @Override + public void setRoutingType(RoutingType routingType) { + if (addressInfo.getRoutingTypes().contains(routingType)) { + this.routingType = routingType; + } + } + + @Override public Filter getFilter() { return filter; } @@ -755,7 +773,7 @@ public class QueueImpl implements Queue { synchronized (this) { - if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) { + if (maxConsumers != MAX_CONSUMERS_UNLIMITED && noConsumers.get() >= maxConsumers) { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 70177f4..79600b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -65,6 +66,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -489,18 +491,44 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString filterString, final boolean temporary, final boolean durable) throws Exception { - return createQueue(address, name, filterString, temporary, durable, null, null, false); + return createQueue(address, + name, + ActiveMQDefaultConfiguration.getDefaultRoutingType(), + filterString, + temporary, + durable, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); } @Override public Queue createQueue(final SimpleString address, final SimpleString name, + final RoutingType routingType, + final SimpleString filterString, + final boolean temporary, + final boolean durable) throws Exception { + return createQueue(address, + name, routingType, + filterString, + temporary, + durable, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + false); + } + + @Override + public Queue createQueue(final SimpleString address, + final SimpleString name, + final RoutingType routingType, final SimpleString filterString, final boolean temporary, final boolean durable, - final Integer maxConsumers, - final Boolean deleteOnNoConsumers, - final Boolean autoCreated) throws Exception { + final int maxConsumers, + final boolean deleteOnNoConsumers, + final boolean autoCreated) throws Exception { if (durable) { // make sure 
the user has privileges to create this queue securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this); @@ -510,7 +538,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true); + Queue queue = server.createQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true); if (temporary) { // Temporary queue in core simply means the queue will be deleted if @@ -541,25 +569,54 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public AddressInfo createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws Exception { - securityCheck(address, CheckType.CREATE_ADDRESS, this); - AddressInfo.RoutingType routingType = multicast ? 
AddressInfo.RoutingType.MULTICAST : AddressInfo.RoutingType.ANYCAST; + public Queue createQueue(SimpleString address, + SimpleString name, + RoutingType routingType, + SimpleString filterString, + boolean temporary, + boolean durable, + boolean autoCreated) throws Exception { + return createQueue(address, + name, routingType, + filterString, + temporary, + durable, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), + autoCreated); + } - AddressInfo addressInfo = server.createOrUpdateAddressInfo(new AddressInfo(address).setRoutingType(routingType).setAutoCreated(autoCreated)); + @Override + public AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception { + securityCheck(address, CheckType.CREATE_ADDRESS, this); + return server.createOrUpdateAddressInfo(new AddressInfo(address, routingTypes).setAutoCreated(autoCreated)); + } - return addressInfo; + @Override + public AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception { + securityCheck(address, CheckType.CREATE_ADDRESS, this); + return server.createOrUpdateAddressInfo(new AddressInfo(address, routingType).setAutoCreated(autoCreated)); } @Override public void createSharedQueue(final SimpleString address, final SimpleString name, + final RoutingType routingType, boolean durable, final SimpleString filterString) throws Exception { securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this); server.checkQueueCreationLimit(getUsername()); - server.createSharedQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable); + server.createSharedQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable); + } + + @Override + public void createSharedQueue(final SimpleString address, + final SimpleString name, + boolean durable, + final 
SimpleString filterString) throws Exception { + createSharedQueue(address, name, null, durable, filterString); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 5e5f0b9..0ed027b 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2577,12 +2577,6 @@ <!-- 2.0 Addressing configuration --> - <xsd:simpleType name="routingType"> - <xsd:restriction base="xsd:string"> - <xsd:enumeration value="multicast" /> - <xsd:enumeration value="anycast" /> - </xsd:restriction> - </xsd:simpleType> <xsd:complexType name="queueType"> <xsd:all> @@ -2596,7 +2590,19 @@ <xsd:complexType name="addressType"> <xsd:all> - <xsd:element name="queues" maxOccurs="1" minOccurs="0"> + <xsd:element name="anycast" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + a list of pre configured queues to create + </xsd:documentation> + </xsd:annotation> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="queue" type="queueType" maxOccurs="unbounded" minOccurs="0" /> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + <xsd:element name="multicast" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> a list of pre configured queues to create @@ -2616,31 +2622,6 @@ </xsd:documentation> </xsd:annotation> </xsd:attribute> - <xsd:attribute name="type" type="routingType" use="required"> - <xsd:annotation> - <xsd:documentation> - The address name to matches incoming message addresses - </xsd:documentation> - </xsd:annotation> - </xsd:attribute> - <xsd:attribute name="default-max-consumers" type="xsd:int" use="optional" default="-1"> - 
<xsd:annotation> - <xsd:documentation> - The default value of max-consumers applied to all queues that are - auto-created under this address. Also applies to any queues that do not - specify a value for max-consumers. - </xsd:documentation> - </xsd:annotation> - </xsd:attribute> - <xsd:attribute name="default-delete-on-no-consumers" type="xsd:boolean" use="optional" default="false"> - <xsd:annotation> - <xsd:documentation> - The default value of delete-on-no-consumers applied to all queues that are - auto-created under this address. Also applies to any queues that do not - specify a value for delete-on-no-consumers. - </xsd:documentation> - </xsd:annotation> - </xsd:attribute> </xsd:complexType> <xsd:complexType name="addressesType"> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index a1adb4c..533e65d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -23,10 +23,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; @@ -43,7 +45,9 @@ import 
org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin; @@ -51,9 +55,6 @@ import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.junit.Assert; import org.junit.Test; -import static org.apache.activemq.artemis.core.server.impl.AddressInfo.RoutingType.ANYCAST; -import static org.apache.activemq.artemis.core.server.impl.AddressInfo.RoutingType.MULTICAST; - public class FileConfigurationTest extends ConfigurationImplTest { private final String fullConfigurationName = "ConfigurationTest-full-config.xml"; @@ -373,12 +374,14 @@ public class FileConfigurationTest extends ConfigurationImplTest { } private void verifyAddresses() { - assertEquals(2, conf.getAddressConfigurations().size()); + assertEquals(3, conf.getAddressConfigurations().size()); // Addr 1 CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0); assertEquals("addr1", addressConfiguration.getName()); - assertEquals(ANYCAST, addressConfiguration.getRoutingType()); + Set<RoutingType> routingTypes = new HashSet<>(); + routingTypes.add(RoutingType.ANYCAST); + assertEquals(routingTypes, addressConfiguration.getRoutingTypes()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 1 Queue 1 @@ -387,9 +390,9 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q1", 
queueConfiguration.getName()); assertFalse(queueConfiguration.isDurable()); assertEquals("color='blue'", queueConfiguration.getFilterString()); - assertEquals(addressConfiguration.getDefaultDeleteOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers()); assertEquals("addr1", queueConfiguration.getAddress()); - assertEquals(addressConfiguration.getDefaultMaxConsumers(), queueConfiguration.getMaxConsumers()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers()); // Addr 1 Queue 2 queueConfiguration = addressConfiguration.getQueueConfigurations().get(1); @@ -397,14 +400,16 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q2", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertEquals("color='green'", queueConfiguration.getFilterString()); - assertEquals(new Integer(-1), queueConfiguration.getMaxConsumers()); + assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers()); assertFalse(queueConfiguration.getDeleteOnNoConsumers()); assertEquals("addr1", queueConfiguration.getAddress()); // Addr 2 addressConfiguration = conf.getAddressConfigurations().get(1); assertEquals("addr2", addressConfiguration.getName()); - assertEquals(MULTICAST, addressConfiguration.getRoutingType()); + routingTypes = new HashSet<>(); + routingTypes.add(RoutingType.MULTICAST); + assertEquals(routingTypes, addressConfiguration.getRoutingTypes()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 2 Queue 1 @@ -413,8 +418,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q3", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertEquals("color='red'", queueConfiguration.getFilterString()); - assertEquals(new Integer(10), 
queueConfiguration.getMaxConsumers()); - assertEquals(addressConfiguration.getDefaultDeleteOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers()); + assertEquals(10, queueConfiguration.getMaxConsumers()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers()); assertEquals("addr2", queueConfiguration.getAddress()); // Addr 2 Queue 2 @@ -423,9 +428,17 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q4", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertNull(queueConfiguration.getFilterString()); - assertEquals(addressConfiguration.getDefaultMaxConsumers(), queueConfiguration.getMaxConsumers()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers()); assertTrue(queueConfiguration.getDeleteOnNoConsumers()); assertEquals("addr2", queueConfiguration.getAddress()); + + // Addr 3 + addressConfiguration = conf.getAddressConfigurations().get(2); + assertEquals("addr2", addressConfiguration.getName()); + routingTypes = new HashSet<>(); + routingTypes.add(RoutingType.MULTICAST); + routingTypes.add(RoutingType.ANYCAST); + assertEquals(routingTypes, addressConfiguration.getRoutingTypes()); } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a51491c/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java index 8feed19..1638faa 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java +++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Before; import org.junit.Test; @@ -50,7 +51,10 @@ public class MessagePropertyTest extends ActiveMQTestBase { private void sendMessages() throws Exception { ClientSession session = sf.createSession(true, true); - session.createQueue(ADDRESS, ADDRESS, null, true); + + String filter = null; + session.createAddress(SimpleString.toSimpleString(ADDRESS), RoutingType.MULTICAST, false); + session.createQueue(ADDRESS, RoutingType.MULTICAST, ADDRESS, filter, true); ClientProducer producer = session.createProducer(ADDRESS); for (int i = 0; i < numMessages; i++) {
