This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0c7b5d6f22 ARTEMIS-5597 initial distribution broken when sending to
FQQN
0c7b5d6f22 is described below
commit 0c7b5d6f225fe528779ee643ae96bec383aa9e06
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Jul 29 11:30:28 2025 -0500
ARTEMIS-5597 initial distribution broken when sending to FQQN
Ensure the broker considers remote queue bindings as well when routing
messages to an FQQN. Otherwise consumers might starve.
---
.../org/apache/activemq/artemis/utils/UUID.java | 25 +++++++-
.../artemis/core/postoffice/impl/BindingsImpl.java | 12 ++--
.../distribution/SimpleSymmetricClusterTest.java | 71 ++++++++++++++++++++--
.../activemq/artemis/tests/unit/util/UUIDTest.java | 10 +++
4 files changed, 103 insertions(+), 15 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 9ad36982ff..cbaffba5e5 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.utils;
+import org.apache.activemq.artemis.api.core.SimpleString;
+
/**
* UUID represents Universally Unique Identifiers (aka Global UID in Windows
world). UUIDs are usually generated via
* UUIDGenerator (or in case of 'Null UUID', 16 zero bytes, via static method
getNullUUID()), or received from external
@@ -63,6 +65,8 @@ public final class UUID {
public static final char HYPHEN = '-';
+ public static final int UUID_STRING_LENGTH = 36;
+
// 'Standard' namespaces defined (suggested) by UUID specs:
public static final String NAMESPACE_DNS =
"6ba7b810-9dad-11d1-80b4-00c04fd430c8";
@@ -174,7 +178,7 @@ public final class UUID {
*/
if (mDesc == null) {
- StringBuilder b = new StringBuilder(36);
+ StringBuilder b = new StringBuilder(UUID_STRING_LENGTH);
for (int i = 0; i < 16; ++i) {
// Need to bypass hyphens:
@@ -262,7 +266,7 @@ public final class UUID {
* @return {@code true} if the string is a valid UUID format; {@code false}
otherwise
*/
public static boolean isUUID(CharSequence str) {
- if (str == null || str.length() != 36) {
+ if (str == null || str.length() != UUID_STRING_LENGTH) {
return false;
}
@@ -285,4 +289,21 @@ public final class UUID {
private static boolean isHex(char c) {
return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f');
}
+
+ /**
+ * Removes a trailing UUID from the given input, if present. The method
checks whether the last characters of the
+ * input adhere to a valid UUID format and removes them if true.
+ *
+ * @param input the input {@code SimpleString} which may contain a trailing
UUID to be stripped
+ * @return a {@code SimpleString} with the trailing UUID removed if
present; otherwise, returns the original input
+ * unchanged
+ */
+ public static SimpleString stripTrailingUUID(SimpleString input) {
+ int length = input.length();
+ if (length >= UUID_STRING_LENGTH && UUID.isUUID(input.subSeq(length -
UUID_STRING_LENGTH, length))) {
+ return input.subSeq(0, length - UUID_STRING_LENGTH);
+ } else {
+ return input;
+ }
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2710d1df02..cf76e8561e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -49,6 +49,7 @@ import
org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.apache.activemq.artemis.utils.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@@ -343,12 +344,6 @@ public final class BindingsImpl implements Bindings {
} else if (groupRouting && groupingHandler != null && (groupId =
message.getGroupID()) != null) {
context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler,
groupId, 0);
- } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
- context.clear().setReusable(false);
- final Binding theBinding =
bindingsNameMap.get(String.valueOf(CompositeAddress.extractQueueName(message.getAddressSimpleString())));
- if (theBinding != null && (theBinding.getFilter() == null ||
theBinding.getFilter().match(message))) {
- theBinding.route(message, context);
- }
} else {
// in a optimization, we are reusing the previous context if
everything is right for it
// so the simpleRouting will only happen if needed
@@ -494,8 +489,11 @@ public final class BindingsImpl implements Bindings {
return false;
}
- final Filter filter = binding.getFilter();
+ if (CompositeAddress.isFullyQualified(message.getAddress()) && binding
instanceof QueueBinding &&
!UUID.stripTrailingUUID(binding.getClusterName()).equals(CompositeAddress.extractQueueName(message.getAddressSimpleString())))
{
+ return false;
+ }
+ final Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) {
return true;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
index ce359ef62c..5878c135d7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
@@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,8 +32,8 @@ import
org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -49,11 +44,17 @@ import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class SimpleSymmetricClusterTest extends ClusterTestBase {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -370,6 +371,64 @@ public class SimpleSymmetricClusterTest extends
ClusterTestBase {
}
+ @Test
+ public void testInitialDistributionSendToFQQNConsumeFromFQQN() throws
Exception {
+ testInitialDistribution(true, true);
+ }
+
+ @Test
+ public void testInitialDistributionSendToFQQNConsumeFromQueue() throws
Exception {
+ testInitialDistribution(true, false);
+ }
+
+ @Test
+ public void testInitialDistributionSendToAddressConsumeFromFQQN() throws
Exception {
+ testInitialDistribution(false, true);
+ }
+
+ @Test
+ public void testInitialDistributionSendToAddressConsumeFromQueue() throws
Exception {
+ testInitialDistribution(false, false);
+ }
+
+ private void testInitialDistribution(boolean sendToFQQN, boolean
consumeFromFQQN) throws Exception {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ final String address = "myAddress";
+ final String queue = "myQueue";
+ final String fqqn = CompositeAddress.toFullyQualified(address, queue);
+
+ setupServer(0, false, isNetty());
+ setupServer(1, false, isNetty());
+
+ setupClusterConnection("cluster0", address,
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster1", address,
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, address, queue, null, false);
+ createQueue(1, address, queue, null, false);
+
+ waitForBindings(0, address, 1, 0, true);
+ waitForBindings(0, address, 1, 0, false);
+ waitForBindings(1, address, 1, 0, true);
+ waitForBindings(1, address, 1, 0, false);
+
+ addConsumer(0, 0, consumeFromFQQN ? fqqn : queue, null);
+ consumers[0].consumer.setMessageHandler((m) ->
countDownLatch.countDown());
+
+ waitForBindings(0, address, 1, 1, true);
+ waitForBindings(1, address, 1, 1, false);
+
+ send(1, sendToFQQN ? fqqn : address, 1, true, null);
+
+ assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+
+ closeAllConsumers();
+ }
+
@Test
public void testSimpleSnFManagement() throws Exception {
final String address = "queues.testaddress";
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
index 4c608422b4..06bad5191e 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.jupiter.api.Test;
@@ -101,4 +102,13 @@ public class UUIDTest extends ActiveMQTestBase {
// null
assertFalse(UUID.isUUID(null));
}
+
+ @Test
+ public void testStringTrailingUUID() {
+ SimpleString uuid = RandomUtil.randomUUIDSimpleString();
+ assertEquals(uuid,
UUID.stripTrailingUUID(uuid.concat(RandomUtil.randomUUIDSimpleString())));
+
+ SimpleString foo = SimpleString.of("foo");
+ assertEquals(foo, UUID.stripTrailingUUID(foo));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact