This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 09b8f67603 ARTEMIS-5184 STOMP noLocal is scoped to session not
subscription
09b8f67603 is described below
commit 09b8f67603a4de39fedfebb7b3ce19f6d091a4d5
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Dec 17 23:28:24 2024 -0600
ARTEMIS-5184 STOMP noLocal is scoped to session not subscription
This closes #5414
---
.../core/protocol/stomp/StompConnection.java | 3 +-
.../core/protocol/stomp/StompProtocolManager.java | 8 ++-
.../artemis/core/protocol/stomp/StompSession.java | 23 +++++--
.../core/protocol/stomp/StompSubscription.java | 12 +++-
.../artemis/tests/integration/stomp/StompTest.java | 71 ++++++++++++++++++++--
.../tests/integration/stomp/StompTestBase.java | 9 +++
6 files changed, 110 insertions(+), 16 deletions(-)
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index a9a474f8c7..95db593b64 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -498,7 +498,8 @@ public final class StompConnection extends
AbstractRemotingConnection {
try {
StompSession stompSession = getSession(txID);
- if (stompSession.isNoLocal()) {
+ // only set the connection ID property if we have a noLocal
subscription
+ if (stompSession.getNoLocalSubscriptionCount() > 0) {
message.putStringProperty(CONNECTION_ID_PROPERTY_NAME_STRING,
getID().toString());
}
if (isEnableMessageID()) {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 26917ac25f..bea06cd4bf 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -362,13 +363,12 @@ public class StompProtocolManager extends
AbstractProtocolManager<StompFrame, St
boolean noLocal,
Integer consumerWindowSize)
throws Exception {
StompSession stompSession = getSession(connection);
- stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) {
throw new ActiveMQStompException(connection, "There already is a
subscription for: " + subscriptionID +
". Either use unique subscription IDs or do not create multiple
subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
- return stompSession.addSubscription(consumerID, subscriptionID,
connection.getClientID(), durableSubscriptionName, destination, selector, ack,
consumerWindowSize);
+ return stompSession.addSubscription(consumerID, subscriptionID,
connection.getClientID(), durableSubscriptionName, destination, selector, ack,
noLocal, consumerWindowSize);
}
public void unsubscribe(StompConnection connection,
@@ -407,4 +407,8 @@ public class StompProtocolManager extends
AbstractProtocolManager<StompFrame, St
public ActiveMQServer getServer() {
return server;
}
+
+ public Collection<StompSession> getSessions() {
+ return sessions.values();
+ }
}
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index b0f75e08e7..dd85ac6bd2 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -82,7 +83,7 @@ public class StompSession implements SessionCallback {
// key = consumer ID and message ID, value = frame length
private final Map<Pair<Long, Long>, Integer> messagesToAck = new
ConcurrentHashMap<>();
- private volatile boolean noLocal = false;
+ private AtomicInteger noLocalSubscriptionCount = new AtomicInteger(0);
private boolean txPending = false;
@@ -231,6 +232,9 @@ public class StompSession implements SessionCallback {
public void disconnect(ServerConsumer consumerId, String errorDescription) {
StompSubscription stompSubscription =
subscriptions.remove(consumerId.getID());
if (stompSubscription != null) {
+ if (stompSubscription.isNoLocal()) {
+ noLocalSubscriptionCount.decrementAndGet();
+ }
StompFrame frame =
connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
frame.setBody("consumer with ID " + consumerId + " disconnected by
server");
@@ -306,6 +310,7 @@ public class StompSession implements SessionCallback {
String destination,
String selector,
String ack,
+ boolean noLocal,
Integer consumerWindowSize)
throws Exception {
SimpleString address = SimpleString.of(destination);
SimpleString queueName = SimpleString.of(destination);
@@ -342,8 +347,11 @@ public class StompSession implements SessionCallback {
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
}
}
+ if (noLocal) {
+ noLocalSubscriptionCount.incrementAndGet();
+ }
final ServerConsumer consumer = session.createConsumer(consumerID,
queueName, multicast ? null : selectorSimple, false, false, 0);
- StompSubscription subscription = new StompSubscription(subscriptionID,
ack, queueName, multicast, finalConsumerWindowSize);
+ StompSubscription subscription = new StompSubscription(subscriptionID,
ack, queueName, multicast, noLocal, finalConsumerWindowSize);
subscriptions.put(consumerID, subscription);
session.start();
/*
@@ -363,6 +371,9 @@ public class StompSession implements SessionCallback {
StompSubscription sub = entry.getValue();
if (id != null && id.equals(sub.getID())) {
iterator.remove();
+ if (sub.isNoLocal()) {
+ noLocalSubscriptionCount.decrementAndGet();
+ }
SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
Queue queue = manager.getServer().locateQueue(queueName);
@@ -402,12 +413,12 @@ public class StompSession implements SessionCallback {
return sessionContext;
}
- public boolean isNoLocal() {
- return noLocal;
+ public int getNoLocalSubscriptionCount() {
+ return noLocalSubscriptionCount.get();
}
- public void setNoLocal(boolean noLocal) {
- this.noLocal = noLocal;
+ public int getSubscriptionCount() {
+ return subscriptions.size();
}
public void sendInternal(Message message, boolean direct) throws Exception {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index 394a15c119..02127c55bf 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -29,13 +29,16 @@ public class StompSubscription {
// whether or not this subscription follows multicast semantics (e.g. for a
JMS topic)
private final boolean multicast;
+ private final boolean noLocal;
+
private final int consumerWindowSize;
- public StompSubscription(String subID, String ack, SimpleString queueName,
boolean multicast, int consumerWindowSize) {
+ public StompSubscription(String subID, String ack, SimpleString queueName,
boolean multicast, boolean noLocal, int consumerWindowSize) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.multicast = multicast;
+ this.noLocal = noLocal;
this.consumerWindowSize = consumerWindowSize;
}
@@ -55,13 +58,16 @@ public class StompSubscription {
return multicast;
}
+ public boolean isNoLocal() {
+ return noLocal;
+ }
+
public int getConsumerWindowSize() {
return consumerWindowSize;
}
@Override
public String toString() {
- return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName="
+ queueName + ", multicast=" + multicast + ", consumerWindowSize=" +
consumerWindowSize + "]";
+ return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName="
+ queueName + ", multicast=" + multicast + ", noLocal=" + noLocal + ",
consumerWindowSize=" + consumerWindowSize + "]";
}
-
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 1f84cd8d82..293b7b7185 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -55,6 +55,7 @@ import
org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
import
org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
+import org.apache.activemq.artemis.core.protocol.stomp.StompSession;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -65,7 +66,6 @@ import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.reader.MessageUtil;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import
org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -754,9 +754,7 @@ public class StompTest extends StompTestBase {
Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100);
- Acceptor stompAcceptor =
server.getRemotingService().getAcceptors().get("stomp");
- StompProtocolManager stompProtocolManager = (StompProtocolManager)
stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
- assertNotNull(stompProtocolManager);
+ StompProtocolManager stompProtocolManager = getStompProtocolManager();
assertEquals(0, stompProtocolManager.getTransactedSessions().size());
}
@@ -1530,6 +1528,71 @@ public class StompTest extends StompTestBase {
}
}
+ @Test
+ public void testSubscribeToTopicWithNoLocalAndNormal() throws Exception {
+ conn.connect(defUser, defPass);
+ String noLocalSubscriptionId = RandomUtil.randomString();
+ String normalSubscriptionId = RandomUtil.randomString();
+ subscribeTopic(conn, noLocalSubscriptionId, null, null, true, true);
+ subscribeTopic(conn, normalSubscriptionId, null, null, true, false);
+
+ StompProtocolManager stompProtocolManager = getStompProtocolManager();
+ int totalSubCount = 0;
+ int noLocalSubCount = 0;
+ for (StompSession session : stompProtocolManager.getSessions()) {
+ totalSubCount += session.getSubscriptionCount();
+ noLocalSubCount += session.getNoLocalSubscriptionCount();
+ }
+ assertEquals(1, noLocalSubCount);
+ assertEquals(2, totalSubCount);
+
+ { // Send a message on the same connection. It should be received by the
normal subscription and not by the noLocal one.
+ send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
+
+ ClientStompFrame frame = conn.receiveFrame(100);
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ assertEquals(normalSubscriptionId,
frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+ assertNotNull(frame.getHeader("__AMQ_CID"));
+ frame = conn.receiveFrame(100);
+ assertNull(frame);
+ }
+
+ unsubscribe(conn, noLocalSubscriptionId, true);
+
+ totalSubCount = 0;
+ noLocalSubCount = 0;
+ for (StompSession session : stompProtocolManager.getSessions()) {
+ totalSubCount += session.getSubscriptionCount();
+ noLocalSubCount += session.getNoLocalSubscriptionCount();
+ }
+ assertEquals(0, noLocalSubCount);
+ assertEquals(1, totalSubCount);
+
+ { // Send another message on the same connection. It should be received
by the normal subscription.
+ send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
+
+ ClientStompFrame frame = conn.receiveFrame(100);
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ assertEquals(normalSubscriptionId,
frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+ assertNull(frame.getHeader("__AMQ_CID"));
+ }
+
+ unsubscribe(conn, normalSubscriptionId, true);
+
+ totalSubCount = 0;
+ noLocalSubCount = 0;
+ for (StompSession session : stompProtocolManager.getSessions()) {
+ totalSubCount += session.getSubscriptionCount();
+ noLocalSubCount += session.getNoLocalSubscriptionCount();
+ }
+ assertEquals(0, noLocalSubCount);
+ assertEquals(0, totalSubCount);
+
+ conn.disconnect();
+ }
+
@Test
public void testSubscribeToTopicWithNoLocalAndSelector() throws Exception {
conn.connect(defUser, defPass);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 891d483c1a..031b377c73 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -45,6 +45,7 @@ import
org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
import
org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import
org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
@@ -54,6 +55,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import
org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -657,4 +659,11 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
public static URI createStompClientUri(String scheme, String hostname, int
port) throws URISyntaxException {
return new URI(scheme + "://" + hostname + ":" + port);
}
+
+ protected StompProtocolManager getStompProtocolManager() {
+ Acceptor stompAcceptor =
server.getRemotingService().getAcceptors().get("stomp");
+ StompProtocolManager stompProtocolManager = (StompProtocolManager)
stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
+ assertNotNull(stompProtocolManager);
+ return stompProtocolManager;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact