This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2167ac2  ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
2167ac2 is described below

commit 2167ac2e30be04208da46b93af362a061f0b88c9
Author: gtully <[email protected]>
AuthorDate: Fri Oct 29 09:03:58 2021 +0100

    ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
---
 .../artemis/core/postoffice/impl/BindingsImpl.java |  2 +-
 .../cluster/impl/RemoteQueueBindingImpl.java       |  2 +-
 .../core/postoffice/impl/BindingsImplTest.java     | 99 +++++++++++++++++++---
 .../cluster/impl/RemoteQueueBindImplTest.java      | 17 ++++
 4 files changed, 105 insertions(+), 15 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index d7420d3..e193296 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -408,7 +408,7 @@ public final class BindingsImpl implements Bindings {
    private static boolean matchBinding(final Message message,
                                        final Binding binding,
                                        final MessageLoadBalancingType 
loadBalancingType) {
-      if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding 
instanceof RemoteQueueBinding) {
+      if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || 
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && 
binding instanceof RemoteQueueBinding) {
          return false;
       }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index e87ded4..8a2f88c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -154,7 +154,7 @@ public class RemoteQueueBindingImpl implements 
RemoteQueueBinding {
 
    @Override
    public synchronized boolean isHighAcceptPriority(final Message message) {
-      if (consumerCount == 0 || 
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+      if (consumerCount == 0 || 
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) || 
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION))
 {
          return false;
       }
 
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 72eb506..2b00dad 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -35,6 +36,8 @@ import 
org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -45,15 +48,39 @@ import 
org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Test;
 
 public class BindingsImplTest extends ActiveMQTestBase {
-   // Constants -----------------------------------------------------
 
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
+   @Test
+   public void testGetNextBindingWithLoadBalancingOnDemand() throws Exception {
+      final FakeRemoteBinding fake = new FakeRemoteBinding(new 
SimpleString("a"));
+      fake.filter = null;  // such that it will match all messages
+      fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
+      final Bindings bind = new BindingsImpl(null, null);
+      bind.addBinding(fake);
+      bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new 
FakeTransaction()));
+      assertEquals(1, fake.routedCount.get());
+   }
 
-   // Constructors --------------------------------------------------
+   @Test
+   public void testGetNextBindingWithLoadBalancingOff() throws Exception {
+      final FakeRemoteBinding fake = new FakeRemoteBinding(new 
SimpleString("a"));
+      fake.filter = null;  // such that it will match all messages
+      fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
+      final Bindings bind = new BindingsImpl(null, null);
+      bind.addBinding(fake);
+      bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new 
FakeTransaction()));
+      assertEquals(0, fake.routedCount.get());
+   }
 
-   // Public --------------------------------------------------------
+   @Test
+   public void testGetNextBindingWithLoadBalancingOffWithRedistribution() 
throws Exception {
+      final FakeRemoteBinding fake = new FakeRemoteBinding(new 
SimpleString("a"));
+      fake.filter = null;  // such that it will match all messages
+      fake.messageLoadBalancingType = 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
+      final Bindings bind = new BindingsImpl(null, null);
+      bind.addBinding(fake);
+      bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new 
FakeTransaction()));
+      assertEquals(0, fake.routedCount.get());
+   }
 
    @Test
    public void testRemoveWhileRouting() throws Exception {
@@ -299,8 +326,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
    }
 
-   private final class FakeBinding implements Binding {
+   private class FakeBinding implements Binding {
 
+      Filter filter = new FakeFilter();
+      AtomicInteger routedCount = new AtomicInteger();
       @Override
       public void close() throws Exception {
 
@@ -354,7 +383,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
        */
       @Override
       public Filter getFilter() {
-         return new FakeFilter();
+         return filter;
       }
 
       @Override
@@ -399,7 +428,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
       @Override
       public void route(final Message message, final RoutingContext context) 
throws Exception {
-
+         routedCount.incrementAndGet();
       }
 
       /* (non-Javadoc)
@@ -422,12 +451,56 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
    }
 
-   // Package protected ---------------------------------------------
+   private final class FakeRemoteBinding extends FakeBinding implements 
RemoteQueueBinding  {
+      MessageLoadBalancingType messageLoadBalancingType;
+      FakeRemoteBinding(SimpleString name) {
+         super(name);
+      }
 
-   // Protected -----------------------------------------------------
+      @Override
+      public boolean isLocal() {
+         return false;
+      }
 
-   // Private -------------------------------------------------------
+      @Override
+      public int consumerCount() {
+         return 0;
+      }
 
-   // Inner classes -------------------------------------------------
+      @Override
+      public Queue getQueue() {
+         return null;
+      }
 
+      @Override
+      public void addConsumer(SimpleString filterString) throws Exception {
+
+      }
+
+      @Override
+      public void removeConsumer(SimpleString filterString) throws Exception {
+      }
+
+      @Override
+      public void reset() {
+      }
+
+      @Override
+      public void disconnect() {
+      }
+
+      @Override
+      public void connect() {
+      }
+
+      @Override
+      public long getRemoteQueueID() {
+         return 0;
+      }
+
+      @Override
+      public MessageLoadBalancingType getMessageLoadBalancingType() {
+         return messageLoadBalancingType;
+      }
+   }
 }
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
index 1aee0fe..82f5239 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
@@ -81,4 +81,21 @@ public class RemoteQueueBindImplTest extends 
ActiveMQTestBase {
       testAddRemoveConsumerWithFilter(i -> null, 1, 0);
    }
 
+   @Test
+   public void testIsHighAcceptPriority() throws Exception {
+      final long id = RandomUtil.randomLong();
+      final SimpleString address = RandomUtil.randomSimpleString();
+      final SimpleString uniqueName = RandomUtil.randomSimpleString();
+      final SimpleString routingName = RandomUtil.randomSimpleString();
+      final Long remoteQueueID = RandomUtil.randomLong();
+      final SimpleString filterString = new SimpleString("A>B");
+      final Queue storeAndForwardQueue = new FakeQueue(null);
+      final SimpleString bridgeName = RandomUtil.randomSimpleString();
+      final int distance = 0;
+      RemoteQueueBindingImpl bindingOff = new RemoteQueueBindingImpl(id, 
address, uniqueName, routingName, remoteQueueID, filterString, 
storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF);
+      assertFalse(bindingOff.isHighAcceptPriority(null));
+
+      RemoteQueueBindingImpl bindingOffWithRedistribution = new 
RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, 
filterString, storeAndForwardQueue, bridgeName, distance, 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
+      assertFalse(bindingOffWithRedistribution.isHighAcceptPriority(null));
+   }
 }

Reply via email to