This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 096a869c1e ARTEMIS-4954 AddressControl.pause() can pause the snf queue
096a869c1e is described below

commit 096a869c1ed6102a43074434db07899c70231c4a
Author: Howard Gao <[email protected]>
AuthorDate: Thu Jul 25 22:50:14 2024 +0800

    ARTEMIS-4954 AddressControl.pause() can pause the snf queue
---
 .../artemis/core/server/impl/AddressInfo.java      |  14 +--
 .../bridge/ClusteredBridgeReconnectTest.java       | 107 +++++++++++++++++++++
 2 files changed, 114 insertions(+), 7 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 44942f5e9a..e35a30828c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.json.JsonArray;
 import org.apache.activemq.artemis.json.JsonArrayBuilder;
 import org.apache.activemq.artemis.json.JsonNumber;
@@ -35,7 +36,6 @@ import 
org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import 
org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
@@ -216,8 +216,8 @@ public class AddressInfo {
          Bindings bindings = 
postOffice.lookupBindingsForAddress(this.getName());
          if (bindings != null) {
             for (Binding binding : bindings.getBindings()) {
-               if (binding instanceof QueueBinding) {
-                  ((QueueBinding) binding).getQueue().pause(false);
+               if (binding instanceof LocalQueueBinding) {
+                  ((LocalQueueBinding) binding).getQueue().pause(false);
                }
             }
          }
@@ -250,8 +250,8 @@ public class AddressInfo {
          Bindings bindings = 
postOffice.lookupBindingsForAddress(this.getName());
          if (bindings != null) {
             for (Binding binding : bindings.getBindings()) {
-               if (binding instanceof QueueBinding) {
-                  ((QueueBinding) binding).getQueue().pause(false);
+               if (binding instanceof LocalQueueBinding) {
+                  ((LocalQueueBinding) binding).getQueue().pause(false);
                }
             }
          }
@@ -278,8 +278,8 @@ public class AddressInfo {
          Bindings bindings = 
postOffice.lookupBindingsForAddress(this.getName());
          if (bindings != null) {
             for (Binding binding : bindings.getBindings()) {
-               if (binding instanceof QueueBinding) {
-                  ((QueueBinding) binding).getQueue().resume();
+               if (binding instanceof LocalQueueBinding) {
+                  ((LocalQueueBinding) binding).getQueue().resume();
                }
             }
          }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
index e2ac8e8aa9..62fecf764d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -28,17 +30,26 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.jupiter.api.AfterEach;
@@ -305,6 +316,102 @@ public class ClusteredBridgeReconnectTest extends 
ClusterTestBase {
       stopServers(0, 1);
    }
 
+   @Test
+   public void testPauseAddressBlockingSnFQueue() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setRedistributionDelay(0);
+
+      servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
+      servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      ClientSession session0 = sfs[0].createSession();
+      ClientSession session1 = sfs[1].createSession();
+
+      session0.start();
+      session1.start();
+
+      createQueue(0, "queues.testaddress", "queue1", null, true);
+      createQueue(1, "queues.testaddress", "queue1", null, true);
+      ClientConsumer consumer1 = session1.createConsumer("queue1");
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+      final int num = 10;
+      //normal message flow should work
+      ClientProducer goodProducer0 = 
session0.createProducer("queues.testaddress");
+      for (int i = 0; i < num; i++) {
+         Message msg = session0.createMessage(true);
+         msg.putStringProperty("origin", "from producer 0");
+         goodProducer0.send(msg);
+      }
+
+      //consumer1 can receive from node0
+      for (int i = 0; i < num; i++) {
+         ClientMessage m = consumer1.receive(5000);
+         assertNotNull(m);
+         String propValue = m.getStringProperty("origin");
+         assertEquals("from producer 0", propValue);
+         m.acknowledge();
+      }
+      assertNull(consumer1.receiveImmediate());
+
+      //pause address from node0
+      String addressControlResourceName = ResourceNames.ADDRESS + 
"queues.testaddress";
+      Object resource = 
servers[0].getManagementService().getResource(addressControlResourceName);
+      AddressControl addressControl0 = (AddressControl) resource;
+      addressControl0.pause();
+
+      Bindings bindings0 = 
servers[0].getPostOffice().getBindingsForAddress(SimpleString.of("queues.testaddress"));
+      assertNotNull(bindings0);
+      assertEquals(2, bindings0.getBindings().size());
+      boolean localBindingPaused = false;
+      boolean remoteBindingPaused = true;
+      for (Binding bd : bindings0.getBindings()) {
+         if (bd instanceof LocalQueueBinding) {
+            localBindingPaused = ((LocalQueueBinding)bd).getQueue().isPaused();
+         }
+         if (bd instanceof RemoteQueueBinding) {
+            remoteBindingPaused = 
((RemoteQueueBinding)bd).getQueue().isPaused();
+         }
+      }
+      assertTrue(localBindingPaused);
+      assertFalse(remoteBindingPaused);
+
+      //now message should flow to node 1 regardless of the pause
+      for (int i = 0; i < num; i++) {
+         Message msg = session0.createMessage(true);
+         msg.putStringProperty("origin", "from producer 0");
+         goodProducer0.send(msg);
+      }
+
+      //consumer1 can receive from node0
+      for (int i = 0; i < num; i++) {
+         ClientMessage m = consumer1.receive(5000);
+         assertNotNull(m);
+         String propValue = m.getStringProperty("origin");
+         assertEquals("from producer 0", propValue);
+         m.acknowledge();
+      }
+      assertNull(consumer1.receiveImmediate());
+
+      stopServers(0, 1);
+   }
 
    @Override
    @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to