This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e9f59a5f3 ARTEMIS-5722 Improve reliability of TimedBufferMovementTest
3e9f59a5f3 is described below

commit 3e9f59a5f30e873f031b54274b9618e02385df3a
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Oct 27 12:30:59 2025 -0400

    ARTEMIS-5722 Improve reliability of TimedBufferMovementTest
---
 .../soak/journal/TimedBufferMovementTest.java      | 71 +++++++++++++++++++---
 1 file changed, 63 insertions(+), 8 deletions(-)

diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
index fa5aef8c59..66abfae533 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncoderPersister;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -37,7 +39,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -106,10 +107,20 @@ public class TimedBufferMovementTest extends ActiveMQTestBase {
 
       AtomicInteger sequence = new AtomicInteger(1);
 
+      CyclicBarrier startFlag = new CyclicBarrier(REGULAR_THREADS + 
TX_THREADS);
+
       for (int t = 0; t < REGULAR_THREADS; t++) {
          executorService.execute(() -> {
+            OperationContext context = new 
OperationContextImpl(orderedExecutorFactory.getExecutor());
             try {
-               OperationContext context = new 
OperationContextImpl(orderedExecutorFactory.getExecutor());
+               try {
+                  startFlag.await(10, TimeUnit.SECONDS);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  if (e instanceof InterruptedException) {
+                     Thread.interrupted();
+                  }
+               }
                for (int r = 0; r < RECORDS; r++) {
                   String uuid = "noTX_ " + RandomUtil.randomUUIDString();
                   pendingCallbacks.put(uuid, uuid);
@@ -137,15 +148,39 @@ public class TimedBufferMovementTest extends ActiveMQTestBase {
                   }
                }
             } finally {
-               done.countDown();
+               context.executeOnCompletion(new IOCompletion() {
+                  @Override
+                  public void storeLineUp() {
+                  }
+
+                  @Override
+                  public void done() {
+                     done.countDown();
+                  }
+
+                  @Override
+                  public void onError(int errorCode, String errorMessage) {
+                     logger.warn("error {} / message = {}", errorCode, 
errorMessage);
+                     errors.incrementAndGet();
+                     done.countDown();
+                  }
+               });
             }
          });
       }
 
       for (int t = 0; t < TX_THREADS; t++) {
          executorService.execute(() -> {
+            OperationContext context = new 
OperationContextImpl(orderedExecutorFactory.getExecutor());
             try {
-               OperationContext context = new 
OperationContextImpl(orderedExecutorFactory.getExecutor());
+               try {
+                  startFlag.await(10, TimeUnit.SECONDS);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  if (e instanceof InterruptedException) {
+                     Thread.interrupted();
+                  }
+               }
                for (int r = 0; r < RECORDS; r++) {
                   String uuid = "tx_" + RandomUtil.randomUUIDString();
                   try {
@@ -176,7 +211,23 @@ public class TimedBufferMovementTest extends ActiveMQTestBase {
                   }
                }
             } finally {
-               done.countDown();
+               context.executeOnCompletion(new IOCompletion() {
+                  @Override
+                  public void storeLineUp() {
+                  }
+
+                  @Override
+                  public void done() {
+                     done.countDown();
+                  }
+
+                  @Override
+                  public void onError(int errorCode, String errorMessage) {
+                     logger.warn("error {} / message = {}", errorCode, 
errorMessage);
+                     errors.incrementAndGet();
+                     done.countDown();
+                  }
+               });
             }
          });
       }
@@ -184,9 +235,9 @@ public class TimedBufferMovementTest extends ActiveMQTestBase {
       int countRepeat = 0;
       int missingData = 0;
 
-      while (!done.await(10, TimeUnit.MILLISECONDS) || 
!pendingCallbacks.isEmpty()) {
+      while (!done.await(10, TimeUnit.MILLISECONDS)) {
          logger.debug("forcing recordsWritten={}, pendingCallback={}, 
recordsCallback={}", recordsWritten.get(), pendingCallbacks.size(), 
recordsCallback.get());
-         if (countRepeat++ < 10) { // compact a few times
+         if (countRepeat++ < 3) { // compact a few times
             journal.scheduleCompactAndBlock(500_000);
          }
          // we will keep forcing this method (which will move to a next file)
@@ -206,7 +257,11 @@ public class TimedBufferMovementTest extends ActiveMQTestBase {
       }
 
       assertTrue(done.await(1, TimeUnit.MINUTES));
-      Wait.assertEquals(0, pendingCallbacks::size);
+
+      // the pendingCallbacks list is removed on the IO completion of each thread.
+      // No need to use Wait on this place.
+      assertEquals(0, pendingCallbacks.size());
+
       journal.stop();
       factory.stop();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to