ARTEMIS-788 Stomp refactor + track autocreation for addresses
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a88853fe Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a88853fe Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a88853fe Branch: refs/heads/master Commit: a88853fe5390ef254ad3f3be51c55f33dfaf9ced Parents: 0189f15 Author: jbertram <[email protected]> Authored: Tue Oct 18 19:45:02 2016 +0100 Committer: Martyn Taylor <[email protected]> Committed: Fri Dec 9 18:43:15 2016 +0000 ---------------------------------------------------------------------- .../apache/activemq/cli/test/ArtemisTest.java | 4 +- .../artemis/api/core/client/ClientSession.java | 2 +- .../core/client/impl/ClientSessionImpl.java | 4 +- .../core/impl/ActiveMQSessionContext.java | 4 +- .../impl/wireformat/CreateAddressMessage.java | 14 + .../remoting/impl/netty/TransportConstants.java | 10 + .../spi/core/remoting/SessionContext.java | 2 +- .../jms/client/ActiveMQMessageProducer.java | 26 +- .../artemis/jms/client/ActiveMQSession.java | 41 +- .../protocol/mqtt/MQTTSubscriptionManager.java | 2 +- .../protocol/stomp/ActiveMQStompException.java | 4 +- .../ActiveMQStompProtocolMessageBundle.java | 7 +- .../artemis/core/protocol/stomp/Stomp.java | 34 +- .../core/protocol/stomp/StompConnection.java | 63 +- .../protocol/stomp/StompProtocolManager.java | 16 +- .../stomp/VersionedStompFrameHandler.java | 43 +- .../management/impl/AddressControlImpl.java | 2 +- .../core/management/impl/QueueControlImpl.java | 2 +- .../journal/AbstractJournalStorageManager.java | 7 +- .../codec/PersistentAddressBindingEncoding.java | 20 +- .../artemis/core/postoffice/PostOffice.java | 7 +- .../core/postoffice/impl/PostOfficeImpl.java | 30 +- .../artemis/core/server/ActiveMQServer.java | 26 +- .../artemis/core/server/QueueCreator.java | 32 - .../artemis/core/server/QueueDeleter.java | 28 - .../artemis/core/server/QueueFactory.java | 2 +- .../artemis/core/server/ServerSession.java | 5 +- .../core/server/impl/ActiveMQServerImpl.java | 63 +- .../artemis/core/server/impl/AddressInfo.java | 12 + .../impl/AutoCreatedQueueManagerImpl.java | 32 +- .../artemis/core/server/impl/DivertImpl.java | 2 +- .../server/impl/PostOfficeJournalLoader.java | 4 +- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../core/server/impl/ServerSessionImpl.java | 25 +- .../management/impl/ManagementServiceImpl.java | 3 +- .../core/config/impl/FileConfigurationTest.java | 8 +- .../vertx/IncomingVertxEventHandler.java | 2 +- .../tests/extras/jms/bridge/BridgeTestBase.java | 5 +- .../tests/integration/amqp/ProtonTest.java | 4 +- .../client/AutoCreateJmsDestinationTest.java | 9 +- .../integration/client/HangConsumerTest.java | 4 +- .../tests/integration/client/SessionTest.java | 2 + .../jms/cluster/AutoCreateQueueClusterTest.java | 6 +- .../jms/jms2client/NonExistentQueueTest.java | 16 +- .../persistence/XmlImportExportTest.java | 161 -- .../integration/stomp/ConcurrentStompTest.java | 136 -- .../tests/integration/stomp/ExtraStompTest.java | 848 --------- .../stomp/StompConnectionCleanupTest.java | 52 +- .../integration/stomp/StompOverHttpTest.java | 78 - .../stomp/StompOverWebsocketTest.java | 151 -- .../tests/integration/stomp/StompTest.java | 1674 ++++++---------- .../tests/integration/stomp/StompTestBase.java | 550 +++--- .../stomp/StompTestWithInterceptors.java | 159 ++ .../stomp/StompTestWithLargeMessages.java | 416 ++++ .../stomp/StompTestWithMessageID.java | 78 + .../stomp/StompTestWithSecurity.java | 28 +- .../stomp/util/AbstractClientStompFrame.java | 77 +- .../util/AbstractStompClientConnection.java | 100 +- .../stomp/util/ClientStompFrame.java | 10 +- .../stomp/util/ClientStompFrameV10.java | 10 +- .../stomp/util/ClientStompFrameV11.java | 22 +- .../stomp/util/ClientStompFrameV12.java | 38 +- .../stomp/util/StompClientConnection.java | 5 +- .../stomp/util/StompClientConnectionV10.java | 43 +- .../stomp/util/StompClientConnectionV11.java | 104 +- .../stomp/util/StompClientConnectionV12.java | 79 +- .../stomp/util/StompFrameFactory.java | 2 + .../stomp/util/StompFrameFactoryV10.java | 11 +- .../stomp/util/StompFrameFactoryV11.java | 28 +- .../stomp/util/StompFrameFactoryV12.java | 38 +- .../integration/stomp/v11/ExtraStompTest.java | 341 +--- .../integration/stomp/v11/StompV11Test.java | 1800 +++++++----------- .../integration/stomp/v11/StompV11TestBase.java | 167 -- .../integration/stomp/v12/StompV12Test.java | 1775 +++++++---------- .../tests/util/JMSClusteredTestBase.java | 23 +- .../artemis/jms/tests/MessageProducerTest.java | 1 + .../activemq/artemis/jms/tests/SessionTest.java | 2 + .../jms/tests/message/MessageHeaderTest.java | 2 +- .../core/server/impl/fakes/FakePostOffice.java | 7 +- 79 files changed, 3610 insertions(+), 6044 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index ba02bd3..eb3d48a 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -550,11 +550,11 @@ public class ArtemisTest { ClientSessionFactory factory = locator.createSessionFactory(); ClientSession coreSession = factory.createSession("admin", "admin", false, true, true, false, 0)) { for (String str : queues.split(",")) { - ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString("jms.queue." + str)); + ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString(str)); assertTrue("Couldn't find queue " + str, queryResult.isExists()); } for (String str : topics.split(",")) { - ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString("jms.topic." + str)); + ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString(str)); assertTrue("Couldn't find topic " + str, queryResult.isExists()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index fbd33d3..35bc9f9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -198,7 +198,7 @@ public interface ClientSession extends XAResource, AutoCloseable { */ int getVersion(); - void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException; + void createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException; // Queue Operations ---------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 2739109..16311b0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -279,12 +279,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } @Override - public void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException { + public void createAddress(final SimpleString address, final boolean multicast, boolean autoCreated) throws ActiveMQException { checkClosed(); startCall(); try { - sessionContext.createAddress(address, multicast); + sessionContext.createAddress(address, multicast, autoCreated); } finally { endCall(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 4e25037..919da19 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -584,8 +584,8 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public void createAddress(SimpleString address, final boolean multicast) throws ActiveMQException { - CreateAddressMessage request = new CreateAddressMessage(address, multicast, true); + public void createAddress(SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException { + CreateAddressMessage request = new CreateAddressMessage(address, multicast, autoCreated, true); sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java index 484a2ac..10c7ff3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java @@ -26,15 +26,19 @@ public class CreateAddressMessage extends PacketImpl { private boolean multicast; + private boolean autoCreated; + private boolean requiresResponse; public CreateAddressMessage(final SimpleString address, final boolean multicast, + final boolean autoCreated, final boolean requiresResponse) { this(); this.address = address; this.multicast = multicast; + this.autoCreated = autoCreated; this.requiresResponse = requiresResponse; } @@ -49,6 +53,7 @@ public class CreateAddressMessage extends PacketImpl { StringBuffer buff = new StringBuffer(getParentString()); buff.append(", address=" + address); buff.append(", multicast=" + multicast); + buff.append(", autoCreated=" + autoCreated); buff.append("]"); return buff.toString(); } @@ -65,6 +70,10 @@ public class CreateAddressMessage extends PacketImpl { return requiresResponse; } + public boolean isAutoCreated() { + return autoCreated; + } + public void setAddress(SimpleString address) { this.address = address; } @@ -74,6 +83,7 @@ public class CreateAddressMessage extends PacketImpl { buffer.writeSimpleString(address); buffer.writeBoolean(multicast); buffer.writeBoolean(requiresResponse); + buffer.writeBoolean(autoCreated); } @Override @@ -81,6 +91,7 @@ public class CreateAddressMessage extends PacketImpl { address = buffer.readSimpleString(); multicast = buffer.readBoolean(); requiresResponse = buffer.readBoolean(); + autoCreated = buffer.readBoolean(); } @Override @@ -89,6 +100,7 @@ public class CreateAddressMessage extends PacketImpl { int result = super.hashCode(); result = prime * result + ((address == null) ? 0 : address.hashCode()); result = prime * result + (multicast ? 1231 : 1237); + result = prime * result + (autoCreated ? 1231 : 1237); result = prime * result + (requiresResponse ? 1231 : 1237); return result; } @@ -109,6 +121,8 @@ public class CreateAddressMessage extends PacketImpl { return false; if (multicast != other.multicast) return false; + if (autoCreated != other.autoCreated) + return false; if (requiresResponse != other.requiresResponse) return false; return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 14efb79..a8e613e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -203,6 +203,14 @@ public class TransportConstants { public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size"; + public static final String STOMP_ANYCAST_PREFIX = "stompAnycastPrefix"; + + public static final String DEFAULT_STOMP_ANYCAST_PREFIX = ""; + + public static final String STOMP_MULTICAST_PREFIX = "stompMulticastPrefix"; + + public static final String DEFAULT_STOMP_MULTICAST_PREFIX = ""; + public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis"; public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1; @@ -242,6 +250,8 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); + allowableAcceptorKeys.add(TransportConstants.STOMP_ANYCAST_PREFIX); + allowableAcceptorKeys.add(TransportConstants.STOMP_MULTICAST_PREFIX); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 79d50c1..16e8314 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -166,7 +166,7 @@ public abstract class SessionContext { public abstract void deleteQueue(SimpleString queueName) throws ActiveMQException; - public abstract void createAddress(SimpleString address, boolean multicast) throws ActiveMQException; + public abstract void createAddress(SimpleString address, boolean multicast, boolean autoCreated) throws ActiveMQException; public abstract void createQueue(SimpleString address, SimpleString queueName, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index c552d69..5cbd40f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -403,20 +403,20 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To try { ClientSession.AddressQuery query = clientSession.addressQuery(address); - if (!query.isExists() && query.isAutoCreateJmsQueues()) { - if (destination.isQueue() && !destination.isTemporary()) { - clientSession.createAddress(address, false); - clientSession.createQueue(address, address, null, true); - } else if (destination.isQueue() && destination.isTemporary()) { - clientSession.createAddress(address, false); - clientSession.createTemporaryQueue(address, address); - } else if (!destination.isQueue() && !destination.isTemporary()) { - clientSession.createAddress(address, true); - } else if (!destination.isQueue() && destination.isTemporary()) { - clientSession.createAddress(address, true); + if (!query.isExists()) { + if (destination.isQueue() && query.isAutoCreateJmsQueues()) { + clientSession.createAddress(address, false, true); + if (destination.isTemporary()) { + // TODO is it right to use the address for the queue name here? + clientSession.createTemporaryQueue(address, address); + } else { + clientSession.createQueue(address, address, null, true); + } + } else if (!destination.isQueue() && query.isAutoCreateJmsTopics()) { + clientSession.createAddress(address, true, true); + } else if ((destination.isQueue() && !query.isAutoCreateJmsQueues()) || (!destination.isQueue() && !query.isAutoCreateJmsTopics())) { + throw new InvalidDestinationException("Destination " + address + " does not exist"); } - } else if (!query.isExists() && !query.isAutoCreateJmsQueues()) { - throw new InvalidDestinationException("Destination " + address + " does not exist"); } else { connection.addKnownDestination(address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index d554cf8..f514dba 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -299,29 +299,16 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (jbd != null) { ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); - if (jbd.isQueue()) { - if (!response.isExists()) { - if (response.isAutoCreateJmsQueues()) { - session.createAddress(jbd.getSimpleAddress(), false); - } else { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); - } - } - - if (response.getQueueNames().isEmpty()) { - if (response.isAutoCreateJmsQueues()) { - session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true); - } else { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); - } - } - } else { - if (!response.isExists()) { - if (response.isAutoCreateJmsTopics()) { - session.createAddress(jbd.getSimpleAddress(), true); - } else { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); - } + if (!response.isExists()) { + if (jbd.isQueue() && response.isAutoCreateJmsQueues()) { + // TODO create queue here in such a way that it is deleted when consumerCount == 0 + // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) + session.createAddress(jbd.getSimpleAddress(), false, true); + session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true); + } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { + session.createAddress(jbd.getSimpleAddress(), true, true); + } else { + throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); } } } @@ -660,6 +647,8 @@ public class ActiveMQSession implements QueueSession, TopicSession { */ if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateJmsQueues()) { + // TODO create queue here in such a way that it is deleted when consumerCount == 0 + // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true); } else { throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); @@ -673,8 +662,8 @@ public class ActiveMQSession implements QueueSession, TopicSession { AddressQuery response = session.addressQuery(dest.getSimpleAddress()); if (!response.isExists()) { - if (response.isAutoCreateJmsQueues()) { - session.createAddress(dest.getSimpleAddress(), true); + if (response.isAutoCreateJmsTopics()) { + session.createAddress(dest.getSimpleAddress(), true, true); } else { throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist"); } @@ -1106,7 +1095,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { AddressQuery query = session.addressQuery(topic.getSimpleAddress()); - if (!query.isExists() && !query.isAutoCreateJmsQueues()) { + if (!query.isExists() && !query.isAutoCreateJmsTopics()) { return null; } else { return topic; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 1187db0..1c87f29 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -93,7 +93,7 @@ public class MQTTSubscriptionManager { Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false); + q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false, true); } else { if (q.isDeleteOnNoConsumers()) { throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java index f4f23e6..15fb4ac 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java @@ -41,12 +41,12 @@ public class ActiveMQStompException extends Exception { } public ActiveMQStompException(String msg) { - super(msg); + super(msg.replace(":", "")); handler = null; } public ActiveMQStompException(String msg, Throwable t) { - super(msg, t); + super(msg.replace(":", ""), t); this.body = t.getMessage(); handler = null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java index 8108f32..861c524 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java @@ -86,10 +86,10 @@ public interface ActiveMQStompProtocolMessageBundle { ActiveMQStompException noDestination(); @Message(id = 339016, value = "Error creating subscription {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQStompException errorCreatSubscription(String subscriptionID, @Cause Exception e); + ActiveMQStompException errorCreatingSubscription(String subscriptionID, @Cause Exception e); @Message(id = 339017, value = "Error unsubscribing {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQStompException errorUnsubscrib(String subscriptionID, @Cause Exception e); + ActiveMQStompException errorUnsubscribing(String subscriptionID, @Cause Exception e); @Message(id = 339018, value = "Error acknowledging message {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQStompException errorAck(String messageID, @Cause Exception e); @@ -153,4 +153,7 @@ public interface ActiveMQStompProtocolMessageBundle { @Message(id = 339040, value = "Undefined escape sequence: {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQStompException undefinedEscapeSequence(String sequence); + + @Message(id = 339041, value = "Not allowed to specify {0} semantics on {1} address.", format = Message.Format.MESSAGE_FORMAT) + ActiveMQStompException illegalSemantics(String requested, String exists); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java index badcc1a..89c14e7 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java @@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.protocol.stomp; /** * The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol. - * - * @version $Revision: 57 $ */ public interface Stomp { @@ -27,7 +25,7 @@ public interface Stomp { String NEWLINE = "\n"; - public interface Commands { + interface Commands { String CONNECT = "CONNECT"; @@ -53,7 +51,7 @@ public interface Stomp { String STOMP = "STOMP"; } - public interface Responses { + interface Responses { String CONNECTED = "CONNECTED"; @@ -64,7 +62,7 @@ public interface Stomp { String RECEIPT = "RECEIPT"; } - public interface Headers { + interface Headers { String SEPARATOR = ":"; @@ -78,15 +76,17 @@ public interface Stomp { String CONTENT_TYPE = "content-type"; - public interface Response { + interface Response { String RECEIPT_ID = "receipt-id"; } - public interface Send { + interface Send { String DESTINATION = "destination"; + String DESTINATION_TYPE = "destination-type"; + String CORRELATION_ID = "correlation-id"; String REPLY_TO = "reply-to"; @@ -97,10 +97,10 @@ public interface Stomp { String TYPE = "type"; - Object PERSISTENT = "persistent"; + String PERSISTENT = "persistent"; } - public interface Message { + interface Message { String MESSAGE_ID = "message-id"; @@ -129,7 +129,7 @@ public interface Stomp { String VALIDATED_USER = "JMSXUserID"; } - public interface Subscribe { + interface Subscribe { String DESTINATION = "destination"; @@ -144,6 +144,8 @@ public interface Stomp { String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name"; + String SUBSCRIPTION_TYPE = "subscription-type"; + String NO_LOCAL = "no-local"; public interface AckModeValues { @@ -156,7 +158,7 @@ public interface Stomp { } } - public interface Unsubscribe { + interface Unsubscribe { String DESTINATION = "destination"; @@ -168,7 +170,7 @@ public interface Stomp { String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name"; } - public interface Connect { + interface Connect { String LOGIN = "login"; @@ -182,10 +184,10 @@ public interface Stomp { String ACCEPT_VERSION = "accept-version"; String HOST = "host"; - Object HEART_BEAT = "heart-beat"; + String HEART_BEAT = "heart-beat"; } - public interface Error { + interface Error { String MESSAGE = "message"; @@ -193,7 +195,7 @@ public interface Stomp { String VERSION = "version"; } - public interface Connected { + interface Connected { String SESSION = "session"; @@ -207,7 +209,7 @@ public interface Stomp { String HEART_BEAT = "heart-beat"; } - public interface Ack { + interface Ack { String MESSAGE_ID = "message-id"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index e58d4da..4cb8fe9 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -86,6 +86,12 @@ public final class StompConnection implements RemotingConnection { private final boolean enableMessageID; + private final int minLargeMessageSize; + + private final String anycastPrefix; + + private final String multicastPrefix; + private StompVersions version; private VersionedStompFrameHandler frameHandler; @@ -97,8 +103,6 @@ public final class StompConnection implements RemotingConnection { private final Object sendLock = new Object(); - private final int minLargeMessageSize; - private final ScheduledExecutorService scheduledExecutorService; private final ExecutorFactory factory; @@ -162,6 +166,8 @@ public final class StompConnection implements RemotingConnection { this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration()); this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration()); + this.anycastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_ANYCAST_PREFIX, TransportConstants.DEFAULT_STOMP_ANYCAST_PREFIX, acceptorUsed.getConfiguration()); + this.multicastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_MULTICAST_PREFIX, TransportConstants.DEFAULT_STOMP_MULTICAST_PREFIX, acceptorUsed.getConfiguration()); } @Override @@ -246,23 +252,39 @@ public final class StompConnection implements RemotingConnection { } public void checkDestination(String destination) throws ActiveMQStompException { - autoCreateDestinationIfPossible(destination); - if (!manager.destinationExists(destination)) { throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler); } } - public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException { - // TODO: STOMP clients will have to prefix their destination with queue:// or topic:// so we can determine what to do here + public boolean autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException { + boolean result = false; + try { - manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST)); - manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, true, false); + if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { + // TODO check here to see if auto-creation is enabled + if (routingType.equals(AddressInfo.RoutingType.MULTICAST)) { + manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setAutoCreated(true)); + } else { + manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST).setAutoCreated(true)); + manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, null, true, false, true); + } + result = true; + } } catch (ActiveMQQueueExistsException e) { // ignore } catch (Exception e) { throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); } + + return result; + } + + public void checkRoutingSemantics(String destination, AddressInfo.RoutingType routingType) throws ActiveMQStompException { + AddressInfo.RoutingType actualRoutingTypeOfAddress = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType(); + if (routingType != null && !routingType.equals(actualRoutingTypeOfAddress)) { + throw BUNDLE.illegalSemantics(routingType.toString(), actualRoutingTypeOfAddress.toString()); + } } @Override @@ -560,7 +582,7 @@ public final class StompConnection implements RemotingConnection { if (stompSession.isNoLocal()) { message.putStringProperty(CONNECTION_ID_PROP, getID().toString()); } - if (enableMessageID()) { + if (isEnableMessageID()) { message.putStringProperty("amqMessageId", "STOMP" + message.getMessageID()); } try { @@ -617,8 +639,11 @@ public final class StompConnection implements RemotingConnection { String ack, String id, String durableSubscriptionName, - boolean noLocal) throws ActiveMQStompException { - autoCreateDestinationIfPossible(destination); + boolean noLocal, + AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException { + autoCreateDestinationIfPossible(destination, subscriptionType); + checkDestination(destination); + checkRoutingSemantics(destination, subscriptionType); if (noLocal) { String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'"; if (selector == null) { @@ -643,11 +668,11 @@ public final class StompConnection implements RemotingConnection { } try { - manager.createSubscription(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal); + manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal); } catch (ActiveMQStompException e) { throw e; } catch (Exception e) { - throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler); + throw BUNDLE.errorCreatingSubscription(subscriptionID, e).setHandler(frameHandler); } } @@ -657,7 +682,7 @@ public final class StompConnection implements RemotingConnection { } catch (ActiveMQStompException e) { throw e; } catch (Exception e) { - throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler); + throw BUNDLE.errorUnsubscribing(subscriptionID, e).setHandler(frameHandler); } } @@ -710,7 +735,7 @@ public final class StompConnection implements RemotingConnection { return this.frameHandler; } - public boolean enableMessageID() { + public boolean isEnableMessageID() { return enableMessageID; } @@ -718,6 +743,14 @@ public final class StompConnection implements RemotingConnection { return minLargeMessageSize; } + public String getAnycastPrefix() { + return anycastPrefix; + } + + public String getMulticastPrefix() { + return multicastPrefix; + } + public StompProtocolManager getManager() { return manager; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 6029b37..0c1f7dd 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -368,13 +368,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St } // Inner classes ------------------------------------------------- - public void createSubscription(StompConnection connection, - String subscriptionID, - String durableSubscriptionName, - String destination, - String selector, - String ack, - boolean noLocal) throws Exception { + public void subscribe(StompConnection connection, + String subscriptionID, + String durableSubscriptionName, + String destination, + String selector, + String ack, + boolean noLocal) throws Exception { StompSession stompSession = getSession(connection); stompSession.setNoLocal(noLocal); if (stompSession.containsSubscription(subscriptionID)) { @@ -411,7 +411,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St } public boolean destinationExists(String destination) { - return server.getPostOffice().getAddresses().contains(SimpleString.toSimpleString(destination)); + return server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(destination)) != null; } public ActiveMQServer getServer() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 003865c..580bade 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -167,8 +168,11 @@ public abstract class VersionedStompFrameHandler { StompFrame response = null; try { connection.validate(); - String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION); - checkDestination(destination); + String destination = getDestination(frame); + AddressInfo.RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION)); + connection.autoCreateDestinationIfPossible(destination, routingType); + connection.checkDestination(destination); + connection.checkRoutingSemantics(destination, routingType); String txID = frame.getHeader(Stomp.Headers.TRANSACTION); long timestamp = System.currentTimeMillis(); @@ -197,10 +201,6 @@ public abstract class VersionedStompFrameHandler { return response; } - private void checkDestination(String destination) throws ActiveMQStompException { - connection.checkDestination(destination); - } - public StompFrame onBegin(StompFrame frame) { StompFrame response = null; String txID = frame.getHeader(Stomp.Headers.TRANSACTION); @@ -238,7 +238,7 @@ public abstract class VersionedStompFrameHandler { public StompFrame onSubscribe(StompFrame request) { StompFrame response = null; - String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION); + String destination = getDestination(request); String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR); String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE); @@ -247,6 +247,7 @@ public abstract class VersionedStompFrameHandler { if (durableSubscriptionName == null) { durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); } + AddressInfo.RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION)); boolean noLocal = false; if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { @@ -254,7 +255,7 @@ public abstract class VersionedStompFrameHandler { } try { - connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal); + connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); } catch (ActiveMQStompException e) { response = e.getFrame(); } @@ -262,6 +263,17 @@ public abstract class VersionedStompFrameHandler { return response; } + public String getDestination(StompFrame request) { + String destination = request.getHeader(Headers.Subscribe.DESTINATION); + if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { + destination = destination.substring(connection.getMulticastPrefix().length()); + } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { + destination = destination.substring(connection.getAnycastPrefix().length()); + } + + return destination; + } + public StompFrame postprocess(StompFrame request) { StompFrame response = null; if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) { @@ -332,4 +344,19 @@ public abstract class VersionedStompFrameHandler { connection.destroy(); } + private AddressInfo.RoutingType getRoutingType(String typeHeader, String destination) { + // null is valid to return here so we know when the user didn't provide any routing info + AddressInfo.RoutingType routingType = null; + if (typeHeader != null) { + routingType = AddressInfo.RoutingType.valueOf(typeHeader); + } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) { + if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { + routingType = AddressInfo.RoutingType.MULTICAST; + } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { + routingType = AddressInfo.RoutingType.ANYCAST; + } + } + return routingType; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 5808bd3..23b8e32 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -276,7 +276,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro message.getBodyBuffer().writeBytes(Base64.decode(body)); } message.setAddress(addressInfo.getName()); - postOffice.route(message, null, true); + postOffice.route(message, true); return "" + message.getMessageID(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 7a1bb26..c4d25ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -728,7 +728,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { ByteBuffer buffer = ByteBuffer.allocate(8); buffer.putLong(queue.getID()); message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); - postOffice.route(message, null, true); + postOffice.route(message, true); return "" + message.getMessageID(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 2299e20..12eac9f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1268,7 +1268,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager { @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { - PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType(), addressInfo.getDefaultMaxQueueConsumers()); + PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), + addressInfo.getRoutingType(), + addressInfo.getDefaultMaxQueueConsumers(), + addressInfo.isDefaultDeleteOnNoConsumers(), + addressInfo.isAutoCreated()); readLock(); try { @@ -1398,7 +1402,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { idGenerator.loadState(record.id, buffer); } else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) { PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer); - ActiveMQServerLogger.LOGGER.info("=== Loading: " + bindingEncoding); addressBindingInfos.add(bindingEncoding); } else if (rec == JournalRecordIds.GROUP_RECORD) { GroupingEncoding encoding = newGroupEncoding(id, buffer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index 3821b34..e47a210 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -31,6 +31,10 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres public int defaultMaxConsumers; + public boolean defaultDeleteOnNoConsumers; + + public boolean autoCreated; + public AddressInfo.RoutingType routingType; public PersistentAddressBindingEncoding() { @@ -45,15 +49,23 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres routingType + ", defaultMaxConsumers=" + defaultMaxConsumers + + ", defaultDeleteOnNoConsumers=" + + defaultDeleteOnNoConsumers + + ", autoCreated=" + + autoCreated + "]"; } public PersistentAddressBindingEncoding(final SimpleString name, final AddressInfo.RoutingType routingType, - final int defaultMaxConsumers) { + final int defaultMaxConsumers, + final boolean defaultDeleteOnNoConsumers, + final boolean autoCreated) { this.name = name; this.routingType = routingType; this.defaultMaxConsumers = defaultMaxConsumers; + this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; + this.autoCreated = autoCreated; } @Override @@ -85,6 +97,8 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres name = buffer.readSimpleString(); routingType = AddressInfo.RoutingType.getType(buffer.readByte()); defaultMaxConsumers = buffer.readInt(); + defaultDeleteOnNoConsumers = buffer.readBoolean(); + autoCreated = buffer.readBoolean(); } @Override @@ -92,10 +106,12 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres buffer.writeSimpleString(name); buffer.writeByte(routingType.getType()); buffer.writeInt(defaultMaxConsumers); + buffer.writeBoolean(defaultDeleteOnNoConsumers); + buffer.writeBoolean(autoCreated); } @Override public int getEncodeSize() { - return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT; + return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index bc8a6cf..f1225c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -24,7 +24,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -77,26 +76,22 @@ public interface PostOffice extends ActiveMQComponent { Map<SimpleString, Binding> getAllBindings(); - RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, boolean direct) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, RoutingContext context, boolean direct) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 1dba309..135597f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -63,7 +63,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -441,6 +440,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public AddressInfo removeAddressInfo(SimpleString address) { + try { + getServer().getManagementService().unregisterAddress(address); + } catch (Exception e) { + e.printStackTrace(); + } return addressManager.removeAddressInfo(address); } @@ -595,39 +599,34 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public RoutingStatus route(final ServerMessage message, - QueueCreator queueCreator, final boolean direct) throws Exception { - return route(message, queueCreator, (Transaction) null, direct); + return route(message, (Transaction) null, direct); } @Override public RoutingStatus route(final ServerMessage message, - QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception { - return route(message, queueCreator, new RoutingContextImpl(tx), direct); + return route(message, new RoutingContextImpl(tx), direct); } @Override public RoutingStatus route(final ServerMessage message, - final QueueCreator queueCreator, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception { - return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); + return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates); } @Override public RoutingStatus route(final ServerMessage message, - final QueueCreator queueCreator, final RoutingContext context, final boolean direct) throws Exception { - return route(message, queueCreator, context, direct, true); + return route(message, context, direct, true); } @Override public RoutingStatus route(final ServerMessage message, - final QueueCreator queueCreator, final RoutingContext context, final boolean direct, boolean rejectDuplicates) throws Exception { @@ -657,14 +656,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Bindings bindings = addressManager.getBindingsForRoutingAddress(address); + // TODO auto-create queues here? // first check for the auto-queue creation thing - if (bindings == null && queueCreator != null) { + if (bindings == null) { // There is no queue with this address, we will check if it needs to be created - if (queueCreator.create(address)) { +// if (queueCreator.create(address)) { // TODO: this is not working!!!! // reassign bindings if it was created - bindings = addressManager.getBindingsForRoutingAddress(address); - } +// bindings = addressManager.getBindingsForRoutingAddress(address); +// } } if (bindings != null) { @@ -704,7 +704,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.setAddress(dlaAddress); - route(message, null, context.getTransaction(), false); + route(message, context.getTransaction(), false); result = RoutingStatus.NO_BINDINGS_DLA; } } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index f716847..84f554d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -241,30 +241,6 @@ public interface ActiveMQServer extends ActiveMQComponent { long getUptimeMillis(); /** - * This is the queue creator responsible for automatic JMS Queue creations. - * - * @param queueCreator - */ - void setJMSQueueCreator(QueueCreator queueCreator); - - /** - * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator) - */ - QueueCreator getJMSDestinationCreator(); - - /** - * This is the queue deleter responsible for automatic JMS Queue deletions. - * - * @param queueDeleter - */ - void setJMSQueueDeleter(QueueDeleter queueDeleter); - - /** - * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueDeleter(QueueDeleter) - */ - QueueDeleter getJMSQueueDeleter(); - - /** * Returns whether the initial replication synchronization process with the backup server is complete; applicable for * either the live or backup server. */ @@ -369,7 +345,7 @@ public interface ActiveMQServer extends ActiveMQComponent { QueueQueryResult queueQuery(SimpleString name) throws Exception; Queue deployQueue(SimpleString address, - SimpleString resourceName, + SimpleString queueName, SimpleString filterString, boolean durable, boolean temporary, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java deleted file mode 100644 index f89a2b0..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server; - -import org.apache.activemq.artemis.api.core.SimpleString; - -public interface QueueCreator { - - /** - * You should return true if you even tried to create the queue and the queue was already there. - * As the callers of this method will use that as an indicator that they should re-route the messages. - * * - * - * @return True if a queue was created. - */ - boolean create(SimpleString address) throws Exception; -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java deleted file mode 100644 index d062848..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server; - -import org.apache.activemq.artemis.api.core.SimpleString; - -public interface QueueDeleter { - - /** - * @return True if a queue was deleted. - */ - boolean delete(SimpleString queueName) throws Exception; -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java index 64e7a5d..2557b73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java @@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; */ public interface QueueFactory { - Queue createQueueWith(final QueueConfig config); + Queue createQueueWith(final QueueConfig config) throws Exception; /** * @deprecated Replaced by {@link #createQueueWith} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 910eb22..28d283d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -87,8 +87,6 @@ public interface ServerSession extends SecurityAuth { void markTXFailed(Throwable e); - QueueCreator getQueueCreator(); - List<Xid> xaGetInDoubtXids(); int xaGetTimeout(); @@ -194,7 +192,8 @@ public interface ServerSession extends SecurityAuth { boolean temporary, boolean durable, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception; + Boolean deleteOnNoConsumers, + final Boolean autoCreated) throws Exception; void createSharedQueue(SimpleString address, SimpleString name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 285bf3b..7aa802b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -119,8 +119,6 @@ import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; -import org.apache.activemq.artemis.core.server.QueueCreator; -import org.apache.activemq.artemis.core.server.QueueDeleter; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; @@ -273,16 +271,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { private FileStoreMonitor fileStoreMonitor; - /** - * This will be set by the JMS Queue Manager. - */ - private QueueCreator jmsQueueCreator; - - /** - * This will be set by the JMS Queue Manager. - */ - private QueueDeleter jmsQueueDeleter; - private final Map<String, ServerSession> sessions = new ConcurrentHashMap<>(); private final Semaphore activationLock = new Semaphore(1); @@ -721,11 +709,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - if (autoCreateJmsTopics) { - putAddressInfoIfAbsent(new AddressInfo(address)); - } - - return new BindingQueryResult(getAddressInfo(address) != null, names, autoCreateJmsQueues, autoCreateJmsTopics); + return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues, autoCreateJmsTopics); } @Override @@ -794,26 +778,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public QueueCreator getJMSDestinationCreator() { - return jmsQueueCreator; - } - - @Override - public void setJMSQueueCreator(QueueCreator jmsQueueCreator) { - this.jmsQueueCreator = jmsQueueCreator; - } - - @Override - public QueueDeleter getJMSQueueDeleter() { - return jmsQueueDeleter; - } - - @Override - public void setJMSQueueDeleter(QueueDeleter jmsQueueDeleter) { - this.jmsQueueDeleter = jmsQueueDeleter; - } - - @Override public boolean isReplicaSync() { if (activation instanceof SharedNothingLiveActivation) { ReplicationManager replicationManager = getReplicationManager(); @@ -1358,7 +1322,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { SessionCallback callback, OperationContext context, boolean autoCreateJMSQueues) throws Exception { - return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager); + return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager); } @Override @@ -1616,17 +1580,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public Queue deployQueue(final SimpleString address, - final SimpleString resourceName, + final SimpleString queueName, final SimpleString filterString, final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { - return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null); + return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null); } @Override public Queue deployQueue(final SimpleString address, - final SimpleString resourceName, + final SimpleString queueName, final SimpleString filterString, final boolean durable, final boolean temporary, @@ -1635,9 +1599,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { final Boolean deleteOnNoConsumers) throws Exception { // TODO: fix logging here as this could be for a topic or queue - ActiveMQServerLogger.LOGGER.deployQueue(resourceName); + ActiveMQServerLogger.LOGGER.deployQueue(queueName); - return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers); + return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers); } @Override @@ -2137,6 +2101,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { // Deploy any predefined queues deployQueuesFromConfiguration(); + registerPostQueueDeletionCallback(new PostQueueDeletionCallback() { + // TODO delete auto-created addresses when queueCount == 0 + @Override + public void callback(SimpleString address, SimpleString queueName) throws Exception { + if (getAddressInfo(address).isAutoCreated() && postOffice.getBindingsForAddress(address).getBindings().size() == 0) { + removeAddressInfo(address); + } + } + }); + // We need to call this here, this gives any dependent server a chance to deploy its own addresses // this needs to be done before clustering is fully activated callActivateCallbacks(); @@ -2408,7 +2382,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean autoCreated, final Integer maxConsumers, final Boolean deleteOnNoConsumers) throws Exception { - final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { if (ignoreIfExists) { @@ -2465,7 +2438,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } else if (queue.isAutoCreated()) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName())); + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName())); } final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 708aeda..6ad40fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -29,6 +29,8 @@ public class AddressInfo { private int defaultMaxQueueConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + private boolean autoCreated = false; + public AddressInfo(SimpleString name) { this.name = name; } @@ -67,6 +69,15 @@ public class AddressInfo { return this; } + public boolean isAutoCreated() { + return autoCreated; + } + + public AddressInfo setAutoCreated(boolean autoCreated) { + this.autoCreated = autoCreated; + return this; + } + public SimpleString getName() { return name; } @@ -78,6 +89,7 @@ public class AddressInfo { buff.append(", routingType=" + routingType); buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers); buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers); + buff.append(", autoCreated=" + autoCreated); buff.append("]"); return buff.toString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java index 535e53b..a211a96 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java @@ -17,34 +17,48 @@ package org.apache.activemq.artemis.core.server.impl; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager; -import org.apache.activemq.artemis.core.server.QueueDeleter; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { private final SimpleString queueName; - private final QueueDeleter deleter; + private final ActiveMQServer server; private final Runnable runnable = new Runnable() { @Override public void run() { - try { - if (deleter != null) { - deleter.delete(queueName); + Queue queue = server.locateQueue(queueName); + SimpleString address = queue.getAddress(); + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + long consumerCount = queue.getConsumerCount(); + long messageCount = queue.getMessageCount(); + + // TODO make sure this is the right check + if ((queue.isAutoCreated() || queue.isDeleteOnNoConsumers()) && queue.getMessageCount() == 0) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues()); + } + + // TODO handle this exception better + try { + server.destroyQueue(queueName, null, true, false); + } catch (Exception e) { + e.printStackTrace(); } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); } } }; private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); - public AutoCreatedQueueManagerImpl(QueueDeleter deleter, SimpleString queueName) { - this.deleter = deleter; + public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { + this.server = server; this.queueName = queueName; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index e583fc0..5782379 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -104,7 +104,7 @@ public class DivertImpl implements Divert { copy = message; } - postOffice.route(copy, null, context.getTransaction(), false); + postOffice.route(copy, context.getTransaction(), false); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 76fc69b..eb31737 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -151,9 +151,7 @@ public class PostOfficeJournalLoader implements JournalLoader { .deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers()) .maxConsumers(queueBindingInfo.getMaxConsumers()); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); - if (queue.isAutoCreated()) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); - } + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); if (queueBindingInfo.getQueueStatusEncodings() != null) { for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index c391b90..7c614ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2326,7 +2326,7 @@ public class QueueImpl implements Queue { copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); } - postOffice.route(copyMessage, null, tx, false, rejectDuplicate); + postOffice.route(copyMessage, tx, false, rejectDuplicate); acknowledge(tx, ref); } @@ -2530,7 +2530,7 @@ public class QueueImpl implements Queue { copyMessage.setAddress(address); - postOffice.route(copyMessage, null, tx, false, rejectDuplicate); + postOffice.route(copyMessage, tx, false, rejectDuplicate); acknowledge(tx, ref, reason);
