This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new ede2051 ARTEMIS-2655 support auto-creation w/FQQN & STOMP
new b81c595 This closes #3018
ede2051 is described below
commit ede2051960ddd6bf5657076fe5889f18a5699c6c
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Mar 12 14:33:29 2020 -0500
ARTEMIS-2655 support auto-creation w/FQQN & STOMP
---
.../core/protocol/stomp/StompConnection.java | 21 +++++----
.../tests/integration/stomp/FQQNStompTest.java | 53 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 10 deletions(-)
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 32c09da..c050104 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -54,6 +54,7 @@ import
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
@@ -274,20 +275,20 @@ public final class StompConnection implements
RemotingConnection {
}
}
- public void autoCreateDestinationIfPossible(String queue, RoutingType
routingType) throws ActiveMQStompException {
+ public void autoCreateDestinationIfPossible(String destination, RoutingType
routingType) throws ActiveMQStompException {
try {
- ServerSession session = getSession().getCoreSession();
- SimpleString simpleQueue = SimpleString.toSimpleString(queue);
- AddressInfo addressInfo =
manager.getServer().getAddressInfo(simpleQueue);
- AddressSettings addressSettings =
manager.getServer().getAddressSettingsRepository().getMatch(queue);
+ SimpleString simpleDestination =
SimpleString.toSimpleString(destination);
+ AddressInfo addressInfo =
manager.getServer().getAddressInfo(simpleDestination);
+ AddressSettings addressSettings =
manager.getServer().getAddressSettingsRepository().getMatch(destination);
RoutingType effectiveAddressRoutingType = routingType == null ?
addressSettings.getDefaultAddressRoutingType() : routingType;
+ ServerSession session = getSession().getCoreSession();
/**
* If the address doesn't exist then it is created if possible.
* If the address does exist but doesn't support the routing-type
then the address is updated if possible.
*/
if (addressInfo == null) {
if (addressSettings.isAutoCreateAddresses()) {
- session.createAddress(simpleQueue, effectiveAddressRoutingType,
true);
+ session.createAddress(simpleDestination,
effectiveAddressRoutingType, true);
}
} else if
(!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
if (addressSettings.isAutoCreateAddresses()) {
@@ -296,13 +297,13 @@ public final class StompConnection implements
RemotingConnection {
routingTypes.add(existingRoutingType);
}
routingTypes.add(effectiveAddressRoutingType);
- manager.getServer().updateAddressInfo(simpleQueue,
routingTypes);
+ manager.getServer().updateAddressInfo(simpleDestination,
routingTypes);
}
}
- // only auto create the queue if the address is ANYCAST
- if (effectiveAddressRoutingType == RoutingType.ANYCAST &&
addressSettings.isAutoCreateQueues() &&
manager.getServer().locateQueue(simpleQueue) == null) {
- session.createQueue(new
QueueConfiguration(simpleQueue).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true));
+ // auto create the queue if the address is ANYCAST or FQQN
+ if ((CompositeAddress.isFullyQualified(destination) ||
effectiveAddressRoutingType == RoutingType.ANYCAST) &&
addressSettings.isAutoCreateQueues() &&
manager.getServer().locateQueue(simpleDestination) == null) {
+ session.createQueue(new
QueueConfiguration(destination).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true));
}
} catch (ActiveMQQueueExistsException e) {
// ignore
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
index 699b0b4..d57b5dc 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java
@@ -203,4 +203,57 @@ public class FQQNStompTest extends StompTestBase {
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
}
+ @Test
+ public void testSendFQQNAutoCreateOnSend() throws Exception {
+ final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
+ final SimpleString q1Name = SimpleString.toSimpleString("q1");
+
+ conn.connect(defUser, defPass);
+ send(conn, myAddress + "\\c\\c" + q1Name, null, "Hello World!");
+
+ assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000,
100));
+ assertTrue(Wait.waitFor(() ->
server.locateQueue(q1Name).getMessageCount() == 1, 2000, 100));
+
+ subscribeQueue(conn, "sub-01", myAddress + "\\c\\c" + q1Name);
+ ClientStompFrame frame = conn.receiveFrame(2000);
+ assertNotNull(frame);
+ assertEquals("Hello World!", frame.getBody());
+ assertTrue(Wait.waitFor(() ->
server.locateQueue(q1Name).getMessageCount() == 0, 2000, 100));
+
+ unsubscribe(conn, "sub-01");
+ }
+
+ @Test
+ public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
+ final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
+ final SimpleString q1Name = SimpleString.toSimpleString("q1");
+ final SimpleString q2Name = SimpleString.toSimpleString("q2");
+
+ StompClientConnection consumer1Connection =
StompClientConnectionFactory.createClientConnection(uri);
+ consumer1Connection.connect(defUser, defPass);
+ subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" +
q1Name);
+
+ StompClientConnection consumer2Connection =
StompClientConnectionFactory.createClientConnection(uri);
+ consumer2Connection.connect(defUser, defPass);
+ subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" +
q2Name);
+
+ assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000,
100));
+ assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000,
100));
+
+ StompClientConnection senderConnection =
StompClientConnectionFactory.createClientConnection(uri);
+ senderConnection.connect(defUser, defPass);
+ send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello
World!", false, RoutingType.ANYCAST);
+
+ assertTrue(Wait.waitFor(() ->
server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
+ assertTrue(Wait.waitFor(() ->
server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
+
+ ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
+ assertNotNull(frame);
+ assertEquals("Hello World!", frame.getBody());
+ assertTrue(Wait.waitFor(() ->
server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));
+
+ unsubscribe(consumer1Connection, "sub-01");
+ unsubscribe(consumer2Connection, "sub-02");
+ }
+
}