This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 900e329c90 ARTEMIS-5441 Flow control on PageTimedWriter deadlocks with
Replication.start
900e329c90 is described below
commit 900e329c90ec4ba6cae30d3bc4bd16038254ab3f
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Apr 22 16:03:12 2025 -0400
ARTEMIS-5441 Flow control on PageTimedWriter deadlocks with Replication.start
---
.../connect/mirror/AMQPMirrorControllerSource.java | 5 +-
.../activemq/artemis/core/paging/PagingStore.java | 5 +-
.../artemis/core/paging/impl/PageTimedWriter.java | 32 ++++-
.../artemis/core/paging/impl/PagingStoreImpl.java | 44 +++---
.../journal/AbstractJournalStorageManager.java | 11 +-
.../core/transaction/impl/TransactionImpl.java | 2 +-
.../storage/PersistMultiThreadTest.java | 7 +-
.../core/paging/impl/PageTimedWriterUnitTest.java | 148 ++++++++++++++++++---
8 files changed, 207 insertions(+), 47 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index a4efb5739a..fae26f0293 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -374,8 +374,11 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
return;
}
+ int creditsWrite = snfQueue.getPagingStore().page(message, tx,
pagedRouteContext, this::copyMessageForPaging, true);
+
// This will store the message on paging, and the message will be
copied into paging.
- if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext,
this::copyMessageForPaging)) {
+ if (creditsWrite >= 0) {
+ snfQueue.getPagingStore().writeFlowControl(creditsWrite);
if (tx == null) {
snfQueue.deliverAsync();
} else {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 9e3608056b..29f5a5faf7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -129,7 +129,7 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx)
throws Exception;
- boolean page(Message message, Transaction tx, RouteContextList listCtx,
Function<Message, Message> pageDecorator) throws Exception;
+ int page(Message message, Transaction tx, RouteContextList listCtx,
Function<Message, Message> pageDecorator, boolean useFlowControl) throws
Exception;
Page usePage(long page);
@@ -278,4 +278,7 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
default StorageManager getStorageManager() {
return null;
}
+
+ default void writeFlowControl(int credits) {
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
index a6f584f677..91b53e812f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
@@ -67,13 +67,14 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
public static class PageEvent {
- PageEvent(OperationContext context, PagedMessage message, Transaction
tx, RouteContextList listCtx, int credits, boolean replicated) {
+ PageEvent(OperationContext context, PagedMessage message, Transaction
tx, RouteContextList listCtx, int credits, boolean replicated, boolean
useFlowControl) {
this.context = context;
this.message = message;
this.listCtx = listCtx;
this.replicated = replicated;
this.credits = credits;
this.tx = tx;
+ this.useFlowControl = useFlowControl;
}
final boolean replicated;
@@ -82,6 +83,7 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
final RouteContextList listCtx;
final Transaction tx;
final int credits;
+ final boolean useFlowControl;
}
public PageTimedWriter(int writeCredits, StorageManager storageManager,
PagingStoreImpl pagingStore, ScheduledExecutorService scheduledExecutor,
Executor executor, boolean syncNonTX, long timeSync) {
@@ -118,18 +120,20 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
pendingTasksUpdater.incrementAndGet(this);
}
- public void addTask(OperationContext context,
+ public int addTask(OperationContext context,
PagedMessage message,
Transaction tx,
- RouteContextList listCtx) {
+ RouteContextList listCtx, boolean
useFlowControl) {
logger.trace("Adding paged message {} to paged writer", message);
+ // the module using the page operation should later call flowControl
from this class.
+ // the only exception to this would be from tests where we don't really
care about flow control on the TimedExecutor
+ // also: the credits is based on the page size, and we use the
encodeSize to flow it.
int credits = Math.min(message.getEncodeSize() +
PageReadWriter.SIZE_RECORD, maxCredits);
- writeCredits.acquireUninterruptibly(credits);
+
synchronized (this) {
if (!isStarted()) {
- writeCredits.release(credits);
throw new IllegalStateException("PageWriter Service is stopped");
}
@@ -139,7 +143,7 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
}
final boolean replicated = storageManager.isReplicated();
- PageEvent event = new PageEvent(context, message, tx, listCtx,
credits, replicated);
+ PageEvent event = new PageEvent(context, message, tx, listCtx,
credits, replicated, useFlowControl);
context.storeLineUp();
if (replicated) {
context.replicationLineUp();
@@ -147,6 +151,14 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
this.pageEvents.add(event);
delay();
}
+
+ return credits;
+ }
+
+ public void flowControl(int credits) {
+ if (credits > 0) {
+ writeCredits.acquireUninterruptibly(credits);
+ }
}
private synchronized PageEvent[] extractPendingEvents() {
@@ -257,7 +269,9 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
try {
for (PageEvent event : pendingEvents) {
pendingTasksUpdater.decrementAndGet(this);
- writeCredits.release(event.credits);
+ if (event.useFlowControl) {
+ writeCredits.release(event.credits);
+ }
}
OperationContextImpl.setContext(beforeContext);
} catch (Throwable t) {
@@ -270,4 +284,8 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
protected void performSync() throws Exception {
pagingStore.ioSync();
}
+
+ public int getAvailablePermits() {
+ return writeCredits.availablePermits();
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 3507fd3759..e0f5e7cd56 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1273,17 +1273,18 @@ public class PagingStoreImpl implements PagingStore {
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx) throws Exception {
- return page(message, tx, listCtx, null);
+ return page(message, tx, listCtx, null, false) >= 0;
}
@Override
- public boolean page(Message message,
+ public int page(Message message,
final Transaction tx,
RouteContextList listCtx,
- Function<Message, Message> pageDecorator) throws
Exception {
+ Function<Message, Message> pageDecorator,
+ boolean useFlowControl) throws Exception {
if (!running) {
- return false;
+ return -1;
}
boolean full = isFull();
@@ -1303,12 +1304,12 @@ public class PagingStoreImpl implements PagingStore {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
}
- return true;
+ return 0;
} else {
- return false;
+ return -1;
}
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
- return false;
+ return -1;
}
if (pageFull) {
@@ -1326,24 +1327,28 @@ public class PagingStoreImpl implements PagingStore {
}
// we are in page mode, if we got to this point, we are dropping the
message while still paging
- // this needs to return true as it is paging
- return true;
+ // we return 0 as in the storage is in "page mode" however no credits
are being taken.
+ return 0;
}
- return writePage(message, tx, listCtx, pageDecorator);
+ int creditsUsed = writePage(message, tx, listCtx, pageDecorator,
useFlowControl);
+
+ return creditsUsed;
}
- private boolean writePage(Message message,
+ private int writePage(Message message,
Transaction tx,
RouteContextList listCtx,
- Function<Message, Message> pageDecorator) throws
Exception {
+ Function<Message, Message> pageDecorator,
+ boolean useFlowControl) throws Exception {
// We need to use a readLock as we need to keep paging until we
scheduled a task
// notice that to leave paging you need pending tasks done
readLock();
PagedMessage pagedMessage;
try {
if (!paging) {
- return false;
+ // no paging was used
+ return -1;
}
final long transactionID = (tx != null &&
tx.isAllowPageTransaction()) ? tx.getID() : -1L;
@@ -1383,9 +1388,18 @@ public class PagingStoreImpl implements PagingStore {
// timedWriter.hasPendingIO would return pending based on incrementTask,
and for that reason we can still call the addTask away from the readLock.
//
// This scenario was found when running FloodServerWithAsyncSendTest
smoke test.
- timedWriter.addTask(storageManager.getContext(), pagedMessage, tx,
listCtx);
+ int credits = timedWriter.addTask(storageManager.getContext(),
pagedMessage, tx, listCtx, useFlowControl);
- return true;
+ assert credits >= 0;
+
+ return credits;
+ }
+
+ @Override
+ public void writeFlowControl(int credits) {
+ if (timedWriter != null) {
+ timedWriter.flowControl(credits);
+ }
}
protected void directWritePage(PagedMessage pagedMessage, boolean lineUp,
boolean originalReplicated) throws Exception {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 32aae12d74..bd6dfab33a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -2280,9 +2280,18 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
@Override
public boolean addToPage(PagingStore store, Message msg, Transaction tx,
RouteContextList listCtx) throws Exception {
+ int credits;
try (ArtemisCloseable closeable = closeableReadLock()) {
- return store.page(msg, tx, listCtx);
+ credits = store.page(msg, tx, listCtx, null, true);
}
+
+ // flow control on the TimedWriter needs to be done outside of locking
+ // it is ok to do it after the write
+ if (credits > 0) {
+ store.writeFlowControl(credits);
+ }
+
+ return credits >= 0;
}
private void installLargeMessageConfirmationOnTX(Transaction tx, long
recordID) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index a245865fc7..c45badfe0c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -506,7 +506,7 @@ public class TransactionImpl implements Transaction {
throw new ActiveMQIllegalStateException("Transaction is in
invalid state " + state);
}
} else {
- if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
+ if (delayed == 0 && state != State.ACTIVE && state !=
State.ROLLBACK_ONLY) {
throw new ActiveMQIllegalStateException("Transaction is in
invalid state " + state);
}
}
diff --git
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index e5010424c5..0a784f21a4 100644
---
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -463,11 +463,12 @@ public class PersistMultiThreadTest extends
ActiveMQTestBase {
}
@Override
- public boolean page(Message message,
+ public int page(Message message,
Transaction tx,
RouteContextList listCtx,
- Function<Message, Message> pageDecorator) throws
Exception {
- return false;
+ Function<Message, Message> pageDecorator,
+ boolean useFlowControl) throws Exception {
+ return -1;
}
@Override
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index be8ce5531a..3fc0fc3208 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -33,6 +33,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@@ -73,6 +74,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -195,7 +197,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
executorService = Executors.newFixedThreadPool(10);
runAfter(scheduledExecutorService::shutdownNow);
runAfter(executorService::shutdownNow);
- runAfter(() -> OperationContextImpl.clearContext());
executorFactory = new OrderedExecutorFactory(executorService);
context = OperationContextImpl.getContext(executorFactory);
assertNotNull(context);
@@ -311,6 +312,11 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
runAfter(OperationContextImpl::clearContext);
}
+ @AfterEach
+ public void clearContext() {
+ OperationContextImpl.clearContext();
+ }
+
// a test to validate if the Mocks are correctly setup
@Test
public void testValidateMocks() throws Exception {
@@ -356,7 +362,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
long id = realJournalStorageManager.generateID();
long newID = realJournalStorageManager.generateID();
assertEquals(1L, newID - id);
-
}
PagedMessage createPagedMessage() {
@@ -367,7 +372,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
public void testIOCompletion() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- timer.addTask(context, createPagedMessage(), null,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), null, Mockito.mock(RouteContextList.class)));
context.executeOnCompletion(new IOCallback() {
@Override
@@ -383,7 +389,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
allowRunning.countDown();
assertTrue(latch.await(10, TimeUnit.SECONDS));
-
}
@Test
@@ -392,7 +397,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
useReplication.set(true);
- timer.addTask(context, createPagedMessage(), null,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), null, Mockito.mock(RouteContextList.class)));
context.executeOnCompletion(new IOCallback() {
@Override
@@ -426,7 +432,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
}
});
- timer.addTask(context, createPagedMessage(), transaction,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), transaction, Mockito.mock(RouteContextList.class)));
transaction.commit();
@@ -497,7 +504,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
}
});
- timer.addTask(context, createPagedMessage(), transaction,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), transaction, Mockito.mock(RouteContextList.class)));
numberOfCommitsMessageJournal.set(0); // it should been 0 before anyway
but since I have no real reason to require it to be zero before, I'm doing this
just in case it ever changes
transaction.commit();
@@ -525,7 +533,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
}
});
- timer.addTask(context, createPagedMessage(), transaction,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), transaction, Mockito.mock(RouteContextList.class)));
numberOfCommitsMessageJournal.set(0); // it should been 0 before anyway
but since I have no real reason to require it to be zero before, I'm doing this
just in case it ever changes
numberOfPreparesMessageJournal.set(0);
@@ -546,7 +555,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
useReplication.set(true);
- timer.addTask(context, createPagedMessage(), null,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), null, Mockito.mock(RouteContextList.class)));
context.executeOnCompletion(new IOCallback() {
@Override
@@ -573,7 +583,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
useReplication.set(false);
- timer.addTask(context, createPagedMessage(), null,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), null, Mockito.mock(RouteContextList.class)));
context.executeOnCompletion(new IOCallback() {
@Override
@@ -603,7 +614,8 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
Transaction tx = new TransactionImpl(realJournalStorageManager,
Integer.MAX_VALUE);
tx.setContainsPersistent();
- timer.addTask(context, createPagedMessage(), tx,
Mockito.mock(RouteContextList.class));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
createMessage(), tx, Mockito.mock(RouteContextList.class)));
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
@@ -617,7 +629,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
allowRunning.countDown();
assertTrue(latch.await(10, TimeUnit.SECONDS));
-
}
@Test
@@ -693,7 +704,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
@Test
public void testMarkRollbackCancelDelay() throws Exception {
- testRollback(true);
+ testRollback(false);
}
private void testRollback(boolean rollback) throws Exception {
@@ -866,9 +877,10 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
int sentNumber = 0;
while (timeout > System.currentTimeMillis()) {
try {
- PagedMessage message = createPagedMessage();
- message.getMessage().putStringProperty("testId",
String.valueOf(sentNumber));
- timer.addTask(context, message, tx, routeContextListMocked);
+ Message message = createMessage();
+ message.putStringProperty("testId", String.valueOf(sentNumber));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore, message,
tx, routeContextListMocked));
sentWrite.add(String.valueOf(sentNumber));
sentNumber++;
if (sentNumber % 1000 == 0) {
@@ -884,13 +896,16 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
assertTrue(timer.isStarted());
timer.delay(); // calling one more delay as the last one done could
still be missing
+
assertEquals(0, errors.get());
- // not supposed to throw any exception
Wait.assertEquals(0, tx::getPendingDelay, 5000, 100);
+ // not supposed to throw any exceptions
tx.commit();
assertTrue(committed.await(10, TimeUnit.SECONDS));
+ Wait.assertTrue(() -> interceptedWrite.size() == sentWrite.size(), 5000,
100);
+
int interceptorOriginalSize = interceptedWrite.size();
int sentOriginalSize = sentWrite.size();
@@ -907,7 +922,104 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertEquals(interceptorOriginalSize, sentOriginalSize);
assertEquals(sentNumber, interceptorOriginalSize);
-
}
+ @Test
+ public void testLockWhileFlowControlled() throws Exception {
+ AtomicBoolean notSupposedToWrite = new AtomicBoolean(false);
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ LinkedHashSet<String> interceptedWrite = new LinkedHashSet<>();
+ LinkedHashSet<String> sentWrite = new LinkedHashSet<>();
+
+ directWriteInterceptor = m -> {
+ String messageID = m.getMessage().getStringProperty("testId");
+ if (messageID == null) {
+ logger.warn("no messageID defined on message");
+ errors.incrementAndGet();
+ }
+ if (notSupposedToWrite.get()) {
+ logger.warn("Not supposed to write message {}",
m.getMessage().getStringProperty("testId"));
+ errors.incrementAndGet();
+ }
+ interceptedWrite.add(m.getMessage().getStringProperty("testId"));
+ };
+
+ // I don't want to mess with the Executor simulating to be on the
+ ExecutorService testExecutor = Executors.newFixedThreadPool(1);
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ runAfter(() -> running.set(false));
+ runAfter(testExecutor::shutdownNow);
+
+ CountDownLatch runLatch = new CountDownLatch(1);
+ CyclicBarrier flagStart = new CyclicBarrier(2);
+
+ AtomicInteger sentNumber = new AtomicInteger(0);
+
+ // sending messages
+ testExecutor.execute(() -> {
+ try {
+ flagStart.await(10, TimeUnit.SECONDS);
+
+ RouteContextList routeContextListMocked =
Mockito.mock(RouteContextList.class);
+
+ while (running.get()) {
+ Message message = createMessage();
+ message.putStringProperty("testId", String.valueOf(sentNumber));
+ OperationContextImpl.setContext(context);
+ assertTrue(realJournalStorageManager.addToPage(pageStore,
message, null, routeContextListMocked));
+ sentWrite.add(String.valueOf(sentNumber.get()));
+ sentNumber.incrementAndGet();
+ if (sentNumber.get() % 1000 == 0) {
+ logger.info("Sent {}", sentNumber);
+ }
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ runLatch.countDown();
+ }
+ });
+
+ flagStart.await(10, TimeUnit.SECONDS);
+
+ // getting a base credit for most messages
+ int baseCreditSize = createMessage().putStringProperty("testID",
"00000").getEncodeSize();
+
+ // Waiting messages to stop flowing
+ Wait.assertTrue(() -> timer.getAvailablePermits() < baseCreditSize,
5_000, 10);
+
+ // this is simulating certain operations that will need to get a
writeLock (example replication start)
+ // we still must be able to get a writeLock while paging is flow
controlled
+ // otherwise we could starve or deadlock in some scenarios
+ realJournalStorageManager.writeLock();
+ realJournalStorageManager.writeUnlock();
+
+ allowRunning.countDown(); // allowing messages to flow again
+ running.set(false);
+ assertTrue(runLatch.await(10, TimeUnit.SECONDS));
+ assertTrue(timer.isStarted());
+ timer.delay(); // calling one more delay as the last one done could
still be missing
+ assertEquals(0, errors.get());
+ Wait.assertTrue(() -> interceptedWrite.size() == sentWrite.size(), 5000,
100);
+ int interceptorOriginalSize = interceptedWrite.size();
+ int sentOriginalSize = sentWrite.size();
+
+ interceptedWrite.forEach(s -> {
+ sentWrite.remove(s);
+ });
+ sentWrite.forEach(m -> {
+ logger.warn("message {} missed", m);
+ });
+
+ assertEquals(interceptorOriginalSize, sentOriginalSize);
+
+ assertEquals(0, sentWrite.size());
+
+ assertEquals(interceptorOriginalSize, sentOriginalSize);
+ assertEquals(sentNumber.get(), interceptorOriginalSize);
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact