Check routing semantics for STOMP senders/subscribers
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c756499c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c756499c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c756499c Branch: refs/heads/ARTEMIS-780 Commit: c756499ccb0b18940c92ab7d15cc1a85e81dd932 Parents: 3020837 Author: jbertram <[email protected]> Authored: Tue Nov 15 10:27:55 2016 -0600 Committer: jbertram <[email protected]> Committed: Tue Nov 15 10:38:13 2016 -0600 ---------------------------------------------------------------------- .../protocol/stomp/ActiveMQStompException.java | 4 +-- .../ActiveMQStompProtocolMessageBundle.java | 7 ++-- .../core/protocol/stomp/StompConnection.java | 20 ++++++++++-- .../stomp/VersionedStompFrameHandler.java | 6 +++- .../tests/integration/stomp/StompTest.java | 34 ++++++++++++++++++++ .../tests/integration/stomp/StompTestBase.java | 3 +- .../integration/stomp/v11/StompV11Test.java | 18 ----------- .../integration/stomp/v12/StompV12Test.java | 15 --------- 8 files changed, 65 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java index 118f7f8..69fa130 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java @@ -40,12 +40,12 @@ public class ActiveMQStompException extends Exception { } public ActiveMQStompException(String msg) { - super(msg); + super(msg.replace(":", "")); handler = null; } public ActiveMQStompException(String msg, Throwable t) { - super(msg, t); + super(msg.replace(":", ""), t); this.body = t.getMessage(); handler = null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java index e535725..c1f93e4 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java @@ -86,10 +86,10 @@ public interface ActiveMQStompProtocolMessageBundle { ActiveMQStompException noDestination(); @Message(id = 339016, value = "Error creating subscription {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQStompException errorCreatSubscription(String subscriptionID, @Cause Exception e); + ActiveMQStompException errorCreatingSubscription(String subscriptionID, @Cause Exception e); @Message(id = 339017, value = "Error unsubscribing {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQStompException errorUnsubscrib(String subscriptionID, @Cause Exception e); + ActiveMQStompException errorUnsubscribing(String subscriptionID, @Cause Exception e); @Message(id = 339018, value = "Error acknowledging message {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQStompException errorAck(String messageID, @Cause Exception e); @@ -150,4 +150,7 @@ public interface ActiveMQStompProtocolMessageBundle { @Message(id = 339039, value = "No id header in ACK/NACK frame.") ActiveMQStompException noIDInAck(); + + @Message(id = 339040, value = "Not allowed to specify {0} semantics on {1} address.", format = Message.Format.MESSAGE_FORMAT) + ActiveMQStompException illegalSemantics(String requested, String exists); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index a6eab6b..eaeb21d 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -256,7 +256,9 @@ public final class StompConnection implements RemotingConnection { } } - public void autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException { + public boolean autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException { + boolean result = false; + try { if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { // TODO check here to see if auto-creation is enabled @@ -266,12 +268,22 @@ public final class StompConnection implements RemotingConnection { manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST).setAutoCreated(true)); manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, null, true, false, true); } + result = true; } } catch (ActiveMQQueueExistsException e) { // ignore } catch (Exception e) { throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); } + + return result; + } + + public void checkRoutingSemantics(String destination, AddressInfo.RoutingType routingType) throws ActiveMQStompException { + AddressInfo.RoutingType actualRoutingTypeOfAddress = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType(); + if (routingType != null && !routingType.equals(actualRoutingTypeOfAddress)) { + throw BUNDLE.illegalSemantics(routingType.toString(), actualRoutingTypeOfAddress.toString()); + } } @Override @@ -629,6 +641,8 @@ public final class StompConnection implements RemotingConnection { boolean noLocal, AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException { autoCreateDestinationIfPossible(destination, subscriptionType); + checkDestination(destination); + checkRoutingSemantics(destination, subscriptionType); if (noLocal) { String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'"; if (selector == null) { @@ -657,7 +671,7 @@ public final class StompConnection implements RemotingConnection { } catch (ActiveMQStompException e) { throw e; } catch (Exception e) { - throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler); + throw BUNDLE.errorCreatingSubscription(subscriptionID, e).setHandler(frameHandler); } } @@ -667,7 +681,7 @@ public final class StompConnection implements RemotingConnection { } catch (ActiveMQStompException e) { throw e; } catch (Exception e) { - throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler); + throw BUNDLE.errorUnsubscribing(subscriptionID, e).setHandler(frameHandler); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 06af785..580bade 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -172,6 +172,7 @@ public abstract class VersionedStompFrameHandler { AddressInfo.RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION)); connection.autoCreateDestinationIfPossible(destination, routingType); connection.checkDestination(destination); + connection.checkRoutingSemantics(destination, routingType); String txID = frame.getHeader(Stomp.Headers.TRANSACTION); long timestamp = System.currentTimeMillis(); @@ -344,12 +345,15 @@ public abstract class VersionedStompFrameHandler { } private AddressInfo.RoutingType getRoutingType(String typeHeader, String destination) { - AddressInfo.RoutingType routingType = AddressInfo.RoutingType.ANYCAST; // default + // null is valid to return here so we know when the user didn't provide any routing info + AddressInfo.RoutingType routingType = null; if (typeHeader != null) { routingType = AddressInfo.RoutingType.valueOf(typeHeader); } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) { if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { routingType = AddressInfo.RoutingType.MULTICAST; + } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { + routingType = AddressInfo.RoutingType.ANYCAST; } } return routingType; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 19e9ebe..e7dcc91 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1372,4 +1372,38 @@ public class StompTest extends StompTestBase { conn.disconnect(); } + + @Test + public void testMulticastOperationsOnAnycastAddress() throws Exception { + testRoutingSemantics(AddressInfo.RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName()); + } + + @Test + public void testAnycastOperationsOnMulticastAddress() throws Exception { + testRoutingSemantics(AddressInfo.RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName()); + } + + public void testRoutingSemantics(String routingType, String destination) throws Exception { + conn.connect(defUser, defPass); + + String uuid = UUID.randomUUID().toString(); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + + frame = conn.sendFrame(frame); + assertEquals(Stomp.Responses.ERROR, frame.getCommand()); + + uuid = UUID.randomUUID().toString(); + + frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION_TYPE, AddressInfo.RoutingType.MULTICAST.toString()) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + + frame = conn.sendFrame(frame); + assertEquals(Stomp.Responses.ERROR, frame.getCommand()); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index 278d80e..bcac436 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -218,7 +218,6 @@ public abstract class StompTestBase extends ActiveMQTestBase { MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(msg); producer.send(message); - IntegrationTestLogger.LOGGER.info("Sent message from JMS client to: " + destination); } public void sendJmsMessage(byte[] data, Destination destination) throws Exception { @@ -526,6 +525,8 @@ public abstract class StompTestBase extends ActiveMQTestBase { assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); } + IntegrationTestLogger.LOGGER.info("Received: " + frame); + return frame; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 7cb02a3..6eb57b2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -2128,24 +2128,6 @@ public class StompV11Test extends StompTestBase { } @Test - public void testSendMessageToNonExistentQueueWithoutAutoCreation() throws Exception { - AddressSettings addressSettings = new AddressSettings(); - addressSettings.setAutoCreateJmsQueues(false); - server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); - conn.connect(defUser, defPass); - - String uuid = UUID.randomUUID().toString(); - - ClientStompFrame frame = send(conn, "NonExistentQueue" + uuid, null, "Hello World", true, AddressInfo.RoutingType.ANYCAST); - - // TODO fix this test by checking auto-create settings - assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR)); - IntegrationTestLogger.LOGGER.info("message: " + frame.getHeader("message")); - - conn.disconnect(); - } - - @Test public void testSendMessageToNonExistentQueueWithAutoCreation() throws Exception { conn.connect(defUser, defPass); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java index 0a52714..dc8cea0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java @@ -2167,21 +2167,6 @@ public class StompV12Test extends StompTestBase { } @Test - public void testSendMessageToNonExistentQueueWithoutAutoCreation() throws Exception { - AddressSettings addressSettings = new AddressSettings(); - addressSettings.setAutoCreateJmsQueues(false); - server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); - conn.connect(defUser, defPass); - - ClientStompFrame frame = send(conn, "NonExistentQueue" + UUID.randomUUID().toString(), null, "Hello World", true, AddressInfo.RoutingType.ANYCAST); - - // TODO this is broken because queue auto-creation is always on - assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR)); - - waitDisconnect(conn); - } - - @Test public void testSendMessageToNonExistentQueueWithAutoCreation() throws Exception { conn.connect(defUser, defPass);
