This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 09b8f67603 ARTEMIS-5184 STOMP noLocal is scoped to session not 
subscription
09b8f67603 is described below

commit 09b8f67603a4de39fedfebb7b3ce19f6d091a4d5
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Dec 17 23:28:24 2024 -0600

    ARTEMIS-5184 STOMP noLocal is scoped to session not subscription
    
    This closes #5414
---
 .../core/protocol/stomp/StompConnection.java       |  3 +-
 .../core/protocol/stomp/StompProtocolManager.java  |  8 ++-
 .../artemis/core/protocol/stomp/StompSession.java  | 23 +++++--
 .../core/protocol/stomp/StompSubscription.java     | 12 +++-
 .../artemis/tests/integration/stomp/StompTest.java | 71 ++++++++++++++++++++--
 .../tests/integration/stomp/StompTestBase.java     |  9 +++
 6 files changed, 110 insertions(+), 16 deletions(-)

diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index a9a474f8c7..95db593b64 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -498,7 +498,8 @@ public final class StompConnection extends 
AbstractRemotingConnection {
       try {
          StompSession stompSession = getSession(txID);
 
-         if (stompSession.isNoLocal()) {
+         // only set the connection ID property if we have a noLocal 
subscription
+         if (stompSession.getNoLocalSubscriptionCount() > 0) {
             message.putStringProperty(CONNECTION_ID_PROPERTY_NAME_STRING, 
getID().toString());
          }
          if (isEnableMessageID()) {
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 26917ac25f..bea06cd4bf 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -362,13 +363,12 @@ public class StompProtocolManager extends 
AbstractProtocolManager<StompFrame, St
                                              boolean noLocal,
                                              Integer consumerWindowSize) 
throws Exception {
       StompSession stompSession = getSession(connection);
-      stompSession.setNoLocal(noLocal);
       if (stompSession.containsSubscription(subscriptionID)) {
          throw new ActiveMQStompException(connection, "There already is a 
subscription for: " + subscriptionID +
             ". Either use unique subscription IDs or do not create multiple 
subscriptions for the same destination");
       }
       long consumerID = server.getStorageManager().generateID();
-      return stompSession.addSubscription(consumerID, subscriptionID, 
connection.getClientID(), durableSubscriptionName, destination, selector, ack, 
consumerWindowSize);
+      return stompSession.addSubscription(consumerID, subscriptionID, 
connection.getClientID(), durableSubscriptionName, destination, selector, ack, 
noLocal, consumerWindowSize);
    }
 
    public void unsubscribe(StompConnection connection,
@@ -407,4 +407,8 @@ public class StompProtocolManager extends 
AbstractProtocolManager<StompFrame, St
    public ActiveMQServer getServer() {
       return server;
    }
+
+   public Collection<StompSession> getSessions() {
+      return sessions.values();
+   }
 }
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index b0f75e08e7..dd85ac6bd2 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.channel.EventLoop;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -82,7 +83,7 @@ public class StompSession implements SessionCallback {
    // key = consumer ID and message ID, value = frame length
    private final Map<Pair<Long, Long>, Integer> messagesToAck = new 
ConcurrentHashMap<>();
 
-   private volatile boolean noLocal = false;
+   private AtomicInteger noLocalSubscriptionCount = new AtomicInteger(0);
 
    private boolean txPending = false;
 
@@ -231,6 +232,9 @@ public class StompSession implements SessionCallback {
    public void disconnect(ServerConsumer consumerId, String errorDescription) {
       StompSubscription stompSubscription = 
subscriptions.remove(consumerId.getID());
       if (stompSubscription != null) {
+         if (stompSubscription.isNoLocal()) {
+            noLocalSubscriptionCount.decrementAndGet();
+         }
          StompFrame frame = 
connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
          frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
          frame.setBody("consumer with ID " + consumerId + " disconnected by 
server");
@@ -306,6 +310,7 @@ public class StompSession implements SessionCallback {
                                                    String destination,
                                                    String selector,
                                                    String ack,
+                                                   boolean noLocal,
                                                    Integer consumerWindowSize) 
throws Exception {
       SimpleString address = SimpleString.of(destination);
       SimpleString queueName = SimpleString.of(destination);
@@ -342,8 +347,11 @@ public class StompSession implements SessionCallback {
             
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
          }
       }
+      if (noLocal) {
+         noLocalSubscriptionCount.incrementAndGet();
+      }
       final ServerConsumer consumer = session.createConsumer(consumerID, 
queueName, multicast ? null : selectorSimple, false, false, 0);
-      StompSubscription subscription = new StompSubscription(subscriptionID, 
ack, queueName, multicast, finalConsumerWindowSize);
+      StompSubscription subscription = new StompSubscription(subscriptionID, 
ack, queueName, multicast, noLocal, finalConsumerWindowSize);
       subscriptions.put(consumerID, subscription);
       session.start();
       /*
@@ -363,6 +371,9 @@ public class StompSession implements SessionCallback {
          StompSubscription sub = entry.getValue();
          if (id != null && id.equals(sub.getID())) {
             iterator.remove();
+            if (sub.isNoLocal()) {
+               noLocalSubscriptionCount.decrementAndGet();
+            }
             SimpleString queueName = sub.getQueueName();
             session.closeConsumer(consumerID);
             Queue queue = manager.getServer().locateQueue(queueName);
@@ -402,12 +413,12 @@ public class StompSession implements SessionCallback {
       return sessionContext;
    }
 
-   public boolean isNoLocal() {
-      return noLocal;
+   public int getNoLocalSubscriptionCount() {
+      return noLocalSubscriptionCount.get();
    }
 
-   public void setNoLocal(boolean noLocal) {
-      this.noLocal = noLocal;
+   public int getSubscriptionCount() {
+      return subscriptions.size();
    }
 
    public void sendInternal(Message message, boolean direct) throws Exception {
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index 394a15c119..02127c55bf 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -29,13 +29,16 @@ public class StompSubscription {
    // whether or not this subscription follows multicast semantics (e.g. for a 
JMS topic)
    private final boolean multicast;
 
+   private final boolean noLocal;
+
    private final int consumerWindowSize;
 
-   public StompSubscription(String subID, String ack, SimpleString queueName, 
boolean multicast, int consumerWindowSize) {
+   public StompSubscription(String subID, String ack, SimpleString queueName, 
boolean multicast, boolean noLocal, int consumerWindowSize) {
       this.subID = subID;
       this.ack = ack;
       this.queueName = queueName;
       this.multicast = multicast;
+      this.noLocal = noLocal;
       this.consumerWindowSize = consumerWindowSize;
    }
 
@@ -55,13 +58,16 @@ public class StompSubscription {
       return multicast;
    }
 
+   public boolean isNoLocal() {
+      return noLocal;
+   }
+
    public int getConsumerWindowSize() {
       return consumerWindowSize;
    }
 
    @Override
    public String toString() {
-      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" 
+ queueName + ", multicast=" + multicast + ", consumerWindowSize=" + 
consumerWindowSize + "]";
+      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" 
+ queueName + ", multicast=" + multicast + ", noLocal=" + noLocal + ", 
consumerWindowSize=" + consumerWindowSize + "]";
    }
-
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 1f84cd8d82..293b7b7185 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -55,6 +55,7 @@ import 
org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
 import 
org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
+import org.apache.activemq.artemis.core.protocol.stomp.StompSession;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -65,7 +66,6 @@ import org.apache.activemq.artemis.json.JsonArray;
 import org.apache.activemq.artemis.json.JsonObject;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.reader.MessageUtil;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import 
org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider;
 import 
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -754,9 +754,7 @@ public class StompTest extends StompTestBase {
 
       Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100);
 
-      Acceptor stompAcceptor = 
server.getRemotingService().getAcceptors().get("stomp");
-      StompProtocolManager stompProtocolManager = (StompProtocolManager) 
stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
-      assertNotNull(stompProtocolManager);
+      StompProtocolManager stompProtocolManager = getStompProtocolManager();
 
       assertEquals(0, stompProtocolManager.getTransactedSessions().size());
    }
@@ -1530,6 +1528,71 @@ public class StompTest extends StompTestBase {
       }
    }
 
+   @Test
+   public void testSubscribeToTopicWithNoLocalAndNormal() throws Exception {
+      conn.connect(defUser, defPass);
+      String noLocalSubscriptionId = RandomUtil.randomString();
+      String normalSubscriptionId = RandomUtil.randomString();
+      subscribeTopic(conn, noLocalSubscriptionId, null, null, true, true);
+      subscribeTopic(conn, normalSubscriptionId, null, null, true, false);
+
+      StompProtocolManager stompProtocolManager = getStompProtocolManager();
+      int totalSubCount = 0;
+      int noLocalSubCount = 0;
+      for (StompSession session : stompProtocolManager.getSessions()) {
+         totalSubCount += session.getSubscriptionCount();
+         noLocalSubCount += session.getNoLocalSubscriptionCount();
+      }
+      assertEquals(1, noLocalSubCount);
+      assertEquals(2, totalSubCount);
+
+      { // Send a message on the same connection. It should be received by the 
normal subscription and not by the noLocal one.
+         send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
+
+         ClientStompFrame frame = conn.receiveFrame(100);
+         assertNotNull(frame);
+         assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+         assertEquals(normalSubscriptionId, 
frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+         assertNotNull(frame.getHeader("__AMQ_CID"));
+         frame = conn.receiveFrame(100);
+         assertNull(frame);
+      }
+
+      unsubscribe(conn, noLocalSubscriptionId, true);
+
+      totalSubCount = 0;
+      noLocalSubCount = 0;
+      for (StompSession session : stompProtocolManager.getSessions()) {
+         totalSubCount += session.getSubscriptionCount();
+         noLocalSubCount += session.getNoLocalSubscriptionCount();
+      }
+      assertEquals(0, noLocalSubCount);
+      assertEquals(1, totalSubCount);
+
+      { // Send another message on the same connection. It should be received 
by the normal subscription.
+         send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
+
+         ClientStompFrame frame = conn.receiveFrame(100);
+         assertNotNull(frame);
+         assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+         assertEquals(normalSubscriptionId, 
frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+         assertNull(frame.getHeader("__AMQ_CID"));
+      }
+
+      unsubscribe(conn, normalSubscriptionId, true);
+
+      totalSubCount = 0;
+      noLocalSubCount = 0;
+      for (StompSession session : stompProtocolManager.getSessions()) {
+         totalSubCount += session.getSubscriptionCount();
+         noLocalSubCount += session.getNoLocalSubscriptionCount();
+      }
+      assertEquals(0, noLocalSubCount);
+      assertEquals(0, totalSubCount);
+
+      conn.disconnect();
+   }
+
    @Test
    public void testSubscribeToTopicWithNoLocalAndSelector() throws Exception {
       conn.connect(defUser, defPass);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 891d483c1a..031b377c73 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -45,6 +45,7 @@ import 
org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
 import 
org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
 import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
 import 
org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
@@ -54,6 +55,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import 
org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
 import 
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -657,4 +659,11 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
    public static URI createStompClientUri(String scheme, String hostname, int 
port) throws URISyntaxException {
       return new URI(scheme + "://" + hostname + ":" + port);
    }
+
+   protected StompProtocolManager getStompProtocolManager() {
+      Acceptor stompAcceptor = 
server.getRemotingService().getAcceptors().get("stomp");
+      StompProtocolManager stompProtocolManager = (StompProtocolManager) 
stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
+      assertNotNull(stompProtocolManager);
+      return stompProtocolManager;
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to