This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 20daf23 ARTEMIS-2817 Support Stomp subscription with FQQN+multicast
new c433f50 This closes #3202
20daf23 is described below
commit 20daf2354cb740a03104025317237bdda1bc57bd
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Jun 24 10:11:49 2020 -0500
ARTEMIS-2817 Support Stomp subscription with FQQN+multicast
---
.../artemis/core/protocol/stomp/StompSession.java | 4 +-
.../tests/integration/stomp/FQQNStompTest.java | 50 +++++++++++++++++++---
2 files changed, 47 insertions(+), 7 deletions(-)
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index c373ff7..786a114 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -45,6 +45,7 @@ import
org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.PendingTask;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -259,7 +260,8 @@ public class StompSession implements SessionCallback {
Set<RoutingType> routingTypes =
manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean multicast = routingTypes.size() == 1 &&
routingTypes.contains(RoutingType.MULTICAST);
- if (multicast) {
+ // if the destination is FQQN then the queue will have already been created
+ if (multicast && !CompositeAddress.isFullyQualified(destination)) {
// subscribes to a topic
if (durableSubscriptionName != null) {
if (clientID == null) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
index d57b5dc..f6922e5 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
@@ -204,7 +204,7 @@ public class FQQNStompTest extends StompTestBase {
}
@Test
- public void testSendFQQNAutoCreateOnSend() throws Exception {
+ public void testAutoCreateOnSendFQQN() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
@@ -224,30 +224,68 @@ public class FQQNStompTest extends StompTestBase {
}
@Test
- public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
+ public void testAutoCreateOnSubscribeFQQNAnycast() throws Exception {
+ internalTestAutoCreateOnSubscribeFQQN(RoutingType.ANYCAST);
+ }
+
+ @Test
+ public void testAutoCreateOnSubscribeFQQNMulticast() throws Exception {
+ internalTestAutoCreateOnSubscribeFQQN(RoutingType.MULTICAST);
+ }
+
+ @Test
+ public void testAutoCreateOnSubscribeFQQNNoRoutingType() throws Exception {
+ internalTestAutoCreateOnSubscribeFQQN(null);
+ }
+
+ private void internalTestAutoCreateOnSubscribeFQQN(RoutingType routingType) throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
final SimpleString q2Name = SimpleString.toSimpleString("q2");
StompClientConnection consumer1Connection =
StompClientConnectionFactory.createClientConnection(uri);
consumer1Connection.connect(defUser, defPass);
- subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" +
q1Name);
+
+ ClientStompFrame frame = consumer1Connection
+ .createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q1Name)
+ .addHeader(Stomp.Headers.Subscribe.ID, "sub-01")
+ .addHeader(Stomp.Headers.Subscribe.ACK_MODE,
Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+ if (routingType != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE,
routingType.toString());
+ }
+
+ consumer1Connection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000,
100));
StompClientConnection consumer2Connection =
StompClientConnectionFactory.createClientConnection(uri);
consumer2Connection.connect(defUser, defPass);
- subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" +
q2Name);
+
+ frame = consumer2Connection
+ .createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q2Name)
+ .addHeader(Stomp.Headers.Subscribe.ID, "sub-02")
+ .addHeader(Stomp.Headers.Subscribe.ACK_MODE,
Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+ if (routingType != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE,
routingType.toString());
+ }
+
+ consumer2Connection.sendFrame(frame);
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000,
100));
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000,
100));
StompClientConnection senderConnection =
StompClientConnectionFactory.createClientConnection(uri);
senderConnection.connect(defUser, defPass);
- send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
+ send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, routingType);
assertTrue(Wait.waitFor(() ->
server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
assertTrue(Wait.waitFor(() ->
server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
- ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
+ frame = consumer1Connection.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() ->
server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));