This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 096a869c1e ARTEMIS-4954 AddressControl.pause() can pause the snf queue
096a869c1e is described below
commit 096a869c1ed6102a43074434db07899c70231c4a
Author: Howard Gao <[email protected]>
AuthorDate: Thu Jul 25 22:50:14 2024 +0800
ARTEMIS-4954 AddressControl.pause() can pause the snf queue
---
.../artemis/core/server/impl/AddressInfo.java | 14 +--
.../bridge/ClusteredBridgeReconnectTest.java | 107 +++++++++++++++++++++
2 files changed, 114 insertions(+), 7 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 44942f5e9a..e35a30828c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonNumber;
@@ -35,7 +36,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import
org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
@@ -216,8 +216,8 @@ public class AddressInfo {
Bindings bindings =
postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
- if (binding instanceof QueueBinding) {
- ((QueueBinding) binding).getQueue().pause(false);
+ if (binding instanceof LocalQueueBinding) {
+ ((LocalQueueBinding) binding).getQueue().pause(false);
}
}
}
@@ -250,8 +250,8 @@ public class AddressInfo {
Bindings bindings =
postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
- if (binding instanceof QueueBinding) {
- ((QueueBinding) binding).getQueue().pause(false);
+ if (binding instanceof LocalQueueBinding) {
+ ((LocalQueueBinding) binding).getQueue().pause(false);
}
}
}
@@ -278,8 +278,8 @@ public class AddressInfo {
Bindings bindings =
postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
- if (binding instanceof QueueBinding) {
- ((QueueBinding) binding).getQueue().resume();
+ if (binding instanceof LocalQueueBinding) {
+ ((LocalQueueBinding) binding).getQueue().resume();
}
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
index e2ac8e8aa9..62fecf764d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -17,7 +17,9 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -28,17 +30,26 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
import
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.AfterEach;
@@ -305,6 +316,102 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
stopServers(0, 1);
}
+ @Test
+ public void testPauseAddressBlockingSnFQueue() throws Exception {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues",
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues",
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setRedistributionDelay(0);
+
+ servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
+ servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ ClientSession session0 = sfs[0].createSession();
+ ClientSession session1 = sfs[1].createSession();
+
+ session0.start();
+ session1.start();
+
+ createQueue(0, "queues.testaddress", "queue1", null, true);
+ createQueue(1, "queues.testaddress", "queue1", null, true);
+ ClientConsumer consumer1 = session1.createConsumer("queue1");
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+ final int num = 10;
+ //normal message flow should work
+ ClientProducer goodProducer0 =
session0.createProducer("queues.testaddress");
+ for (int i = 0; i < num; i++) {
+ Message msg = session0.createMessage(true);
+ msg.putStringProperty("origin", "from producer 0");
+ goodProducer0.send(msg);
+ }
+
+ //consumer1 can receive from node0
+ for (int i = 0; i < num; i++) {
+ ClientMessage m = consumer1.receive(5000);
+ assertNotNull(m);
+ String propValue = m.getStringProperty("origin");
+ assertEquals("from producer 0", propValue);
+ m.acknowledge();
+ }
+ assertNull(consumer1.receiveImmediate());
+
+ //pause address from node0
+ String addressControlResourceName = ResourceNames.ADDRESS +
"queues.testaddress";
+ Object resource =
servers[0].getManagementService().getResource(addressControlResourceName);
+ AddressControl addressControl0 = (AddressControl) resource;
+ addressControl0.pause();
+
+ Bindings bindings0 =
servers[0].getPostOffice().getBindingsForAddress(SimpleString.of("queues.testaddress"));
+ assertNotNull(bindings0);
+ assertEquals(2, bindings0.getBindings().size());
+ boolean localBindingPaused = false;
+ boolean remoteBindingPaused = true;
+ for (Binding bd : bindings0.getBindings()) {
+ if (bd instanceof LocalQueueBinding) {
+ localBindingPaused = ((LocalQueueBinding)bd).getQueue().isPaused();
+ }
+ if (bd instanceof RemoteQueueBinding) {
+ remoteBindingPaused =
((RemoteQueueBinding)bd).getQueue().isPaused();
+ }
+ }
+ assertTrue(localBindingPaused);
+ assertFalse(remoteBindingPaused);
+
+ //now message should flow to node 1 regardless of the pause
+ for (int i = 0; i < num; i++) {
+ Message msg = session0.createMessage(true);
+ msg.putStringProperty("origin", "from producer 0");
+ goodProducer0.send(msg);
+ }
+
+ //consumer1 can receive from node0
+ for (int i = 0; i < num; i++) {
+ ClientMessage m = consumer1.receive(5000);
+ assertNotNull(m);
+ String propValue = m.getStringProperty("origin");
+ assertEquals("from producer 0", propValue);
+ m.acknowledge();
+ }
+ assertNull(consumer1.receiveImmediate());
+
+ stopServers(0, 1);
+ }
@Override
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact