Support new attributes on queue queries, etc.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be9483a7 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be9483a7 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be9483a7 Branch: refs/heads/ARTEMIS-780 Commit: be9483a741f89cf2eaf64c4b8edac90d3b61c9fc Parents: 24dd31a Author: jbertram <[email protected]> Authored: Tue Nov 29 22:14:15 2016 -0600 Committer: jbertram <[email protected]> Committed: Tue Nov 29 22:32:07 2016 -0600 ---------------------------------------------------------------------- .../artemis/api/core/client/ClientSession.java | 10 +- .../core/client/impl/QueueQueryImpl.java | 59 ++++++++- .../core/impl/ActiveMQSessionContext.java | 6 +- .../core/protocol/core/impl/ChannelImpl.java | 2 + .../core/protocol/core/impl/PacketDecoder.java | 6 + .../core/protocol/core/impl/PacketImpl.java | 2 + .../SessionQueueQueryResponseMessage.java | 12 +- .../SessionQueueQueryResponseMessage_V2.java | 41 ++++--- .../SessionQueueQueryResponseMessage_V3.java | 122 ++++++++++++++----- .../artemis/core/server/QueueQueryResult.java | 56 ++++++--- .../artemis/jms/client/ActiveMQQueue.java | 5 + .../artemis/jms/client/ActiveMQSession.java | 2 +- .../jms/client/ActiveMQTemporaryQueue.java | 5 + .../jms/client/ActiveMQTemporaryTopic.java | 5 + .../artemis/jms/client/ActiveMQTopic.java | 5 + .../amqp/broker/AMQPSessionCallback.java | 4 +- .../core/ServerSessionPacketHandler.java | 9 +- .../core/server/impl/ActiveMQServerImpl.java | 12 +- .../tests/integration/client/SessionTest.java | 2 +- .../jms/cluster/JMSReconnectTest.java | 3 +- 20 files changed, 279 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index a414f95..c8d483c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -100,7 +100,7 @@ public interface ClientSession extends XAResource, AutoCloseable { * Returns <code>true</code> if auto-creation for this queue is enabled and if the queue queried is a JMS queue, * <code>false</code> else. */ - boolean isAutoCreateJmsQueues(); + boolean isAutoCreateQueues(); /** * Returns the number of consumers attached to the queue. @@ -128,6 +128,14 @@ public interface ClientSession extends XAResource, AutoCloseable { * @return */ SimpleString getName(); + + RoutingType getRoutingType(); + + int getMaxConsumers(); + + boolean isDeleteOnNoConsumers(); + + boolean isAutoCreated(); } // Lifecycle operations ------------------------------------------ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java index 40ea86a..5afdd8d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.client.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.server.RoutingType; public class QueueQueryImpl implements ClientSession.QueueQuery { @@ -37,7 +38,15 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { private final SimpleString name; - private final boolean autoCreateJmsQueues; + private final boolean autoCreateQueues; + + private final boolean autoCreated; + + private final RoutingType routingType; + + private final boolean deleteOnNoConsumers; + + private final int maxConsumers; public QueueQueryImpl(final boolean durable, final boolean temporary, @@ -58,7 +67,23 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final SimpleString address, final SimpleString name, final boolean exists, - final boolean autoCreateJmsQueues) { + final boolean autoCreateQueues) { + this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, -1, false, false, RoutingType.MULTICAST); + } + + public QueueQueryImpl(final boolean durable, + final boolean temporary, + final int consumerCount, + final long messageCount, + final SimpleString filterString, + final SimpleString address, + final SimpleString name, + final boolean exists, + final boolean autoCreateQueues, + final int maxConsumers, + final boolean autoCreated, + final boolean deleteOnNoConsumers, + final RoutingType routingType) { this.durable = durable; this.temporary = temporary; this.consumerCount = consumerCount; @@ -67,7 +92,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { this.address = address; this.name = name; this.exists = exists; - this.autoCreateJmsQueues = autoCreateJmsQueues; + this.autoCreateQueues = autoCreateQueues; + this.maxConsumers = maxConsumers; + this.autoCreated = autoCreated; + this.deleteOnNoConsumers = deleteOnNoConsumers; + this.routingType = routingType; } @Override @@ -101,8 +130,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { } @Override - public boolean isAutoCreateJmsQueues() { - return autoCreateJmsQueues; + public boolean isAutoCreateQueues() { + return autoCreateQueues; } @Override @@ -115,5 +144,25 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { return exists; } + @Override + public RoutingType getRoutingType() { + return routingType; + } + + @Override + public int getMaxConsumers() { + return maxConsumers; + } + + @Override + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + @Override + public boolean isAutoCreated() { + return autoCreated; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index ed08142..1a52e9b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; @@ -290,7 +291,7 @@ public class ActiveMQSessionContext extends SessionContext { SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); - SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); + SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); // The actual windows size that gets used is determined by the user since // could be overridden on the queue settings @@ -710,8 +711,7 @@ public class ActiveMQSessionContext extends SessionContext { // they are defined in broker.xml // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover if (!queueInfo.isDurable()) { - // TODO (mtaylor) QueueInfo needs updating to include new parameters, this method should pass in del mode - CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false); + CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isDeleteOnNoConsumers(), queueInfo.isAutoCreated(), false); sendPacketWithoutLock(sessionChannel, createQueueRequest); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 41be080..d1b17bf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -172,6 +172,8 @@ public final class ChannelImpl implements Channel { return version >= 126; case PacketImpl.SESS_BINDINGQUERY_RESP_V3: return version >= 127; + case PacketImpl.SESS_QUEUEQUERY_RESP_V3: + return version >= 129; default: return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 15629c8..89a6c9a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; @@ -127,6 +128,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; @@ -241,6 +243,10 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionQueueQueryResponseMessage_V2(); break; } + case SESS_QUEUEQUERY_RESP_V3: { + packet = new SessionQueueQueryResponseMessage_V3(); + break; + } case CREATE_ADDRESS: { packet = new CreateAddressMessage(); break; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index a65bdfc..5bdf727 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -255,6 +255,8 @@ public class PacketImpl implements Packet { public static final byte CREATE_SHARED_QUEUE_V2 = -13; + public static final byte SESS_QUEUEQUERY_RESP_V3 = -14; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java index b8313b2..7d9c184 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java @@ -49,8 +49,8 @@ public class SessionQueueQueryResponseMessage extends PacketImpl { this(null, null, false, false, null, 0, 0, false); } - public SessionQueueQueryResponseMessage(byte v2) { - super(v2); + public SessionQueueQueryResponseMessage(byte v) { + super(v); } private SessionQueueQueryResponseMessage(final SimpleString name, @@ -159,6 +159,13 @@ public class SessionQueueQueryResponseMessage extends PacketImpl { @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); + buff.append("]"); + return buff.toString(); + } + + @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); buff.append(", address=" + address); buff.append(", name=" + name); buff.append(", consumerCount=" + consumerCount); @@ -167,7 +174,6 @@ public class SessionQueueQueryResponseMessage extends PacketImpl { buff.append(", exists=" + exists); buff.append(", temporary=" + temporary); buff.append(", messageCount=" + messageCount); - buff.append("]"); return buff.toString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java index 77ad0f3..667ce6e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java @@ -24,10 +24,10 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage { - private boolean autoCreationEnabled; + protected boolean autoCreateQueues; public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result) { - this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues()); + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues()); } public SessionQueueQueryResponseMessage_V2() { @@ -42,7 +42,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon final int consumerCount, final long messageCount, final boolean exists, - final boolean autoCreationEnabled) { + final boolean autoCreateQueues) { super(SESS_QUEUEQUERY_RESP_V2); this.durable = durable; @@ -61,52 +61,53 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon this.exists = exists; - this.autoCreationEnabled = autoCreationEnabled; + this.autoCreateQueues = autoCreateQueues; + } + public SessionQueueQueryResponseMessage_V2(byte v) { + super(v); } - public boolean isAutoCreationEnabled() { - return autoCreationEnabled; + public boolean isAutoCreateQueues() { + return autoCreateQueues; } @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); - buffer.writeBoolean(autoCreationEnabled); + buffer.writeBoolean(autoCreateQueues); } @Override public void decodeRest(final ActiveMQBuffer buffer) { super.decodeRest(buffer); - autoCreationEnabled = buffer.readBoolean(); + autoCreateQueues = buffer.readBoolean(); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + (autoCreationEnabled ? 1231 : 1237); + result = prime * result + (autoCreateQueues ? 1231 : 1237); return result; } @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); - buff.append(", address=" + address); - buff.append(", name=" + name); - buff.append(", consumerCount=" + consumerCount); - buff.append(", filterString=" + filterString); - buff.append(", durable=" + durable); - buff.append(", exists=" + exists); - buff.append(", temporary=" + temporary); - buff.append(", messageCount=" + messageCount); - buff.append(", autoCreationEnabled=" + autoCreationEnabled); buff.append("]"); return buff.toString(); } @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); + buff.append(", autoCreationEnabled=" + autoCreateQueues); + return buff.toString(); + } + + @Override public ClientSession.QueueQuery toQueueQuery() { - return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreationEnabled()); + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues()); } @Override @@ -118,7 +119,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon if (!(obj instanceof SessionQueueQueryResponseMessage_V2)) return false; SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2) obj; - if (autoCreationEnabled != other.autoCreationEnabled) + if (autoCreateQueues != other.autoCreateQueues) return false; return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java index 77ad0f3..b3664da 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java @@ -21,20 +21,27 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.client.impl.QueueQueryImpl; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; -public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage { +public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryResponseMessage_V2 { - private boolean autoCreationEnabled; + protected boolean autoCreated; - public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result) { - this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues()); + protected boolean deleteOnNoConsumers; + + protected RoutingType routingType; + + protected int maxConsumers; + + public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) { + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isDeleteOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers()); } - public SessionQueueQueryResponseMessage_V2() { - this(null, null, false, false, null, 0, 0, false, false); + public SessionQueueQueryResponseMessage_V3() { + this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1); } - private SessionQueueQueryResponseMessage_V2(final SimpleString name, + private SessionQueueQueryResponseMessage_V3(final SimpleString name, final SimpleString address, final boolean durable, final boolean temporary, @@ -42,8 +49,12 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon final int consumerCount, final long messageCount, final boolean exists, - final boolean autoCreationEnabled) { - super(SESS_QUEUEQUERY_RESP_V2); + final boolean autoCreateQueues, + final boolean autoCreated, + final boolean deleteOnNoConsumers, + final RoutingType routingType, + final int maxConsumers) { + super(SESS_QUEUEQUERY_RESP_V3); this.durable = durable; @@ -61,52 +72,98 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon this.exists = exists; - this.autoCreationEnabled = autoCreationEnabled; + this.autoCreateQueues = autoCreateQueues; + + this.autoCreated = autoCreated; + + this.deleteOnNoConsumers = deleteOnNoConsumers; + + this.routingType = routingType; + + this.maxConsumers = maxConsumers; + } + + public boolean isAutoCreated() { + return autoCreated; + } + + public void setAutoCreated(boolean autoCreated) { + this.autoCreated = autoCreated; + } + + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; + } + + public RoutingType getRoutingType() { + return routingType; + } + + public void setRoutingType(RoutingType routingType) { + this.routingType = routingType; + } + + public int getMaxConsumers() { + return maxConsumers; } - public boolean isAutoCreationEnabled() { - return autoCreationEnabled; + public void setMaxConsumers(int maxConsumers) { + this.maxConsumers = maxConsumers; } @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); - buffer.writeBoolean(autoCreationEnabled); + buffer.writeBoolean(autoCreated); + buffer.writeBoolean(deleteOnNoConsumers); + buffer.writeByte(routingType.getType()); + buffer.writeInt(maxConsumers); } @Override public void decodeRest(final ActiveMQBuffer buffer) { super.decodeRest(buffer); - autoCreationEnabled = buffer.readBoolean(); + autoCreated = buffer.readBoolean(); + deleteOnNoConsumers = buffer.readBoolean(); + routingType = RoutingType.getType(buffer.readByte()); + maxConsumers = buffer.readInt(); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + (autoCreationEnabled ? 1231 : 1237); + result = prime * result + (autoCreated ? 1231 : 1237); + result = prime * result + (deleteOnNoConsumers ? 1231 : 1237); + result = prime * result + routingType.hashCode(); + result = prime * result + maxConsumers; return result; } @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); - buff.append(", address=" + address); - buff.append(", name=" + name); - buff.append(", consumerCount=" + consumerCount); - buff.append(", filterString=" + filterString); - buff.append(", durable=" + durable); - buff.append(", exists=" + exists); - buff.append(", temporary=" + temporary); - buff.append(", messageCount=" + messageCount); - buff.append(", autoCreationEnabled=" + autoCreationEnabled); buff.append("]"); return buff.toString(); } @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); + buff.append(", autoCreated=" + autoCreated); + buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers); + buff.append(", routingType=" + routingType); + buff.append(", maxConsumers=" + maxConsumers); + return buff.toString(); + } + + @Override public ClientSession.QueueQuery toQueueQuery() { - return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreationEnabled()); + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isDeleteOnNoConsumers(), getRoutingType()); } @Override @@ -115,10 +172,19 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon return true; if (!super.equals(obj)) return false; - if (!(obj instanceof SessionQueueQueryResponseMessage_V2)) + if (!(obj instanceof SessionQueueQueryResponseMessage_V3)) + return false; + SessionQueueQueryResponseMessage_V3 other = (SessionQueueQueryResponseMessage_V3) obj; + if (autoCreated != other.autoCreated) + return false; + if (deleteOnNoConsumers != other.deleteOnNoConsumers) + return false; + if (routingType == null) { + if (other.routingType != null) + return false; + } else if (!routingType.equals(other.routingType)) return false; - SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2) obj; - if (autoCreationEnabled != other.autoCreationEnabled) + if (maxConsumers != other.maxConsumers) return false; return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index f9740de..de14888 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -36,18 +36,15 @@ public class QueueQueryResult { private boolean temporary; - private boolean autoCreateJmsQueues; + private boolean autoCreateQueues; - public QueueQueryResult(final SimpleString name, - final SimpleString address, - final boolean durable, - final boolean temporary, - final SimpleString filterString, - final int consumerCount, - final long messageCount, - final boolean autoCreateJmsQueues) { - this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true); - } + private boolean autoCreated; + + private boolean deleteOnNoConsumers; + + private RoutingType routingType; + + private int maxConsumers; public QueueQueryResult(final SimpleString name, final SimpleString address, @@ -56,8 +53,12 @@ public class QueueQueryResult { final SimpleString filterString, final int consumerCount, final long messageCount, - final boolean autoCreateJmsQueues, - final boolean exists) { + final boolean autoCreateQueues, + final boolean exists, + final boolean autoCreated, + final boolean deleteOnNoConsumers, + final RoutingType routingType, + final int maxConsumers) { this.durable = durable; this.temporary = temporary; @@ -72,9 +73,17 @@ public class QueueQueryResult { this.name = name; - this.autoCreateJmsQueues = autoCreateJmsQueues; + this.autoCreateQueues = autoCreateQueues; this.exists = exists; + + this.autoCreated = autoCreated; + + this.deleteOnNoConsumers = deleteOnNoConsumers; + + this.routingType = routingType; + + this.maxConsumers = maxConsumers; } public boolean isExists() { @@ -109,8 +118,23 @@ public class QueueQueryResult { return temporary; } - public boolean isAutoCreateJmsQueues() { - return autoCreateJmsQueues; + public boolean isAutoCreateQueues() { + return autoCreateQueues; + } + + public boolean isAutoCreated() { + return autoCreated; } + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public RoutingType getRoutingType() { + return routingType; + } + + public int getMaxConsumers() { + return maxConsumers; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java index 883a71d..a6d047a 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java @@ -91,6 +91,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue { return super.getAddress().equals(that.getAddress()); } + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index d0c9592..3e9b76f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -1074,7 +1074,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { QueueQuery response = session.queueQuery(queue.getSimpleAddress()); - if (!response.isExists() && !response.isAutoCreateJmsQueues()) { + if (!response.isExists() && !response.isAutoCreateQueues()) { return null; } else { return queue; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java index daae8ed..88a822a 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryQueue.java @@ -67,6 +67,11 @@ public class ActiveMQTemporaryQueue extends ActiveMQQueue implements TemporaryQu return super.getAddress().equals(that.getAddress()); } + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java index 4cccb81..98b5ba6 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTemporaryTopic.java @@ -51,6 +51,11 @@ public class ActiveMQTemporaryTopic extends ActiveMQTopic implements TemporaryTo return super.getAddress().equals(that.getAddress()); } + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java index e251e6a..941b440 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java @@ -86,6 +86,11 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic { return super.getAddress().equals(that.getAddress()); } + @Override + public int hashCode() { + return super.getAddress().hashCode(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index acbb2e9..6382cb2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -207,13 +207,13 @@ public class AMQPSessionCallback implements SessionCallback { public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception { QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); - if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues() && autoCreate) { + if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(), true); + queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers()); } return queueQueryResult; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 65ffc69..d3cc617 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; @@ -218,7 +219,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { // We send back queue information on the queue as a response- this allows the queue to // be automatically recreated on failover QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName()); - if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { + response = new SessionQueueQueryResponseMessage_V3(queueQueryResult); + } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { response = new SessionQueueQueryResponseMessage_V2(queueQueryResult); } else { response = new SessionQueueQueryResponseMessage(queueQueryResult); @@ -284,7 +287,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); - if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { + response = new SessionQueueQueryResponseMessage_V3(result); + } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { response = new SessionQueueQueryResponseMessage_V2(result); } else { response = new SessionQueueQueryResponseMessage(result); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index d6b1be4..2147868 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -651,7 +651,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); } - boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues(); + boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues(); QueueQueryResult response; @@ -666,14 +666,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { SimpleString filterString = filter == null ? null : filter.getFilterString(); - response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues); + response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isDeleteOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers()); } else if (name.equals(managementAddress)) { // make an exception for the management address (see HORNETQ-29) - response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues); - } else if (autoCreateJmsQueues) { - response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false); + response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1); + } else if (autoCreateQueues) { + response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, false, null, 0); } else { - response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false); + response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, null, 0); } return response; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java index 0882078..2242cfc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java @@ -234,7 +234,7 @@ public class SessionTest extends ActiveMQTestBase { ClientSession clientSession = cf.createSession(false, true, true); QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName)); Assert.assertFalse(resp.isExists()); - Assert.assertFalse(resp.isAutoCreateJmsQueues()); + Assert.assertFalse(resp.isAutoCreateQueues()); Assert.assertEquals(null, resp.getAddress()); clientSession.close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9483a7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest.java index 92741e7..da05fae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQSession; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -100,7 +101,7 @@ public class JMSReconnectTest extends ActiveMQTestBase { SimpleString jmsQueueName = new SimpleString("myqueue"); - coreSession.createQueue(jmsQueueName, jmsQueueName, null, true); + coreSession.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true); Queue queue = sess.createQueue("myqueue");
