This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch openWireTest in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit d177c8539def65ec0576f22bd2ec33b0e82a0be1 Author: Clebert Suconic <[email protected]> AuthorDate: Wed Jul 31 13:59:03 2024 -0400 ARTEMIS-X Expanding tests towards OpenWire --- .../brokerConnection/DivertQueueMirrorTest.java | 4 ++ .../mirror/AccumulatedInPageSoakTest.java | 17 ++++-- .../mirror/ClusteredMirrorSoakTest.java | 65 +++++++++++++++++----- .../brokerConnection/mirror/IdempotentACKTest.java | 8 ++- .../mirror/ReplicatedBothNodesMirrorTest.java | 40 +++++++++---- .../mirror/ReplicatedMirrorTargetTest.java | 13 ++++- .../mirror/SingleMirrorSoakTest.java | 16 +++++- 7 files changed, 129 insertions(+), 34 deletions(-) diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java index e7c86a6b65..d1e67f1567 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java @@ -130,6 +130,10 @@ public class DivertQueueMirrorTest extends SmokeTestBase { @Test @Timeout(value = 240_000L, unit = TimeUnit.MILLISECONDS) public void testDivertAndMirror() throws Exception { + testDivertAndMirror("AMQP"); + + } + public void testDivertAndMirror(String protocol) throws Exception { String protocol = "AMQP"; // no need to run this test using multiple protocols. this is about validating paging works correctly processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties")); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java index 568c25df28..181381f13a 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java @@ -124,16 +124,22 @@ public class AccumulatedInPageSoakTest extends SoakTestBase { @Test @Timeout(value = 240_000L, unit = TimeUnit.MILLISECONDS) - public void testAccumulateWhileMirrorDown() throws Exception { - String protocol = "AMQP"; // no need to run this test using multiple protocols. this is about validating paging works correctly + public void testAccumulateWhileMirrorDownOpenWire() throws Exception { + testAccumulateWhileMirrorDown("OPENWIRE", 20_000, 100); + } + + @Test + @Timeout(value = 240_000L, unit = TimeUnit.MILLISECONDS) + public void testAccumulateWhileMirrorDownAMQP() throws Exception { + testAccumulateWhileMirrorDown("AMQP", 20_000, 1000); + } + + private void testAccumulateWhileMirrorDown(String protocol, final int numberOfMessages, final int commitInterval) throws Exception { startDC1(); ExecutorService service = Executors.newFixedThreadPool(1); runAfter(service::shutdownNow); - final int numberOfMessages = 20_000; - final int commitInterval = 1000; - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI); ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI); @@ -163,6 +169,7 @@ public class AccumulatedInPageSoakTest extends SoakTestBase { } catch (Throwable e) { logger.warn(e.getMessage(), e); errors.incrementAndGet(); + System.exit(-1); } finally { done.countDown(); } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java index ad4191fe28..0215d09690 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java @@ -147,7 +147,17 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { } @Test - public void testAvoidReflections() throws Exception { + public void testAvoidReflectionsAMQP() throws Exception { + testAvoidReflections("AMQP"); + } + + @Test + public void testAvoidReflectionsOpenWire() throws Exception { + testAvoidReflections("OPENWIRE"); + } + + @Test + public void testAvoidReflections(String protocol) throws Exception { createRealServers(true); String internalQueue = "INTERNAL_QUEUE"; @@ -195,7 +205,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { } catch (Exception expected) { } - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI); + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI); int numberOfMessages = 1_000; @@ -285,7 +295,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { } @Test - public void testSimpleQueue() throws Exception { + public void testSimpleQueueAMQP() throws Exception { + testSimpleQueue("AMQP"); + } + + @Test + public void testSimpleQueueOPENWIRE() throws Exception { + testSimpleQueue("OPENWIRE"); + } + + private void testSimpleQueue(String protocol) throws Exception { createRealServers(false); startServers(); @@ -293,8 +312,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { assertTrue(numberOfMessages % 2 == 0, "numberOfMessages must be even"); - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI); - ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI); + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI); + ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI); String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null); @@ -437,7 +456,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { } @Test - public void testAutoCreateQueue() throws Exception { + public void testAutoCreateQueueAMQP() throws Exception { + testAutoCreateQueue("AMQP"); + } + + @Test + public void testAutoCreateQueueOPENWIRE() throws Exception { + testAutoCreateQueue("OPENWIRE"); + } + + public void testAutoCreateQueue(String protocol) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(2); runAfter(executorService::shutdownNow); @@ -448,9 +476,9 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { final int numberOfMessages = 50; - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI); - ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI); - ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI); + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI); + ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI); + ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory(protocol, DC2_NODEB_URI); AtomicBoolean runningConsumers = new AtomicBoolean(true); runAfter(() -> runningConsumers.set(false)); @@ -508,7 +536,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { } @Test - public void testMirroredTopics() throws Exception { + public void testMirroredTopicsAMQP() throws Exception { + testMirroredTopics("AMQP"); + } + + @Test + public void testMirroredTopicsOPENWIRE() throws Exception { + testMirroredTopics("OPENWIRE"); + } + + public void testMirroredTopics(String protocol) throws Exception { createRealServers(false); startServers(); @@ -521,10 +558,10 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { String subscriptionID = "my-order"; String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); - ConnectionFactory connectionFactoryDC1B = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617"); - ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61618"); - ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61619"); + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + ConnectionFactory connectionFactoryDC1B = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61617"); + ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61618"); + ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61619"); SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null); SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java index 6369d63cb2..f620c227f2 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java @@ -164,11 +164,17 @@ public class IdempotentACKTest extends SoakTestBase { testACKs("CORE"); } + @Test + public void testOPENWIRE() throws Exception { + testACKs("OPENWIRE"); + } + + private void testACKs(final String protocol) throws Exception { startServers(); final int consumers = 10; - final int numberOfMessages = 1000; + final int numberOfMessages = 10000; final int largeMessageFactor = 30; final int messagesPerConsumer = 30; diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java index 33b1ffa64a..e0e9e3e8bb 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java @@ -268,16 +268,26 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { } @Test - public void testFailoverLaterStart() throws Exception { - testMirror(true); + public void testFailoverLaterStartAMQP() throws Exception { + testMirror("AMQP", true); } @Test - public void testFailoverWhileMirroring() throws Exception { - testMirror(false); + public void testFailoverWhileMirroringAMQP() throws Exception { + testMirror("AMQP", false); } - private void testMirror(boolean laterStart) throws Exception { + @Test + public void testFailoverLaterStartOPENWIRE() throws Exception { + testMirror("OPENWIRE", true); + } + + @Test + public void testFailoverWhileMirroringOPENWIRE() throws Exception { + testMirror("OPENWIRE", false); + } + + private void testMirror(String protocol, boolean laterStart) throws Exception { createRealServers(); SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null); @@ -292,7 +302,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { runAfter(() -> managementDC1.close()); runAfter(() -> managementDC2.close()); - sendMessages(QUEUE_NAME); + sendMessages(protocol, QUEUE_NAME); processDC1.destroyForcibly(); processDC1.waitFor(10, TimeUnit.SECONDS); @@ -304,7 +314,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { // Mirror failover could challenge the order HashSet<Integer> receivedIDs = new HashSet<>(); - ConnectionFactory connectionFactoryDC2 = CFUtil.createConnectionFactory("amqp", uri(DC2_IP)); + ConnectionFactory connectionFactoryDC2 = CFUtil.createConnectionFactory(protocol, uri(DC2_IP)); try (Connection connection = connectionFactoryDC2.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); connection.start(); @@ -328,9 +338,17 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { } } + @Test + public void testMultipleSendersAMQP() throws Exception { + testMultipleSenders("AMQP"); + } @Test - public void testMultipleSenders() throws Exception { + public void testMultipleSendersOPENWIRE() throws Exception { + testMultipleSenders("OPENWIRE"); + } + + private void testMultipleSenders(String protocol) throws Exception { try { lsof(); } catch (IOException e) { @@ -359,7 +377,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { String destination = "queue" + i; executorService.execute(() -> { try { - sendMessages(destination); + sendMessages(protocol, destination); } catch (Throwable e) { logger.warn(e.getMessage(), e); errors.incrementAndGet(); @@ -402,8 +420,8 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { return filesCounter.get(); } - private static void sendMessages(String queueName) throws JMSException { - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP)); + private static void sendMessages(String protocol, String queueName) throws JMSException { + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, uri(DC1_IP)); try (Connection connection = connectionFactoryDC1A.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = session.createProducer(session.createQueue(queueName)); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java index 94f0bf680f..a6bbffb39b 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java @@ -240,9 +240,18 @@ public class ReplicatedMirrorTargetTest extends SoakTestBase { Thread.sleep(5000); } + @Test + public void testMirrorOnReplicaAMQP() throws Exception { + testMirrorOnReplica("AMQP"); + } @Test - public void testMirrorOnReplica() throws Exception { + public void testMirrorOnReplicaOPENWIRE() throws Exception { + testMirrorOnReplica("OPENWIRE"); + } + + + private void testMirrorOnReplica(String protocol) throws Exception { createRealServers(true); startServers(); @@ -254,7 +263,7 @@ public class ReplicatedMirrorTargetTest extends SoakTestBase { String subscriptionID = "my-order"; String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_URI); + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_URI); consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT); consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java index ff7edbc238..b653ef0f7d 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java @@ -199,7 +199,21 @@ public class SingleMirrorSoakTest extends SoakTestBase { } @Test - public void testInterruptedMirrorTransfer() throws Exception { + public void testInterruptedMirrorTransferAMQP() throws Exception { + testInterruptedMirrorTransfer("AMQP"); + } + @Test + public void testInterruptedMirrorTransferOPENWIRE() throws Exception { + testInterruptedMirrorTransfer("OPENWIRE"); + } + + @Test + public void testInterruptedMirrorTransferCORE() throws Exception { + testInterruptedMirrorTransfer("CORE"); + } + + + public void testInterruptedMirrorTransfer(String protocol) throws Exception { createRealServers(true); startServers(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
