Update ClientSession API for autoCreated queues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ffeaf48f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ffeaf48f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ffeaf48f Branch: refs/heads/ARTEMIS-780 Commit: ffeaf48f39def21a983756c47ed0a2490d91e8bd Parents: 2b2b7c3 Author: jbertram <[email protected]> Authored: Thu Nov 17 07:25:04 2016 -0600 Committer: jbertram <[email protected]> Committed: Wed Nov 23 09:04:35 2016 -0600 ---------------------------------------------------------------------- .../artemis/api/core/client/ClientSession.java | 28 ++++++ .../core/client/impl/ClientSessionImpl.java | 37 ++++++-- .../core/impl/ActiveMQSessionContext.java | 6 +- .../core/protocol/core/impl/PacketDecoder.java | 6 ++ .../core/protocol/core/impl/PacketImpl.java | 2 + .../impl/wireformat/CreateQueueMessage.java | 26 +++-- .../impl/wireformat/CreateQueueMessage_V2.java | 99 ++++++++++++++++++++ .../spi/core/remoting/SessionContext.java | 3 +- .../jms/client/ActiveMQMessageProducer.java | 2 +- .../artemis/jms/client/ActiveMQSession.java | 8 +- .../jms/server/impl/JMSServerManagerImpl.java | 4 +- .../impl/ActiveMQServerControlImpl.java | 2 +- .../core/ServerSessionPacketHandler.java | 11 +++ .../impl/AutoCreatedQueueManagerImpl.java | 4 +- .../core/server/impl/ServerSessionImpl.java | 2 +- .../core/settings/impl/AddressSettings.java | 2 +- .../core/config/impl/FileConfigurationTest.java | 4 +- .../core/settings/AddressSettingsTest.java | 2 +- .../jms/tests/message/MessageHeaderTest.java | 30 +++--- 19 files changed, 231 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index 35bc9f9..72b1a11 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -294,6 +294,34 @@ public interface ClientSession extends XAResource, AutoCloseable { void createQueue(String address, String queueName, String filter, boolean durable) throws ActiveMQException; /** + * Creates a <em>non-temporary</em> queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @param autoCreated whether to mark this queue as autoCreated or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + boolean durable, + boolean autoCreated) throws ActiveMQException; + + /** + * Creates a <em>non-temporary</em>queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @param autoCreated whether to mark this queue as autoCreated or not + * @throws ActiveMQException in an exception occurs while creating the queue + */ + void createQueue(String address, String queueName, String filter, boolean durable, boolean autoCreated) throws ActiveMQException; + + /** * Creates a <em>temporary</em> queue. * * @param address the queue will be bound to this address http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 16311b0..145ca99 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -237,14 +237,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void createQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { - internalCreateQueue(address, queueName, null, false, false); + internalCreateQueue(address, queueName, null, false, false, false); } @Override public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws ActiveMQException { - internalCreateQueue(address, queueName, null, durable, false); + internalCreateQueue(address, queueName, null, durable, false, false); } @Override @@ -295,7 +295,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final SimpleString queueName, final SimpleString filterString, final boolean durable) throws ActiveMQException { - internalCreateQueue(address, queueName, filterString, durable, false); + internalCreateQueue(address, queueName, filterString, durable, false, false); } @Override @@ -307,27 +307,45 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } @Override + public void createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean autoCreated) throws ActiveMQException { + internalCreateQueue(address, queueName, filterString, durable, false, autoCreated); + } + + @Override + public void createQueue(final String address, + final String queueName, + final String filterString, + final boolean durable, + final boolean autoCreated) throws ActiveMQException { + createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), durable, autoCreated); + } + + @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { - internalCreateQueue(address, queueName, null, false, true); + internalCreateQueue(address, queueName, null, false, true, false); } @Override public void createTemporaryQueue(final String address, final String queueName) throws ActiveMQException { - internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true); + internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true, false); } @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName, final SimpleString filter) throws ActiveMQException { - internalCreateQueue(address, queueName, filter, false, true); + internalCreateQueue(address, queueName, filter, false, true, false); } @Override public void createTemporaryQueue(final String address, final String queueName, final String filter) throws ActiveMQException { - internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true); + internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true, false); } @Override @@ -1551,7 +1569,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final SimpleString queueName, final SimpleString filterString, final boolean durable, - final boolean temp) throws ActiveMQException { + final boolean temp, + final boolean autoCreated) throws ActiveMQException { checkClosed(); if (durable && temp) { @@ -1560,7 +1579,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { - sessionContext.createQueue(address, queueName, filterString, durable, temp); + sessionContext.createQueue(address, queueName, filterString, durable, temp, autoCreated); } finally { endCall(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 919da19..cbbe2b7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; @@ -594,8 +595,9 @@ public class ActiveMQSessionContext extends SessionContext { SimpleString queueName, SimpleString filterString, boolean durable, - boolean temp) throws ActiveMQException { - CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true); + boolean temp, + boolean autoCreated) throws ActiveMQException { + CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, filterString, durable, temp, autoCreated, true); sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 834822c..de1edbc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTop import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; @@ -91,6 +92,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CRE import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT; @@ -245,6 +247,10 @@ public abstract class PacketDecoder implements Serializable { packet = new CreateQueueMessage(); break; } + case CREATE_QUEUE_V2: { + packet = new CreateQueueMessage_V2(); + break; + } case CREATE_SHARED_QUEUE: { packet = new CreateSharedQueueMessage(); break; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index e07d9b5..abc1eef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -251,6 +251,8 @@ public class PacketImpl implements Packet { public static final byte CREATE_ADDRESS = -11; + public static final byte CREATE_QUEUE_V2 = -12; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java index e837d55..2ebf147 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java @@ -22,17 +22,17 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class CreateQueueMessage extends PacketImpl { - private SimpleString address; + protected SimpleString address; - private SimpleString queueName; + protected SimpleString queueName; - private SimpleString filterString; + protected SimpleString filterString; - private boolean durable; + protected boolean durable; - private boolean temporary; + protected boolean temporary; - private boolean requiresResponse; + protected boolean requiresResponse; public CreateQueueMessage(final SimpleString address, final SimpleString queueName, @@ -55,16 +55,28 @@ public class CreateQueueMessage extends PacketImpl { } // Public -------------------------------------------------------- + /** + * @param createQueueMessageV2 + */ + public CreateQueueMessage(byte createQueueMessageV2) { + super(createQueueMessageV2); + } @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); + buff.append("]"); + return buff.toString(); + } + + @Override + public String getParentString() { + StringBuffer buff = new StringBuffer(super.getParentString()); buff.append(", address=" + address); buff.append(", queueName=" + queueName); buff.append(", filterString=" + filterString); buff.append(", durable=" + durable); buff.append(", temporary=" + temporary); - buff.append("]"); return buff.toString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java new file mode 100644 index 0000000..13a4a58 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; + +public class CreateQueueMessage_V2 extends CreateQueueMessage { + + private boolean autoCreated; + + public CreateQueueMessage_V2(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final boolean autoCreated, + final boolean requiresResponse) { + this(); + + this.address = address; + this.queueName = queueName; + this.filterString = filterString; + this.durable = durable; + this.temporary = temporary; + this.autoCreated = autoCreated; + this.requiresResponse = requiresResponse; + } + + public CreateQueueMessage_V2() { + super(CREATE_QUEUE_V2); + } + + // Public -------------------------------------------------------- + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(super.getParentString()); + buff.append(", autoCreated=" + autoCreated); + buff.append("]"); + return buff.toString(); + } + + public boolean isAutoCreated() { + return autoCreated; + } + + public void setAutoCreated(boolean autoCreated) { + this.autoCreated = autoCreated; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeBoolean(autoCreated); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + autoCreated = buffer.readBoolean(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (autoCreated ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof CreateQueueMessage_V2)) + return false; + CreateQueueMessage_V2 other = (CreateQueueMessage_V2) obj; + if (autoCreated != other.autoCreated) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 16e8314..01f0b08 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -172,7 +172,8 @@ public abstract class SessionContext { SimpleString queueName, SimpleString filterString, boolean durable, - boolean temp) throws ActiveMQException; + boolean temp, + boolean autoCreated) throws ActiveMQException; public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) throws ActiveMQException; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 5cbd40f..3d3fa66 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -410,7 +410,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To // TODO is it right to use the address for the queue name here? clientSession.createTemporaryQueue(address, address); } else { - clientSession.createQueue(address, address, null, true); + clientSession.createQueue(address, address, null, true, true); } } else if (!destination.isQueue() && query.isAutoCreateJmsTopics()) { clientSession.createAddress(address, true, true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index f514dba..26a941b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -304,7 +304,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { // TODO create queue here in such a way that it is deleted when consumerCount == 0 // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) session.createAddress(jbd.getSimpleAddress(), false, true); - session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true); + session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true, true); } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { session.createAddress(jbd.getSimpleAddress(), true, true); } else { @@ -647,9 +647,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { */ if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateJmsQueues()) { - // TODO create queue here in such a way that it is deleted when consumerCount == 0 - // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) - session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true); + session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), null, true, true); } else { throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); } @@ -797,7 +795,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { AddressQuery response = session.addressQuery(new SimpleString(activeMQDestination.getAddress())); if (!response.isExists()) { if (response.isAutoCreateJmsQueues()) { - session.createQueue(activeMQDestination.getSimpleAddress(), activeMQDestination.getSimpleAddress(), true); + session.createQueue(activeMQDestination.getSimpleAddress(), activeMQDestination.getSimpleAddress(), null, true, true); } else { throw new InvalidDestinationException("Destination " + activeMQDestination.getName() + " does not exist"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index 97108d1..648854b 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -1634,9 +1634,9 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback // long consumerCount = queue.getConsumerCount(); // long messageCount = queue.getMessageCount(); // -// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.getAutoDeleteJmsQueues() && queue.getMessageCount() == 0) { +// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoDeleteJmsQueues() && queue.getMessageCount() == 0) { // if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) { -// ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues()); +// ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues()); // } // // return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 25b9fdb..e7900f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -1695,7 +1695,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active if (addressSettings.getExpiryAddress() != null) { settings.add("expiryAddress", addressSettings.getExpiryAddress().toString()); } - return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreat eJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsQueues", addressSettings.getAutoDeleteJmsQueues()).add("autoDeleteJmsTopics", addressSettings.getAutoDeleteJmsQueues()).build().toString(); + return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreat eJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()).add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues()).build().toString(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index be71a92..ac8d68a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; @@ -85,6 +86,7 @@ import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE; @@ -240,6 +242,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { } break; } + case CREATE_QUEUE_V2: { + CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet; + requiresResponse = request.isRequiresResponse(); + session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable(), null, null, request.isAutoCreated()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } case CREATE_SHARED_QUEUE: { CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet; requiresResponse = request.isRequiresResponse(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java index a211a96..8bea315 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java @@ -40,9 +40,9 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { long messageCount = queue.getMessageCount(); // TODO make sure this is the right check - if ((queue.isAutoCreated() || queue.isDeleteOnNoConsumers()) && queue.getMessageCount() == 0) { + if (((queue.isAutoCreated() && settings.isAutoDeleteJmsQueues()) || queue.isDeleteOnNoConsumers()) && queue.getMessageCount() == 0) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues()); + ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues()); } // TODO handle this exception better http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index aafcced..8d73eda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -509,7 +509,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers, true); + Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true); if (temporary) { // Temporary queue in core simply means the queue will be deleted if http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 68d9656..e613ee6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -174,7 +174,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return this; } - public boolean getAutoDeleteJmsQueues() { + public boolean isAutoDeleteJmsQueues() { return autoDeleteJmsQueues != null ? autoDeleteJmsQueues : AddressSettings.DEFAULT_AUTO_DELETE_JMS_QUEUES; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index d95ea52..cbd2e65 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -294,7 +294,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod()); assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues()); - assertEquals(true, conf.getAddressesSettings().get("a1").getAutoDeleteJmsQueues()); + assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsTopics()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsTopics()); @@ -309,7 +309,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod()); assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues()); - assertEquals(false, conf.getAddressesSettings().get("a2").getAutoDeleteJmsQueues()); + assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsTopics()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsTopics()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 3861782..4041a16 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -40,7 +40,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy()); Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_JMS_QUEUES, addressSettings.isAutoCreateJmsQueues()); - Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_JMS_QUEUES, addressSettings.getAutoDeleteJmsQueues()); + Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_JMS_QUEUES, addressSettings.isAutoDeleteJmsQueues()); // Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_TOPICS, addressSettings.isAutoCreateJmsTopics()); // Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_TOPICS, addressSettings.isAutoDeleteJmsTopics()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ffeaf48f/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java index f8b7153..6d70569 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java @@ -821,18 +821,6 @@ public class MessageHeaderTest extends MessageHeaderTestBase { final boolean durable) throws ActiveMQException { } - public void createQueue(final SimpleString address, - final SimpleString queueName, - final boolean durable, - final boolean temporary) throws ActiveMQException { - } - - public void createQueue(final String address, - final String queueName, - final boolean durable, - final boolean temporary) throws ActiveMQException { - } - @Override public void createQueue(final String address, final String queueName, @@ -841,6 +829,24 @@ public class MessageHeaderTest extends MessageHeaderTestBase { } @Override + public void createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + boolean durable, + boolean autoCreated) throws ActiveMQException { + + } + + @Override + public void createQueue(String address, + String queueName, + String filter, + boolean durable, + boolean autoCreated) throws ActiveMQException { + + } + + @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { }
