This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 2b7737301f ARTEMIS-5182 STOMP sub w/noLocal + selector missing messages
2b7737301f is described below
commit 2b7737301f655b33fc31eeff5a7fe4de4c8ee341
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Dec 17 16:19:43 2024 -0600
ARTEMIS-5182 STOMP sub w/noLocal + selector missing messages
---
.../core/protocol/stomp/StompConnection.java | 22 +++++++-
.../artemis/tests/integration/stomp/StompTest.java | 58 ++++++++++++++++++++++
.../tests/integration/stomp/StompTestBase.java | 40 ++++++++++-----
3 files changed, 105 insertions(+), 15 deletions(-)
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index bf0d09f955..9b397a7338 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.selector.filter.FilterException;
+import org.apache.activemq.artemis.selector.impl.SelectorParser;
import
org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -57,7 +60,6 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
import static
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static
org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
@@ -558,6 +560,7 @@ public final class StompConnection extends
AbstractRemotingConnection {
boolean noLocal,
RoutingType subscriptionType,
Integer consumerWindowSize) throws
ActiveMQStompException {
+ validateSelector(selector);
autoCreateDestinationIfPossible(destination, subscriptionType);
checkDestination(destination);
checkRoutingSemantics(destination, subscriptionType);
@@ -566,7 +569,7 @@ public final class StompConnection extends
AbstractRemotingConnection {
if (selector == null) {
selector = noLocalFilter;
} else {
- selector += " AND " + noLocalFilter;
+ selector = "(" + selector + ") AND " + noLocalFilter;
}
}
@@ -593,6 +596,21 @@ public final class StompConnection extends
AbstractRemotingConnection {
}
}
+ private void validateSelector(String selector) throws
ActiveMQStompException {
+ // user may not specify a selector; that's ok
+ if (selector == null) {
+ return;
+ }
+ if (selector.isEmpty()) {
+ throw new ActiveMQStompException("Selector cannot be empty").setHandler(frameHandler);
+ }
+ try {
+ SelectorParser.parse(selector);
+ } catch (FilterException e) {
+ throw new ActiveMQStompException("Invalid selector \"" + selector + "\"", e).setHandler(frameHandler);
+ }
+ }
+
public void unsubscribe(String subscriptionID, String
durableSubscriptionName) throws ActiveMQStompException {
try {
manager.unsubscribe(this, subscriptionID, durableSubscriptionName);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index c4c25dcb83..991b019a4d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1501,6 +1501,28 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
+ @Test
+ public void testSubscribeToTopicWithNoLocalAndSelector() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribeTopic(conn, null, null, null, true, true, "a=foo OR b=bar");
+
+ // send a message on the same connection => it should not be received as noLocal = true on subscribe
+ ClientStompFrame frame =
conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION,
getTopicPrefix() + getTopicName()).addHeader("b", "bar").setBody("Hello World");
+ conn.sendFrame(frame);
+
+ frame = conn.receiveFrame(100);
+ assertNull(frame, "No message should have been received since noLocal=true");
+
+ // send message on another JMS connection => it should be received
+ sendJmsMessage(getName().getBytes(StandardCharsets.UTF_8), "b", "bar",
topic);
+ frame = conn.receiveFrame(10000);
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ assertEquals(getTopicPrefix() + getTopicName(),
frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ assertEquals(getName(), frame.getBody());
+
+ conn.disconnect();
+ }
+
@Test
public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception {
conn.connect(defUser, defPass);
@@ -2170,4 +2192,40 @@ public class StompTest extends StompTestBase {
conn_r2.disconnect();
}
}
+
+ @Test
+ public void testSubscribeToTopicWithEmptySelector() throws Exception {
+ conn.connect(defUser, defPass);
+ ClientStompFrame frame = subscribeTopic(conn, null, null, null, true,
true, " ");
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+ assertEquals("Selector cannot be empty",
frame.getHeader(Stomp.Headers.Error.MESSAGE));
+ }
+
+ @Test
+ public void testSubscribeToQueueWithEmptySelector() throws Exception {
+ conn.connect(defUser, defPass);
+ ClientStompFrame frame = subscribe(conn, null, null, null, " ");
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+ assertEquals("Selector cannot be empty",
frame.getHeader(Stomp.Headers.Error.MESSAGE));
+ }
+
+ @Test
+ public void testSubscribeToTopicWithInvalidSelector() throws Exception {
+ conn.connect(defUser, defPass);
+ ClientStompFrame frame = subscribeTopic(conn, null, null, null, true,
true, ") foo = 'bar' (");
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+ assertTrue(frame.getHeader(Stomp.Headers.Error.MESSAGE).contains("Invalid selector"));
+ }
+
+ @Test
+ public void testSubscribeToQueueWithInvalidSelector() throws Exception {
+ conn.connect(defUser, defPass);
+ ClientStompFrame frame = subscribe(conn, null, null, null, ") foo = 'bar' (");
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+ assertTrue(frame.getHeader(Stomp.Headers.Error.MESSAGE).contains("Invalid selector"));
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 0418ad121b..891d483c1a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -473,12 +473,22 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
}
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
- String subscriptionId,
- String ack,
- String durableId,
- boolean receipt,
- boolean noLocal) throws IOException, InterruptedException {
- return subscribeTopic(conn, subscriptionId, ack, durableId,
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
+ String subscriptionId,
+ String ack,
+ String durableId,
+ boolean receipt,
+ boolean noLocal) throws
IOException, InterruptedException {
+ return subscribeTopic(conn, subscriptionId, ack, durableId,
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, null);
+ }
+
+ public static ClientStompFrame subscribeTopic(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ boolean receipt,
+ boolean noLocal,
+ String selector) throws
IOException, InterruptedException {
+ return subscribeTopic(conn, subscriptionId, ack, durableId,
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal, selector);
}
public static ClientStompFrame
subscribeTopicLegacyActiveMQ(StompClientConnection conn,
@@ -487,16 +497,17 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
String durableId,
boolean receipt,
boolean noLocal) throws IOException, InterruptedException {
- return subscribeTopic(conn, subscriptionId, ack, durableId,
Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
+ return subscribeTopic(conn, subscriptionId, ack, durableId,
Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal,
null);
}
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
- String subscriptionId,
- String ack,
- String durableId,
- String durableIdHeader,
- boolean receipt,
- boolean noLocal) throws IOException,
InterruptedException {
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String durableIdHeader,
+ boolean receipt,
+ boolean noLocal,
+ String selector) throws
IOException, InterruptedException {
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE,
RoutingType.MULTICAST.toString())
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() +
getTopicName());
@@ -516,6 +527,9 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
if (noLocal) {
frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true");
}
+ if (selector != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
+ }
frame = conn.sendFrame(frame);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact