This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3e9f59a5f3 ARTEMIS-5722 Improve reliability of TimedBufferMovementTest
3e9f59a5f3 is described below
commit 3e9f59a5f30e873f031b54274b9618e02385df3a
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Oct 27 12:30:59 2025 -0400
ARTEMIS-5722 Improve reliability of TimedBufferMovementTest
---
.../soak/journal/TimedBufferMovementTest.java | 71 +++++++++++++++++++---
1 file changed, 63 insertions(+), 8 deletions(-)
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
index fa5aef8c59..66abfae533 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -37,7 +39,6 @@ import
org.apache.activemq.artemis.core.persistence.OperationContext;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -106,10 +107,20 @@ public class TimedBufferMovementTest extends
ActiveMQTestBase {
AtomicInteger sequence = new AtomicInteger(1);
+ CyclicBarrier startFlag = new CyclicBarrier(REGULAR_THREADS +
TX_THREADS);
+
for (int t = 0; t < REGULAR_THREADS; t++) {
executorService.execute(() -> {
+ OperationContext context = new
OperationContextImpl(orderedExecutorFactory.getExecutor());
try {
- OperationContext context = new
OperationContextImpl(orderedExecutorFactory.getExecutor());
+ try {
+ startFlag.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ if (e instanceof InterruptedException) {
+ Thread.interrupted();
+ }
+ }
for (int r = 0; r < RECORDS; r++) {
String uuid = "noTX_ " + RandomUtil.randomUUIDString();
pendingCallbacks.put(uuid, uuid);
@@ -137,15 +148,39 @@ public class TimedBufferMovementTest extends
ActiveMQTestBase {
}
}
} finally {
- done.countDown();
+ context.executeOnCompletion(new IOCompletion() {
+ @Override
+ public void storeLineUp() {
+ }
+
+ @Override
+ public void done() {
+ done.countDown();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ logger.warn("error {} / message = {}", errorCode,
errorMessage);
+ errors.incrementAndGet();
+ done.countDown();
+ }
+ });
}
});
}
for (int t = 0; t < TX_THREADS; t++) {
executorService.execute(() -> {
+ OperationContext context = new
OperationContextImpl(orderedExecutorFactory.getExecutor());
try {
- OperationContext context = new
OperationContextImpl(orderedExecutorFactory.getExecutor());
+ try {
+ startFlag.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ if (e instanceof InterruptedException) {
+ Thread.interrupted();
+ }
+ }
for (int r = 0; r < RECORDS; r++) {
String uuid = "tx_" + RandomUtil.randomUUIDString();
try {
@@ -176,7 +211,23 @@ public class TimedBufferMovementTest extends
ActiveMQTestBase {
}
}
} finally {
- done.countDown();
+ context.executeOnCompletion(new IOCompletion() {
+ @Override
+ public void storeLineUp() {
+ }
+
+ @Override
+ public void done() {
+ done.countDown();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ logger.warn("error {} / message = {}", errorCode,
errorMessage);
+ errors.incrementAndGet();
+ done.countDown();
+ }
+ });
}
});
}
@@ -184,9 +235,9 @@ public class TimedBufferMovementTest extends
ActiveMQTestBase {
int countRepeat = 0;
int missingData = 0;
- while (!done.await(10, TimeUnit.MILLISECONDS) ||
!pendingCallbacks.isEmpty()) {
+ while (!done.await(10, TimeUnit.MILLISECONDS)) {
logger.debug("forcing recordsWritten={}, pendingCallback={},
recordsCallback={}", recordsWritten.get(), pendingCallbacks.size(),
recordsCallback.get());
- if (countRepeat++ < 10) { // compact a few times
+ if (countRepeat++ < 3) { // compact a few times
journal.scheduleCompactAndBlock(500_000);
}
// we will keep forcing this method (which will move to a next file)
@@ -206,7 +257,11 @@ public class TimedBufferMovementTest extends
ActiveMQTestBase {
}
assertTrue(done.await(1, TimeUnit.MINUTES));
- Wait.assertEquals(0, pendingCallbacks::size);
+
+ // the pendingCallbacks list is removed on the IO completion of each
thread.
+ // No need to use Wait on this place.
+ assertEquals(0, pendingCallbacks.size());
+
journal.stop();
factory.stop();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact