This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 447b72ae5b03016a0986161d85426e498e815c1c
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Nov 11 19:47:46 2024 -0500

    ARTEMIS-5148 Simplifying and making ClusteredLargeMessageInterruptTest more 
reliable
---
 .../ClusteredLargeMessageInterruptTest.java        | 163 ++++++++-------------
 1 file changed, 61 insertions(+), 102 deletions(-)

diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
index 2c4ed313c4..8afb103a31 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -110,19 +110,13 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
    Process serverProcess2;
 
    public ConnectionFactory createConnectionFactory(int broker, String 
protocol) {
+
+      int portUsed = 61616 + broker * 100;
+
       if (protocol.equals("CORE")) {
-         switch (broker) {
-            // I need the connections stable in the selected server
-            case 0:
-               return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
-            case 1:
-               return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
-            default:
-               logger.warn("undefined argument {}", broker);
-               throw new IllegalArgumentException("undefined");
-         }
+         return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:"
 + portUsed + "?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
       } else {
-         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + 
(61616 + broker * 100) + "?ha=false");
+         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + 
portUsed);
       }
    }
 
@@ -145,52 +139,27 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
 
    @Test
    public void testLargeMessageAMQPTX() throws Throwable {
-      testInterrupt("AMQP", true, false);
-   }
-
-   @Test
-   public void testLargeMessageAMQPTXKill() throws Throwable {
-      testInterrupt("AMQP", true, true);
+      testInterrupt("AMQP", true);
    }
 
    @Test
    public void testInterruptAMQPNonTX() throws Throwable {
-      testInterrupt("AMQP", false, false);
-   }
-
-   @Test
-   public void testInterruptAMQPNonTXKill() throws Throwable {
-      testInterrupt("AMQP", false, true);
+      testInterrupt("AMQP", false);
    }
 
    @Test
    public void testInterruptCORETX() throws Throwable {
-      testInterrupt("CORE", true, false);
-   }
-
-   @Test
-   public void testInterruptCORETXKill() throws Throwable {
-      testInterrupt("CORE", true, true);
+      testInterrupt("CORE", true);
    }
 
    @Test
    public void testInterruptOPENWIRETX() throws Throwable {
-      testInterrupt("OPENWIRE", true, false);
-   }
-
-   @Test
-   public void testInterruptOPENWIRETXKill() throws Throwable {
-      testInterrupt("OPENWIRE", true, true);
+      testInterrupt("OPENWIRE", true);
    }
 
    @Test
    public void testInterruptCORENonTX() throws Throwable {
-      testInterrupt("CORE", false, false);
-   }
-
-   @Test
-   public void testInterruptCORENonTXKill() throws Throwable {
-      testInterrupt("CORE", false, true);
+      testInterrupt("CORE", false);
    }
 
    private CountDownLatch startSendingThreads(Executor executor, String 
protocol, int broker, int threads, boolean tx, String queueName) {
@@ -242,8 +211,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       for (int i = 0; i < threads; i++) {
          executor.execute(() -> {
             int numberOfMessages = 0;
-            try {
-               Connection connection = factory.createConnection();
+            try (Connection connection = factory.createConnection()) {
                connection.start();
                Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
@@ -265,6 +233,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
                   }
                }
             } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
             } finally {
                logger.info("Done sending");
                done.countDown();
@@ -279,7 +248,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
 
    // this test has sleeps as the test will send while still active
    // we keep sending all the time.. so the testInterruptLM acts like a 
controller telling the threads when to stop
-   private void testInterrupt(String protocol, boolean tx, boolean useKill) 
throws Throwable {
+   private void testInterrupt(String protocol, boolean tx) throws Throwable {
       final int SENDING_THREADS = 10;
       final int CONSUMING_THREADS = 10;
       final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
@@ -289,55 +258,50 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
-      CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-
-      killProcess(serverProcess, useKill);
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      logger.info("All receivers and senders are done!!!");
-
-      serverProcess = startServer0();
-
-      Thread.sleep(2000);
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      killProcess(serverProcess2, useKill);
-      assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(sendDone.await(1, TimeUnit.MINUTES));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
-      serverProcess2 = startServer1();
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-      runningSend = false;
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-
       File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
       File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
 
-      Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0);
+      {
+         CountDownLatch sendDone = startSendingThreads(executorService, 
protocol, 0, SENDING_THREADS, tx, queueName);
+         CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+         // let it producing for a while
+         Thread.sleep(2000);
+
+         runningSend = false;
+         assertTrue(sendDone.await(1, TimeUnit.MINUTES));
+
+         killProcess(serverProcess);
+         assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+         serverProcess = startServer0();
+         runningConsumer = false;
+         assertTrue(receiverDone.await(1, TimeUnit.MINUTES));
+
+         long timeout = System.currentTimeMillis() + 60_000;
+
+         ConnectionFactory factory = createConnectionFactory(1, protocol);
+
+         // This will flush all messages, making sure everything is consumed.
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+            connection.start();
+            while (System.currentTimeMillis() < timeout) {
+               TextMessage message = (TextMessage)consumer.receive(100);
+               if (message == null) {
+                  if (lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0) {
+                     break;
+                  }
+               } else {
+                  assertTrue(message.getText().startsWith(largebody));
+               }
+            }
+         }
 
-      runningConsumer = false;
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+      }
+
+      logger.info("All receivers and senders are done!!!");
 
-      // no need to use wait here, the previous check should have checked that 
already
       assertEquals(0, lmFolder.listFiles().length);
       assertEquals(0, lmFolder2.listFiles().length);
       assertEquals(0, errors.get());
@@ -353,12 +317,8 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       testInterruptFailOnBridge("CORE", false);
    }
 
-   private void killProcess(Process process, boolean useKill) throws Exception 
{
-      if (useKill) {
-         Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
-      } else {
-         process.destroyForcibly();
-      }
+   private void killProcess(Process process) throws Exception {
+      process.destroyForcibly();
    }
 
 
@@ -375,26 +335,25 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      // only start the sender for a while
       CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
 
       Thread.sleep(2000);
 
       runningSend = runningConsumer = false;
 
-      killProcess(serverProcess, false);
-      assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+      killProcess(serverProcess);
+      assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+      assertTrue(sendDone.await(1, TimeUnit. MINUTES));
 
       sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
       CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
-      killProcess(serverProcess, false);
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+      killProcess(serverProcess);
+      assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
       serverProcess = startServer0();
 
       Thread.sleep(5000);
       runningSend = false;
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+      assertTrue(sendDone.await(1, TimeUnit.MINUTES));
 
       QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
       QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
@@ -405,7 +364,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       Wait.assertTrue(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0);
 
       runningConsumer = false;
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+      assertTrue(receiverDone.await(1, TimeUnit.MINUTES));
 
 
       Wait.assertEquals(0, () -> lmFolder.listFiles().length);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to