This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 20daf23  ARTEMIS-2817 Support Stomp subscription with FQQN+multicast
     new c433f50  This closes #3202
20daf23 is described below

commit 20daf2354cb740a03104025317237bdda1bc57bd
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Jun 24 10:11:49 2020 -0500

    ARTEMIS-2817 Support Stomp subscription with FQQN+multicast
---
 .../artemis/core/protocol/stomp/StompSession.java  |  4 +-
 .../tests/integration/stomp/FQQNStompTest.java     | 50 +++++++++++++++++++---
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index c373ff7..786a114 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.PendingTask;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -259,7 +260,8 @@ public class StompSession implements SessionCallback {
 
       Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
       boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
-      if (multicast) {
+      // if the destination is FQQN then the queue will have already been created
+      if (multicast && !CompositeAddress.isFullyQualified(destination)) {
          // subscribes to a topic
          if (durableSubscriptionName != null) {
             if (clientID == null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
index d57b5dc..f6922e5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
@@ -204,7 +204,7 @@ public class FQQNStompTest extends StompTestBase {
    }
 
    @Test
-   public void testSendFQQNAutoCreateOnSend() throws Exception {
+   public void testAutoCreateOnSendFQQN() throws Exception {
       final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
       final SimpleString q1Name = SimpleString.toSimpleString("q1");
 
@@ -224,30 +224,68 @@ public class FQQNStompTest extends StompTestBase {
    }
 
    @Test
-   public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
+   public void testAutoCreateOnSubscribeFQQNAnycast() throws Exception {
+      internalTestAutoCreateOnSubscribeFQQN(RoutingType.ANYCAST);
+   }
+
+   @Test
+   public void testAutoCreateOnSubscribeFQQNMulticast() throws Exception {
+      internalTestAutoCreateOnSubscribeFQQN(RoutingType.MULTICAST);
+   }
+
+   @Test
+   public void testAutoCreateOnSubscribeFQQNNoRoutingType() throws Exception {
+      internalTestAutoCreateOnSubscribeFQQN(null);
+   }
+
+   private void internalTestAutoCreateOnSubscribeFQQN(RoutingType routingType) throws Exception {
       final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
       final SimpleString q1Name = SimpleString.toSimpleString("q1");
       final SimpleString q2Name = SimpleString.toSimpleString("q2");
 
       StompClientConnection consumer1Connection = StompClientConnectionFactory.createClientConnection(uri);
       consumer1Connection.connect(defUser, defPass);
-      subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" + q1Name);
+
+      ClientStompFrame frame = consumer1Connection
+         .createFrame(Stomp.Commands.SUBSCRIBE)
+         .addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q1Name)
+         .addHeader(Stomp.Headers.Subscribe.ID, "sub-01")
+         .addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+      if (routingType != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString());
+      }
+
+      consumer1Connection.sendFrame(frame);
+
+      assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
 
       StompClientConnection consumer2Connection = StompClientConnectionFactory.createClientConnection(uri);
       consumer2Connection.connect(defUser, defPass);
-      subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" + q2Name);
+
+      frame = consumer2Connection
+         .createFrame(Stomp.Commands.SUBSCRIBE)
+         .addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q2Name)
+         .addHeader(Stomp.Headers.Subscribe.ID, "sub-02")
+         .addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+      if (routingType != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString());
+      }
+
+      consumer2Connection.sendFrame(frame);
 
       assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
       assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000, 100));
 
       StompClientConnection senderConnection = StompClientConnectionFactory.createClientConnection(uri);
       senderConnection.connect(defUser, defPass);
-      send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
+      send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, routingType);
 
       assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
       assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
 
-      ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
+      frame = consumer1Connection.receiveFrame(2000);
       assertNotNull(frame);
       assertEquals("Hello World!", frame.getBody());
       assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));

Reply via email to