This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a6243bee59 ARTEMIS-5232 Speed up RedirectTest
a6243bee59 is described below
commit a6243bee594e10c9254e812af60d5e609adadda5
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Fri Jan 10 14:31:27 2025 +0100
ARTEMIS-5232 Speed up RedirectTest
---
.../tests/integration/routing/RedirectTest.java | 196 +++++++++++----------
.../tests/integration/routing/RoutingTestBase.java | 2 -
2 files changed, 100 insertions(+), 98 deletions(-)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RedirectTest.java
index f35c58a9c2..23012ecb37 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RedirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RedirectTest.java
@@ -26,7 +26,6 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -45,37 +44,22 @@ import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedT
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
-import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
public class RedirectTest extends RoutingTestBase {
- @Parameters(name = "protocol: {0}, pool: {1}")
- public static Collection<Object[]> data() {
- final String[] protocols = new String[] {AMQP_PROTOCOL, CORE_PROTOCOL,
OPENWIRE_PROTOCOL};
- final String[] pools = new String[] {CLUSTER_POOL, DISCOVERY_POOL,
STATIC_POOL};
- Collection<Object[]> data = new ArrayList<>();
-
- for (String protocol : Arrays.asList(protocols)) {
- for (String pool : Arrays.asList(pools)) {
- data.add(new Object[] {protocol, pool});
- }
- }
-
- return data;
+ @Parameters(name = "pool: {0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {{CLUSTER_POOL}, {DISCOVERY_POOL},
{STATIC_POOL} });
}
- private final String protocol;
-
private final String pool;
- public RedirectTest(String protocol, String pool) {
- this.protocol = protocol;
-
+ public RedirectTest(String pool) {
this.pool = pool;
}
@@ -100,16 +84,24 @@ public class RedirectTest extends RoutingTestBase {
getServer(0).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
getServer(1).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+ testSimpleRedirectWithProtocol(AMQP_PROTOCOL, queueName);
+ testSimpleRedirectWithProtocol(CORE_PROTOCOL, queueName);
+ testSimpleRedirectWithProtocol(OPENWIRE_PROTOCOL, queueName);
+
+ stopServers(0, 1);
+ }
+
+ private void testSimpleRedirectWithProtocol(final String protocol, final
String queueName) throws Exception {
QueueControl queueControl0 =
(QueueControl)getServer(0).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
+ .getResource(ResourceNames.QUEUE + queueName);
QueueControl queueControl1 =
(QueueControl)getServer(1).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
+ .getResource(ResourceNames.QUEUE + queueName);
assertEquals(0, queueControl0.countMessages());
assertEquals(0, queueControl1.countMessages());
ConnectionFactory connectionFactory = createFactory(protocol, false,
TransportConstants.DEFAULT_HOST,
- TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
+ TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
try (Connection connection = connectionFactory.createConnection()) {
@@ -117,7 +109,7 @@ public class RedirectTest extends RoutingTestBase {
try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Queue queue = session.createQueue(queueName);
try (MessageProducer producer = session.createProducer(queue)) {
- producer.send(session.createTextMessage("TEST"));
+ producer.send(session.createTextMessage("TEST" + protocol));
}
}
}
@@ -131,15 +123,13 @@ public class RedirectTest extends RoutingTestBase {
try (MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName))) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
- assertEquals("TEST", message.getText());
+ assertEquals("TEST" + protocol, message.getText());
}
}
}
assertEquals(0, queueControl0.countMessages());
assertEquals(0, queueControl1.countMessages());
-
- stopServers(0, 1);
}
@TestTemplate
@@ -166,28 +156,21 @@ public class RedirectTest extends RoutingTestBase {
private void testEvenlyRedirect(final String policyName, final Map<String,
String> properties, final boolean withFailure) throws Exception {
final String queueName = "RedirectTestQueue";
- final int targets = MULTIPLE_TARGETS;
- int[] nodes = new int[targets + 1];
- int[] targetNodes = new int[targets];
- QueueControl[] queueControls = new QueueControl[targets + 1];
+ final int[] nodes = new int[] {0, 1, 2, 3};
- nodes[0] = 0;
- setupPrimaryServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true,
true, false);
- for (int i = 0; i < targets; i++) {
- nodes[i + 1] = i + 1;
- targetNodes[i] = i + 1;
- setupPrimaryServerWithDiscovery(i + 1, GROUP_ADDRESS, GROUP_PORT,
true, true, false);
+ for (int node : nodes) {
+ setupPrimaryServerWithDiscovery(node, GROUP_ADDRESS, GROUP_PORT,
true, true, false);
}
if (CLUSTER_POOL.equals(pool)) {
for (int node : nodes) {
setupDiscoveryClusterConnection("cluster" + node, node, "dg1",
"queues", MessageLoadBalancingType.OFF, 1, true);
}
- setupRouterServerWithCluster(0, KeyType.USER_NAME, policyName,
properties, false, "ACTIVEMQ.CLUSTER.ADMIN.USER", targets, "cluster0");
+ setupRouterServerWithCluster(0, KeyType.USER_NAME, policyName,
properties, false, "ACTIVEMQ.CLUSTER.ADMIN.USER", nodes.length - 1, "cluster0");
} else if (DISCOVERY_POOL.equals(pool)) {
- setupRouterServerWithDiscovery(0, KeyType.USER_NAME, policyName,
properties, false, null, targets);
+ setupRouterServerWithDiscovery(0, KeyType.USER_NAME, policyName,
properties, false, null, nodes.length - 1);
} else {
- setupRouterServerWithStaticConnectors(0, KeyType.USER_NAME,
policyName, properties, false, null, targets, 1, 2, 3);
+ setupRouterServerWithStaticConnectors(0, KeyType.USER_NAME,
policyName, properties, false, null, nodes.length - 1, 1, 2, 3);
}
if (withFailure) {
@@ -198,21 +181,32 @@ public class RedirectTest extends RoutingTestBase {
for (int node : nodes) {
getServer(node).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+ }
+
+ testEvenlyRedirectWithProtocol(AMQP_PROTOCOL, queueName, nodes,
withFailure);
+ testEvenlyRedirectWithProtocol(CORE_PROTOCOL, queueName, nodes,
withFailure);
+ testEvenlyRedirectWithProtocol(OPENWIRE_PROTOCOL, queueName, nodes,
withFailure);
+ stopServers(nodes);
+ }
+
+ private void testEvenlyRedirectWithProtocol(final String protocol, final
String queueName, final int[] nodes, final boolean withFailure) throws
Exception {
+ QueueControl[] queueControls = new QueueControl[nodes.length];
+
+ for (int node : nodes) {
queueControls[node] =
(QueueControl)getServer(node).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
+ .getResource(ResourceNames.QUEUE + queueName);
- assertEquals(0, queueControls[node].countMessages(), "Unexpected
messagecount for node " + node);
+ assertEquals(0, queueControls[node].countMessages(), "Unexpected
message count for node " + node);
}
+ ConnectionFactory[] connectionFactories = new
ConnectionFactory[nodes.length - 1];
+ Connection[] connections = new Connection[nodes.length - 1];
+ Session[] sessions = new Session[nodes.length - 1];
- ConnectionFactory[] connectionFactories = new ConnectionFactory[targets];
- Connection[] connections = new Connection[targets];
- Session[] sessions = new Session[targets];
-
- for (int i = 0; i < targets; i++) {
+ for (int i = 0; i < nodes.length - 1; i++) {
connectionFactories[i] = createFactory(protocol, false,
TransportConstants.DEFAULT_HOST,
- TransportConstants.DEFAULT_PORT + 0, null, "user" + i, "user" + i);
+ TransportConstants.DEFAULT_PORT + 0, null, "user" + i, "user" +
i);
connections[i] = connectionFactories[i].createConnection();
connections[i].start();
@@ -220,9 +214,9 @@ public class RedirectTest extends RoutingTestBase {
sessions[i] = connections[i].createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
- for (int i = 0; i < targets; i++) {
+ for (int i = 0; i < nodes.length - 1; i++) {
try (MessageProducer producer =
sessions[i].createProducer(sessions[i].createQueue(queueName))) {
- producer.send(sessions[i].createTextMessage("TEST" + i));
+ producer.send(sessions[i].createTextMessage("TEST" + protocol +
i));
}
sessions[i].close();
@@ -230,7 +224,7 @@ public class RedirectTest extends RoutingTestBase {
}
assertEquals(0, queueControls[0].countMessages());
- for (int targetNode : targetNodes) {
+ for (int targetNode = 1; targetNode < nodes.length - 1; targetNode++) {
assertEquals(1, queueControls[targetNode].countMessages(), "Messages
of node " + targetNode);
}
@@ -238,16 +232,19 @@ public class RedirectTest extends RoutingTestBase {
crashAndWaitForFailure(getServer(0));
startServers(0);
+
+ queueControls[0] = (QueueControl)getServer(0).getManagementService()
+ .getResource(ResourceNames.QUEUE + queueName);
}
- for (int i = 0; i < targets; i++) {
+ for (int i = 0; i < nodes.length - 1; i++) {
try (Connection connection =
connectionFactories[i].createConnection()) {
connection.start();
try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
try (MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName))) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
- assertEquals("TEST" + i, message.getText());
+ assertEquals("TEST" + protocol + i, message.getText());
}
}
}
@@ -256,8 +253,6 @@ public class RedirectTest extends RoutingTestBase {
for (int node : nodes) {
assertEquals(0, queueControls[node].countMessages(), "Unexpected
message count for node " + node);
}
-
- stopServers(nodes);
}
@TestTemplate
@@ -286,16 +281,24 @@ public class RedirectTest extends RoutingTestBase {
getServer(0).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
getServer(1).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+ testSymmetricRedirectWithProtocol(AMQP_PROTOCOL, queueName);
+ testSymmetricRedirectWithProtocol(CORE_PROTOCOL, queueName);
+ testSymmetricRedirectWithProtocol(OPENWIRE_PROTOCOL, queueName);
+
+ stopServers(0, 1);
+ }
+
+ private void testSymmetricRedirectWithProtocol(final String protocol, final
String queueName) throws Exception {
QueueControl queueControl0 =
(QueueControl)getServer(0).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
+ .getResource(ResourceNames.QUEUE + queueName);
QueueControl queueControl1 =
(QueueControl)getServer(1).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
+ .getResource(ResourceNames.QUEUE + queueName);
assertEquals(0, queueControl0.countMessages(), "Unexpected message count
for node 0");
assertEquals(0, queueControl1.countMessages(), "Unexpected message count
for node 1");
ConnectionFactory connectionFactory0 = createFactory(protocol, false,
TransportConstants.DEFAULT_HOST,
- TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
+ TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
try (Connection connection = connectionFactory0.createConnection()) {
@@ -303,18 +306,18 @@ public class RedirectTest extends RoutingTestBase {
try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Queue queue = session.createQueue(queueName);
try (MessageProducer producer = session.createProducer(queue)) {
- producer.send(session.createTextMessage("TEST"));
+ producer.send(session.createTextMessage("TEST" + protocol));
}
}
}
assertTrue((queueControl0.countMessages() == 0 &&
queueControl1.countMessages() == 1) ||
- (queueControl0.countMessages() == 1 && queueControl1.countMessages()
== 0));
+ (queueControl0.countMessages() == 1 && queueControl1.countMessages()
== 0));
assertTrue(getServer(0).getNodeID() != getServer(1).getNodeID());
ConnectionFactory connectionFactory1 = createFactory(protocol, false,
TransportConstants.DEFAULT_HOST,
- TransportConstants.DEFAULT_PORT + 1, null, "admin", "admin");
+ TransportConstants.DEFAULT_PORT + 1, null, "admin", "admin");
try (Connection connection = connectionFactory1.createConnection()) {
connection.start();
@@ -322,28 +325,27 @@ public class RedirectTest extends RoutingTestBase {
try (MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName))) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
- assertEquals("TEST", message.getText());
+ assertEquals("TEST" + protocol, message.getText());
}
}
}
assertEquals(0, queueControl0.countMessages(), "Unexpected message count
for node 0");
assertEquals(0, queueControl1.countMessages(), "Unexpected message count
for node 1");
-
- stopServers(0, 1);
}
@TestTemplate
public void testRedirectAfterFailure() throws Exception {
final String queueName = "RedirectTestQueue";
+ final int[] nodes = new int[] {0, 1, 2};
- setupPrimaryServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true,
true, false);
- setupPrimaryServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true,
true, false);
- setupPrimaryServerWithDiscovery(2, GROUP_ADDRESS, GROUP_PORT, true,
true, false);
+ for (int node : nodes) {
+ setupPrimaryServerWithDiscovery(node, GROUP_ADDRESS, GROUP_PORT,
true, true, false);
+ }
if (CLUSTER_POOL.equals(pool)) {
- setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues",
MessageLoadBalancingType.OFF, 1, true);
- setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues",
MessageLoadBalancingType.OFF, 1, true);
- setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues",
MessageLoadBalancingType.OFF, 1, true);
+ for (int node : nodes) {
+ setupDiscoveryClusterConnection("cluster" + node, node, "dg1",
"queues", MessageLoadBalancingType.OFF, 1, true);
+ }
setupRouterServerWithCluster(0, KeyType.USER_NAME,
FirstElementPolicy.NAME, null, false, "ACTIVEMQ.CLUSTER.ADMIN.USER", 1,
"cluster0");
} else if (DISCOVERY_POOL.equals(pool)) {
setupRouterServerWithDiscovery(0, KeyType.USER_NAME,
FirstElementPolicy.NAME, null, false, null, 1);
@@ -353,24 +355,29 @@ public class RedirectTest extends RoutingTestBase {
startServers(0, 1, 2);
-
getServer(0).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
-
getServer(1).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
-
getServer(2).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+ for (int node : nodes) {
+
getServer(node).createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+ }
- QueueControl queueControl0 =
(QueueControl)getServer(0).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
- QueueControl queueControl1 =
(QueueControl)getServer(1).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
- QueueControl queueControl2 =
(QueueControl)getServer(2).getManagementService()
- .getResource(ResourceNames.QUEUE + queueName);
+ testRedirectAfterFailureWithProtocol(AMQP_PROTOCOL, queueName, nodes);
+ testRedirectAfterFailureWithProtocol(CORE_PROTOCOL, queueName, nodes);
+ testRedirectAfterFailureWithProtocol(OPENWIRE_PROTOCOL, queueName,
nodes);
- assertEquals(0, queueControl0.countMessages(), "Unexpected message count
for node 0");
- assertEquals(0, queueControl1.countMessages(), "Unexpected message count
for node 1");
- assertEquals(0, queueControl2.countMessages(), "Unexpected message count
for node 2");
+ stopServers(0, 1, 2);
+ }
+
+ private void testRedirectAfterFailureWithProtocol(final String protocol,
final String queueName, final int[] nodes) throws Exception {
+ QueueControl[] queueControls = new QueueControl[nodes.length];
+ for (int node : nodes) {
+ queueControls[node] =
(QueueControl)getServer(node).getManagementService()
+ .getResource(ResourceNames.QUEUE + queueName);
+
+ assertEquals(0, queueControls[node].countMessages(), "Unexpected
message count for node " + node);
+ }
int failedNode;
ConnectionFactory connectionFactory = createFactory(protocol, false,
TransportConstants.DEFAULT_HOST,
- TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
+ TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
try (Connection connection = connectionFactory.createConnection()) {
@@ -380,7 +387,7 @@ public class RedirectTest extends RoutingTestBase {
try (MessageProducer producer = session.createProducer(queue)) {
producer.send(session.createTextMessage("TEST_BEFORE_FAILURE"));
- if (queueControl1.countMessages() > 0) {
+ if (queueControls[1].countMessages() > 0) {
failedNode = 1;
} else {
failedNode = 2;
@@ -395,9 +402,8 @@ public class RedirectTest extends RoutingTestBase {
startServers(failedNode);
- Wait.assertEquals(0L, () -> queueControl0.countMessages(), 5000, 100);
- Wait.assertEquals(1L, () -> queueControl1.countMessages(), 5000, 100);
- Wait.assertEquals(1L, () -> queueControl2.countMessages(), 5000, 100);
+ queueControls[failedNode] =
(QueueControl)getServer(failedNode).getManagementService()
+ .getResource(ResourceNames.QUEUE + queueName);
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
@@ -410,15 +416,13 @@ public class RedirectTest extends RoutingTestBase {
}
}
- assertEquals(0, queueControl0.countMessages(), "Unexpected message count
for node 0");
- if (failedNode == 1) {
- assertEquals(1, queueControl1.countMessages(), "Unexpected message
count for node 1");
- assertEquals(0, queueControl2.countMessages(), "Unexpected message
count for node 2");
- } else {
- assertEquals(0, queueControl1.countMessages(), "Unexpected message
count for node 1");
- assertEquals(1, queueControl2.countMessages(), "Unexpected message
count for node 2");
+ for (int node : nodes) {
+ if (node == failedNode) {
+ assertEquals(1, queueControls[node].countMessages(), "Unexpected
message count for node " + node);
+ queueControls[node].removeAllMessages();
+ } else {
+ assertEquals(0, queueControls[node].countMessages(), "Unexpected
message count for node " + node);
+ }
}
-
- stopServers(0, 1, 2);
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RoutingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RoutingTestBase.java
index 10658e961e..6d8c829a11 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RoutingTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/RoutingTestBase.java
@@ -53,8 +53,6 @@ public class RoutingTestBase extends ClusterTestBase {
protected static final int GROUP_PORT =
ActiveMQTestBase.getUDPDiscoveryPort();
- protected static final int MULTIPLE_TARGETS = 3;
-
protected TransportConfiguration getDefaultServerAcceptor(final int node) {
return
getServer(node).getConfiguration().getAcceptorConfigurations().stream().findFirst().get();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact