This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b7737301f ARTEMIS-5182 STOMP sub w/noLocal + selector missing messages
2b7737301f is described below

commit 2b7737301f655b33fc31eeff5a7fe4de4c8ee341
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Dec 17 16:19:43 2024 -0600

    ARTEMIS-5182 STOMP sub w/noLocal + selector missing messages
---
 .../core/protocol/stomp/StompConnection.java       | 22 +++++++-
 .../artemis/tests/integration/stomp/StompTest.java | 58 ++++++++++++++++++++++
 .../tests/integration/stomp/StompTestBase.java     | 40 ++++++++++-----
 3 files changed, 105 insertions(+), 15 deletions(-)

diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index bf0d09f955..9b397a7338 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.selector.filter.FilterException;
+import org.apache.activemq.artemis.selector.impl.SelectorParser;
 import 
org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -57,7 +60,6 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
 import static 
org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
@@ -558,6 +560,7 @@ public final class StompConnection extends 
AbstractRemotingConnection {
                                       boolean noLocal,
                                       RoutingType subscriptionType,
                                       Integer consumerWindowSize) throws 
ActiveMQStompException {
+      validateSelector(selector);
       autoCreateDestinationIfPossible(destination, subscriptionType);
       checkDestination(destination);
       checkRoutingSemantics(destination, subscriptionType);
@@ -566,7 +569,7 @@ public final class StompConnection extends 
AbstractRemotingConnection {
          if (selector == null) {
             selector = noLocalFilter;
          } else {
-            selector += " AND " + noLocalFilter;
+            selector = "(" + selector + ") AND " + noLocalFilter;
          }
       }
 
@@ -593,6 +596,21 @@ public final class StompConnection extends 
AbstractRemotingConnection {
       }
    }
 
+   private void validateSelector(String selector) throws ActiveMQStompException {
+      // a selector is optional on SUBSCRIBE, so null is accepted without validation
+      if (selector == null) {
+         return;
+      }
+      if (selector.trim().isEmpty()) { // whitespace-only (e.g. "   ") is rejected as empty too
+         throw new ActiveMQStompException("Selector cannot be empty").setHandler(frameHandler);
+      }
+      try {
+         SelectorParser.parse(selector); // called only to validate; the result is not used
+      } catch (FilterException e) {
+         throw new ActiveMQStompException("Invalid selector \"" + selector + "\"", e).setHandler(frameHandler);
+      }
+   }
+
    public void unsubscribe(String subscriptionID, String 
durableSubscriptionName) throws ActiveMQStompException {
       try {
          manager.unsubscribe(this, subscriptionID, durableSubscriptionName);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index c4c25dcb83..991b019a4d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1501,6 +1501,28 @@ public class StompTest extends StompTestBase {
       conn.disconnect();
    }
 
+   @Test
+   public void testSubscribeToTopicWithNoLocalAndSelector() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribeTopic(conn, null, null, null, true, true, "a=foo OR b=bar");
+
+      // send a message on the same connection => it should not be received as 
noLocal = true on subscribe
+      ClientStompFrame frame = 
conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, 
getTopicPrefix() + getTopicName()).addHeader("b", "bar").setBody("Hello World");
+      conn.sendFrame(frame);
+
+      frame = conn.receiveFrame(100);
+      assertNull(frame, "No message should have been received since 
noLocal=true");
+
+      // send message on another JMS connection => it should be received
+      sendJmsMessage(getName().getBytes(StandardCharsets.UTF_8), "b", "bar", 
topic);
+      frame = conn.receiveFrame(10000);
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      assertEquals(getTopicPrefix() + getTopicName(), 
frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      assertEquals(getName(), frame.getBody());
+
+      conn.disconnect();
+   }
+
    @Test
    public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception {
       conn.connect(defUser, defPass);
@@ -2170,4 +2192,40 @@ public class StompTest extends StompTestBase {
          conn_r2.disconnect();
       }
    }
+
+   @Test
+   public void testSubscribeToTopicWithEmptySelector() throws Exception {
+      conn.connect(defUser, defPass);
+      ClientStompFrame frame = subscribeTopic(conn, null, null, null, true, 
true, "   ");
+      assertNotNull(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+      assertEquals("Selector cannot be empty", 
frame.getHeader(Stomp.Headers.Error.MESSAGE));
+   }
+
+   @Test
+   public void testSubscribeToQueueWithEmptySelector() throws Exception {
+      conn.connect(defUser, defPass);
+      ClientStompFrame frame = subscribe(conn, null, null, null, "   ");
+      assertNotNull(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+      assertEquals("Selector cannot be empty", 
frame.getHeader(Stomp.Headers.Error.MESSAGE));
+   }
+
+   @Test
+   public void testSubscribeToTopicWithInvalidSelector() throws Exception {
+      conn.connect(defUser, defPass);
+      ClientStompFrame frame = subscribeTopic(conn, null, null, null, true, 
true, ") foo = 'bar' (");
+      assertNotNull(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+      
assertTrue(frame.getHeader(Stomp.Headers.Error.MESSAGE).contains("Invalid 
selector"));
+   }
+
+   @Test
+   public void testSubscribeToQueueWithInvalidSelector() throws Exception {
+      conn.connect(defUser, defPass);
+      ClientStompFrame frame = subscribe(conn, null, null, null, ") foo = 
'bar' (");
+      assertNotNull(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+      
assertTrue(frame.getHeader(Stomp.Headers.Error.MESSAGE).contains("Invalid 
selector"));
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 0418ad121b..891d483c1a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -473,12 +473,22 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
    }
 
    public static ClientStompFrame subscribeTopic(StompClientConnection conn,
-         String subscriptionId,
-         String ack,
-         String durableId,
-         boolean receipt,
-         boolean noLocal) throws IOException, InterruptedException {
-      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
+                                                 String subscriptionId,
+                                                 String ack,
+                                                 String durableId,
+                                                 boolean receipt,
+                                                 boolean noLocal) throws 
IOException, InterruptedException {
+      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, null);
+   }
+
+   public static ClientStompFrame subscribeTopic(StompClientConnection conn,
+                                                 String subscriptionId,
+                                                 String ack,
+                                                 String durableId,
+                                                 boolean receipt,
+                                                 boolean noLocal,
+                                                 String selector) throws 
IOException, InterruptedException {
+      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, selector);
    }
 
    public static ClientStompFrame 
subscribeTopicLegacyActiveMQ(StompClientConnection conn,
@@ -487,16 +497,17 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
          String durableId,
          boolean receipt,
          boolean noLocal) throws IOException, InterruptedException {
-      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
+      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, 
null);
    }
 
    public static ClientStompFrame subscribeTopic(StompClientConnection conn,
-                                          String subscriptionId,
-                                          String ack,
-                                          String durableId,
-                                          String durableIdHeader,
-                                          boolean receipt,
-                                          boolean noLocal) throws IOException, 
InterruptedException {
+                                                 String subscriptionId,
+                                                 String ack,
+                                                 String durableId,
+                                                 String durableIdHeader,
+                                                 boolean receipt,
+                                                 boolean noLocal,
+                                                 String selector) throws 
IOException, InterruptedException {
       ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
                                    
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, 
RoutingType.MULTICAST.toString())
                                    
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() + 
getTopicName());
@@ -516,6 +527,9 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
       if (noLocal) {
          frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true");
       }
+      if (selector != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
+      }
 
       frame = conn.sendFrame(frame);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to