ACTIVEMQ6-76 auto queue creation on STOMP send/sub Implements a new feature for the broker whereby it may automatically create and delete queues which are not explicitly defined through the management API or file-based configuration when a client sends a message to, or consumes from, a queue via the STOMP protocol. Note that the destination name must match "jms.queue.*" for the queue to be auto-created. The queue may subsequently be deleted when it no longer has any messages or consumers. Auto-creation and auto-deletion can both be turned on/off via address settings.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/13104422 Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/13104422 Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/13104422 Branch: refs/heads/master Commit: 13104422442c181f87325a8844cf646462faf242 Parents: 593ea2d Author: jbertram <[email protected]> Authored: Tue Feb 3 12:21:09 2015 -0600 Committer: jbertram <[email protected]> Committed: Mon Feb 9 09:03:58 2015 -0600 ---------------------------------------------------------------------- .../stomp/ActiveMQStompProtocolLogger.java | 3 +- .../core/protocol/stomp/StompConnection.java | 29 ++++++ .../protocol/stomp/StompProtocolManager.java | 5 ++ .../activemq/core/server/ActiveMQServer.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../impl/AutoCreatedQueueManagerImpl.java | 2 +- .../tests/integration/stomp/StompTest.java | 95 ++++++++++++++++++++ .../tests/integration/stomp/StompTestBase.java | 4 +- .../integration/stomp/v11/StompV11Test.java | 26 +++++- .../integration/stomp/v12/StompV12Test.java | 30 ++++++- 10 files changed, 190 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java index e66d1ed..236b38c 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java +++ 
b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.core.protocol.stomp; +import org.jboss.logging.BasicLogger; import org.jboss.logging.Logger; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.LogMessage; @@ -41,7 +42,7 @@ import org.jboss.logging.annotations.MessageLogger; */ @MessageLogger(projectCode = "AMQ") -public interface ActiveMQStompProtocolLogger +public interface ActiveMQStompProtocolLogger extends BasicLogger { /** * The default logger. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java index 23d399d..16cf55e 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java @@ -27,7 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.management.ResourceNames; import org.apache.activemq.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.core.protocol.stomp.v12.StompFrameHandlerV12; import 
org.apache.activemq.core.remoting.CloseListener; @@ -245,12 +247,38 @@ public final class StompConnection implements RemotingConnection public void checkDestination(String destination) throws ActiveMQStompException { + if (autoCreateQueueIfPossible(destination)) + { + return; + } + if (!manager.destinationExists(destination)) { throw BUNDLE.destinationNotExist(destination); } } + public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException + { + boolean autoCreated = false; + + if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null) + { + SimpleString queueName = new SimpleString(queue); + try + { + manager.getServer().createQueue(queueName, queueName, null, true, false, true); + } + catch (Exception e) + { + throw new ActiveMQStompException(e.getMessage(), e); + } + autoCreated = true; + } + + return autoCreated; + } + @Override public ActiveMQBuffer createBuffer(int size) { @@ -689,6 +717,7 @@ public final class StompConnection implements RemotingConnection void subscribe(String destination, String selector, String ack, String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException { + autoCreateQueueIfPossible(destination); if (noLocal) { String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'"; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java index e73e9e4..6ce03db 100644 --- 
a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java @@ -500,4 +500,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener break; } } + + public ActiveMQServer getServer() + { + return server; + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java index 2054be1..2197690 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java @@ -190,7 +190,7 @@ public interface ActiveMQServer extends ActiveMQComponent boolean durable, boolean temporary) throws Exception; - Queue locateQueue(SimpleString queueName) throws Exception; + Queue locateQueue(SimpleString queueName); void destroyQueue(SimpleString queueName) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index cb91351..469ece2 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -1234,7 +1234,7 @@ 
public class ActiveMQServerImpl implements ActiveMQServer } - public Queue locateQueue(SimpleString queueName) throws Exception + public Queue locateQueue(SimpleString queueName) { Binding binding = postOffice.getBinding(queueName); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java index 5bcd2c6..05cc9cf 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java @@ -24,7 +24,7 @@ import org.apache.activemq.core.server.Queue; import org.apache.activemq.utils.ReferenceCounterUtil; /** - * @author Clebert Suconic + * @author Justin Bertram */ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java index 072b2e7..b32e1bf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java @@ -30,8 +30,12 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import 
org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.management.ResourceNames; +import org.apache.activemq.api.jms.ActiveMQJMSClient; import org.apache.activemq.core.protocol.stomp.Stomp; import org.apache.activemq.tests.integration.IntegrationTestLogger; +import org.apache.activemq.tests.util.RandomUtil; import org.junit.Assert; import org.junit.Test; @@ -147,6 +151,40 @@ public class StompTest extends StompTestBase Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); } + @Test + public void testSendMessageToNonExistentQueue() throws Exception + { + String nonExistantQueue = RandomUtil.randomString(); + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistantQueue + "\n\n" + "Hello World" + Stomp.NULL; + + sendFrame(frame); + receiveFrame(1000); + + MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistantQueue)); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("Hello World", message.getText()); + // Assert default priority 4 is used when priority header is not set + Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority()); + + // Make sure that the timestamp is valid - should + // be very close to the current time. 
+ long tnow = System.currentTimeMillis(); + long tmsg = message.getJMSTimestamp(); + Assert.assertTrue(Math.abs(tnow - tmsg) < 1500); + + // closing the consumer here should trigger auto-deletion + assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + consumer.close(); + assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + } + /* * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame * This means next frame read might have a \n a the beginning. @@ -1095,6 +1133,63 @@ public class StompTest extends StompTestBase } @Test + public void testSubscribeToNonExistantQueue() throws Exception + { + String nonExistantQueue = RandomUtil.randomString(); + + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(100000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:" + + getQueuePrefix() + + nonExistantQueue + + "\n" + + "receipt: 12\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + // wait for SUBSCRIBE's receipt + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("RECEIPT")); + + sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue)); + + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("MESSAGE")); + Assert.assertTrue(frame.indexOf("destination:") > 0); + Assert.assertTrue(frame.indexOf(getName()) > 0); + + assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + + frame = "UNSUBSCRIBE\n" + "destination:" + + getQueuePrefix() + + nonExistantQueue + + "\n" + + "receipt: 1234\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + // wait for UNSUBSCRIBE's receipt + frame = receiveFrame(10000); + 
Assert.assertTrue(frame.startsWith("RECEIPT")); + + assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + + sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue)); + + frame = receiveFrame(1000); + log.info("Received frame: " + frame); + Assert.assertNull("No message should have been received since subscription was removed", frame); + + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + sendFrame(frame); + } + + @Test public void testDurableSubscriberWithReconnection() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java index 3aec083..e7c7819 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java @@ -114,6 +114,7 @@ public abstract class StompTestBase extends UnitTestCase if (autoCreateServer) { server = createServer(); + addServer(server.getActiveMQServer()); server.start(); connectionFactory = createConnectionFactory(); createBootstrap(); @@ -231,9 +232,8 @@ public abstract class StompTestBase extends UnitTestCase if (group != null) { channel.close(); - group.shutdown(); + group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); } - server.stop(); } super.tearDown(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java 
---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java index 0984353..5117952 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.StompClientConnection; @@ -2354,8 +2355,11 @@ public class StompV11Test extends StompV11TestBase } @Test - public void testSendMessageToNonExistentJmsQueue() throws Exception + public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); connV11.connect(defUser, defPass); ClientStompFrame frame = connV11.createFrame("SEND"); @@ -2374,6 +2378,26 @@ public class StompV11Test extends StompV11TestBase } @Test + public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception + { + connV11.connect(defUser, defPass); + + ClientStompFrame frame = connV11.createFrame("SEND"); + String guid = UUID.randomUUID().toString(); + frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid); + frame.addHeader("receipt", "1234"); + frame.setBody("Hello World"); + + frame = connV11.sendFrame(frame); + + 
assertTrue(frame.getCommand().equals("RECEIPT")); + assertEquals("1234", frame.getHeader("receipt-id")); + System.out.println("message: " + frame.getHeader("message")); + + connV11.disconnect(); + } + + @Test public void testSendAndReceiveWithEscapedCharactersInSenderId() throws Exception { connV11.connect(defUser, defPass); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java index 3b6e471..7a308af 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.StompClientConnection; @@ -400,6 +401,10 @@ public class StompV12Test extends StompV11TestBase @Test public void testHeaderRepetitive() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); + connV12.connect(defUser, defPass); ClientStompFrame frame = connV12.createFrame("SEND"); @@ -2617,8 +2622,11 @@ public class StompV12Test extends StompV11TestBase } @Test - public void 
testSendMessageToNonExistentJmsQueue() throws Exception + public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); connV12.connect(defUser, defPass); ClientStompFrame frame = connV12.createFrame("SEND"); @@ -2637,6 +2645,26 @@ public class StompV12Test extends StompV11TestBase } @Test + public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception + { + connV12.connect(defUser, defPass); + + ClientStompFrame frame = connV12.createFrame("SEND"); + String guid = UUID.randomUUID().toString(); + frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid); + frame.addHeader("receipt", "1234"); + frame.setBody("Hello World"); + + frame = connV12.sendFrame(frame); + + assertTrue(frame.getCommand().equals("RECEIPT")); + assertEquals("1234", frame.getHeader("receipt-id")); + System.out.println("message: " + frame.getHeader("message")); + + connV12.disconnect(); + } + + @Test public void testInvalidStompCommand() throws Exception { try
