This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c7b5d6f22 ARTEMIS-5597 initial distribution broken when sending to 
FQQN
0c7b5d6f22 is described below

commit 0c7b5d6f225fe528779ee643ae96bec383aa9e06
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Jul 29 11:30:28 2025 -0500

    ARTEMIS-5597 initial distribution broken when sending to FQQN
    
    Ensure the broker considers remote queue bindings as well when routing
    messages to an FQQN. Otherwise consumers might starve.
---
 .../org/apache/activemq/artemis/utils/UUID.java    | 25 +++++++-
 .../artemis/core/postoffice/impl/BindingsImpl.java | 12 ++--
 .../distribution/SimpleSymmetricClusterTest.java   | 71 ++++++++++++++++++++--
 .../activemq/artemis/tests/unit/util/UUIDTest.java | 10 +++
 4 files changed, 103 insertions(+), 15 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 9ad36982ff..cbaffba5e5 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.utils;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+
 /**
  * UUID represents Universally Unique Identifiers (aka Global UID in Windows 
world). UUIDs are usually generated via
  * UUIDGenerator (or in case of 'Null UUID', 16 zero bytes, via static method 
getNullUUID()), or received from external
@@ -63,6 +65,8 @@ public final class UUID {
 
    public static final char HYPHEN = '-';
 
+   public static final int UUID_STRING_LENGTH = 36;
+
    // 'Standard' namespaces defined (suggested) by UUID specs:
    public static final String NAMESPACE_DNS = 
"6ba7b810-9dad-11d1-80b4-00c04fd430c8";
 
@@ -174,7 +178,7 @@ public final class UUID {
        */
 
       if (mDesc == null) {
-         StringBuilder b = new StringBuilder(36);
+         StringBuilder b = new StringBuilder(UUID_STRING_LENGTH);
 
          for (int i = 0; i < 16; ++i) {
             // Need to bypass hyphens:
@@ -262,7 +266,7 @@ public final class UUID {
     * @return {@code true} if the string is a valid UUID format; {@code false} 
otherwise
     */
    public static boolean isUUID(CharSequence str) {
-      if (str == null || str.length() != 36) {
+      if (str == null || str.length() != UUID_STRING_LENGTH) {
          return false;
       }
 
@@ -285,4 +289,21 @@ public final class UUID {
    private static boolean isHex(char c) {
       return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f');
    }
+
+   /**
+    * Removes a trailing UUID from the given input, if present. The method 
checks whether the last characters of the
+    * input adhere to a valid UUID format and removes them if true.
+    *
+    * @param input the input {@code SimpleString} which may contain a trailing 
UUID to be stripped
+    * @return a {@code SimpleString} with the trailing UUID removed if 
present; otherwise, returns the original input
+    *         unchanged
+    */
+   public static SimpleString stripTrailingUUID(SimpleString input) {
+      int length = input.length();
+      if (length >= UUID_STRING_LENGTH && UUID.isUUID(input.subSeq(length - 
UUID_STRING_LENGTH, length))) {
+         return input.subSeq(0, length - UUID_STRING_LENGTH);
+      } else {
+         return input;
+      }
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2710d1df02..cf76e8561e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -49,6 +49,7 @@ import 
org.apache.activemq.artemis.core.server.group.impl.Proposal;
 import org.apache.activemq.artemis.core.server.group.impl.Response;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.apache.activemq.artemis.utils.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
@@ -343,12 +344,6 @@ public final class BindingsImpl implements Bindings {
          } else if (groupRouting && groupingHandler != null && (groupId = 
message.getGroupID()) != null) {
             context.clear().setReusable(false);
             routeUsingStrictOrdering(message, context, groupingHandler, 
groupId, 0);
-         } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
-            context.clear().setReusable(false);
-            final Binding theBinding = 
bindingsNameMap.get(String.valueOf(CompositeAddress.extractQueueName(message.getAddressSimpleString())));
-            if (theBinding != null && (theBinding.getFilter() == null || 
theBinding.getFilter().match(message))) {
-               theBinding.route(message, context);
-            }
          } else {
             // in a optimization, we are reusing the previous context if 
everything is right for it
             // so the simpleRouting will only happen if needed
@@ -494,8 +489,11 @@ public final class BindingsImpl implements Bindings {
          return false;
       }
 
-      final Filter filter = binding.getFilter();
+      if (CompositeAddress.isFullyQualified(message.getAddress()) && binding 
instanceof QueueBinding && 
!UUID.stripTrailingUUID(binding.getClusterName()).equals(CompositeAddress.extractQueueName(message.getAddressSimpleString())))
 {
+         return false;
+      }
 
+      final Filter filter = binding.getFilter();
       if (filter == null || filter.match(message)) {
          return true;
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
index ce359ef62c..5878c135d7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
@@ -16,11 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,8 +32,8 @@ import 
org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -49,11 +44,17 @@ import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import 
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class SimpleSymmetricClusterTest extends ClusterTestBase {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -370,6 +371,64 @@ public class SimpleSymmetricClusterTest extends 
ClusterTestBase {
 
    }
 
+   @Test
+   public void testInitialDistributionSendToFQQNConsumeFromFQQN() throws 
Exception {
+      testInitialDistribution(true, true);
+   }
+
+   @Test
+   public void testInitialDistributionSendToFQQNConsumeFromQueue() throws 
Exception {
+      testInitialDistribution(true, false);
+   }
+
+   @Test
+   public void testInitialDistributionSendToAddressConsumeFromFQQN() throws 
Exception {
+      testInitialDistribution(false, true);
+   }
+
+   @Test
+   public void testInitialDistributionSendToAddressConsumeFromQueue() throws 
Exception {
+      testInitialDistribution(false, false);
+   }
+
+   private void testInitialDistribution(boolean sendToFQQN, boolean 
consumeFromFQQN) throws Exception {
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      final String address = "myAddress";
+      final String queue = "myQueue";
+      final String fqqn = CompositeAddress.toFullyQualified(address, queue);
+
+      setupServer(0, false, isNetty());
+      setupServer(1, false, isNetty());
+
+      setupClusterConnection("cluster0", address, 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", address, 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, address, queue, null, false);
+      createQueue(1, address, queue, null, false);
+
+      waitForBindings(0, address, 1, 0, true);
+      waitForBindings(0, address, 1, 0, false);
+      waitForBindings(1, address, 1, 0, true);
+      waitForBindings(1, address, 1, 0, false);
+
+      addConsumer(0, 0, consumeFromFQQN ? fqqn : queue, null);
+      consumers[0].consumer.setMessageHandler((m) -> 
countDownLatch.countDown());
+
+      waitForBindings(0, address, 1, 1, true);
+      waitForBindings(1, address, 1, 1, false);
+
+      send(1, sendToFQQN ? fqqn : address, 1, true, null);
+
+      assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+
+      closeAllConsumers();
+   }
+
    @Test
    public void testSimpleSnFManagement() throws Exception {
       final String address = "queues.testaddress";
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
index 4c608422b4..06bad5191e 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.jupiter.api.Test;
@@ -101,4 +102,13 @@ public class UUIDTest extends ActiveMQTestBase {
       // null
       assertFalse(UUID.isUUID(null));
    }
+
+   @Test
+   public void testStringTrailingUUID() {
+      SimpleString uuid = RandomUtil.randomUUIDSimpleString();
+      assertEquals(uuid, 
UUID.stripTrailingUUID(uuid.concat(RandomUtil.randomUUIDSimpleString())));
+
+      SimpleString foo = SimpleString.of("foo");
+      assertEquals(foo, UUID.stripTrailingUUID(foo));
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to