This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 900e329c90 ARTEMIS-5441 Flow control on PageTimedWriter deadlocks with 
Replication.start
900e329c90 is described below

commit 900e329c90ec4ba6cae30d3bc4bd16038254ab3f
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Apr 22 16:03:12 2025 -0400

    ARTEMIS-5441 Flow control on PageTimedWriter deadlocks with Replication.start
---
 .../connect/mirror/AMQPMirrorControllerSource.java |   5 +-
 .../activemq/artemis/core/paging/PagingStore.java  |   5 +-
 .../artemis/core/paging/impl/PageTimedWriter.java  |  32 ++++-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  44 +++---
 .../journal/AbstractJournalStorageManager.java     |  11 +-
 .../core/transaction/impl/TransactionImpl.java     |   2 +-
 .../storage/PersistMultiThreadTest.java            |   7 +-
 .../core/paging/impl/PageTimedWriterUnitTest.java  | 148 ++++++++++++++++++---
 8 files changed, 207 insertions(+), 47 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index a4efb5739a..fae26f0293 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -374,8 +374,11 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
             return;
          }
 
+         int creditsWrite = snfQueue.getPagingStore().page(message, tx, 
pagedRouteContext, this::copyMessageForPaging, true);
+
          // This will store the message on paging, and the message will be 
copied into paging.
-         if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, 
this::copyMessageForPaging)) {
+         if (creditsWrite >= 0) {
+            snfQueue.getPagingStore().writeFlowControl(creditsWrite);
             if (tx == null) {
                snfQueue.deliverAsync();
             } else {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 9e3608056b..29f5a5faf7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -129,7 +129,7 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
     */
    boolean page(Message message, Transaction tx, RouteContextList listCtx) 
throws Exception;
 
-   boolean page(Message message, Transaction tx, RouteContextList listCtx, 
Function<Message, Message> pageDecorator) throws Exception;
+   int page(Message message, Transaction tx, RouteContextList listCtx, 
Function<Message, Message> pageDecorator, boolean useFlowControl) throws 
Exception;
 
    Page usePage(long page);
 
@@ -278,4 +278,7 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
    default StorageManager getStorageManager() {
       return null;
    }
+
+   default void writeFlowControl(int credits) {
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
index a6f584f677..91b53e812f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
@@ -67,13 +67,14 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
 
    public static class PageEvent {
 
-      PageEvent(OperationContext context, PagedMessage message, Transaction 
tx, RouteContextList listCtx, int credits, boolean replicated) {
+      PageEvent(OperationContext context, PagedMessage message, Transaction 
tx, RouteContextList listCtx, int credits, boolean replicated, boolean 
useFlowControl) {
          this.context = context;
          this.message = message;
          this.listCtx = listCtx;
          this.replicated = replicated;
          this.credits = credits;
          this.tx = tx;
+         this.useFlowControl = useFlowControl;
       }
 
       final boolean replicated;
@@ -82,6 +83,7 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
       final RouteContextList listCtx;
       final Transaction tx;
       final int credits;
+      final boolean useFlowControl;
    }
 
    public PageTimedWriter(int writeCredits, StorageManager storageManager, 
PagingStoreImpl pagingStore, ScheduledExecutorService scheduledExecutor, 
Executor executor, boolean syncNonTX, long timeSync) {
@@ -118,18 +120,20 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
       pendingTasksUpdater.incrementAndGet(this);
    }
 
-   public void addTask(OperationContext context,
+   public int addTask(OperationContext context,
                                     PagedMessage message,
                                     Transaction tx,
-                                    RouteContextList listCtx) {
+                                    RouteContextList listCtx, boolean 
useFlowControl) {
 
       logger.trace("Adding paged message {} to paged writer", message);
 
+      // the module using the page operation should later call flowControl 
from this class.
+      // the only exception to this would be from tests where we don't really 
care about flow control on the TimedExecutor
+      // also: the credits is based on the page size, and we use the 
encodeSize to flow it.
       int credits = Math.min(message.getEncodeSize() + 
PageReadWriter.SIZE_RECORD, maxCredits);
-      writeCredits.acquireUninterruptibly(credits);
+
       synchronized (this) {
          if (!isStarted()) {
-            writeCredits.release(credits);
             throw new IllegalStateException("PageWriter Service is stopped");
          }
 
@@ -139,7 +143,7 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
          }
 
          final boolean replicated = storageManager.isReplicated();
-         PageEvent event = new PageEvent(context, message, tx, listCtx, 
credits, replicated);
+         PageEvent event = new PageEvent(context, message, tx, listCtx, 
credits, replicated, useFlowControl);
          context.storeLineUp();
          if (replicated) {
             context.replicationLineUp();
@@ -147,6 +151,14 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
          this.pageEvents.add(event);
          delay();
       }
+
+      return credits;
+   }
+
+   public void flowControl(int credits) {
+      if (credits > 0) {
+         writeCredits.acquireUninterruptibly(credits);
+      }
    }
 
    private synchronized PageEvent[] extractPendingEvents() {
@@ -257,7 +269,9 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
          try {
             for (PageEvent event : pendingEvents) {
                pendingTasksUpdater.decrementAndGet(this);
-               writeCredits.release(event.credits);
+               if (event.useFlowControl) {
+                  writeCredits.release(event.credits);
+               }
             }
             OperationContextImpl.setContext(beforeContext);
          } catch (Throwable t) {
@@ -270,4 +284,8 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
    protected void performSync() throws Exception {
       pagingStore.ioSync();
    }
+
+   public int getAvailablePermits() {
+      return writeCredits.availablePermits();
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 3507fd3759..e0f5e7cd56 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1273,17 +1273,18 @@ public class PagingStoreImpl implements PagingStore {
    public boolean page(Message message,
                        final Transaction tx,
                        RouteContextList listCtx) throws Exception {
-      return page(message, tx, listCtx, null);
+      return page(message, tx, listCtx, null, false) >= 0;
    }
 
    @Override
-   public boolean page(Message message,
+   public int page(Message message,
                        final Transaction tx,
                        RouteContextList listCtx,
-                       Function<Message, Message> pageDecorator) throws 
Exception {
+                       Function<Message, Message> pageDecorator,
+                       boolean useFlowControl) throws Exception {
 
       if (!running) {
-         return false;
+         return -1;
       }
 
       boolean full = isFull();
@@ -1303,12 +1304,12 @@ public class PagingStoreImpl implements PagingStore {
                printedDropMessagesWarning = true;
                ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
             }
-            return true;
+            return 0;
          } else {
-            return false;
+            return -1;
          }
       } else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-         return false;
+         return -1;
       }
 
       if (pageFull) {
@@ -1326,24 +1327,28 @@ public class PagingStoreImpl implements PagingStore {
          }
 
          // we are in page mode, if we got to this point, we are dropping the 
message while still paging
-         // this needs to return true as it is paging
-         return true;
+         // we return 0 because the store is in "page mode" but no credits 
are being taken.
+         return 0;
       }
 
-      return writePage(message, tx, listCtx, pageDecorator);
+      int creditsUsed = writePage(message, tx, listCtx, pageDecorator, 
useFlowControl);
+
+      return creditsUsed;
    }
 
-   private boolean writePage(Message message,
+   private int writePage(Message message,
                              Transaction tx,
                              RouteContextList listCtx,
-                             Function<Message, Message> pageDecorator) throws 
Exception {
+                             Function<Message, Message> pageDecorator,
+                             boolean useFlowControl) throws Exception {
       // We need to use a readLock as we need to keep paging until we 
scheduled a task
       // notice that to leave paging you need pending tasks done
       readLock();
       PagedMessage pagedMessage;
       try {
          if (!paging) {
-            return false;
+            // no paging was used
+            return -1;
          }
 
          final long transactionID = (tx != null && 
tx.isAllowPageTransaction()) ? tx.getID() : -1L;
@@ -1383,9 +1388,18 @@ public class PagingStoreImpl implements PagingStore {
       // timedWriter.hasPendingIO would return pending based on incrementTask, 
and for that reason we can still call the addTask away from the readLock.
       //
       // This scenario was found when running FloodServerWithAsyncSendTest 
smoke test.
-      timedWriter.addTask(storageManager.getContext(), pagedMessage, tx, 
listCtx);
+      int credits = timedWriter.addTask(storageManager.getContext(), 
pagedMessage, tx, listCtx, useFlowControl);
 
-      return true;
+      assert credits >= 0;
+
+      return credits;
+   }
+
+   @Override
+   public void writeFlowControl(int credits) {
+      if (timedWriter != null) {
+         timedWriter.flowControl(credits);
+      }
    }
 
    protected void directWritePage(PagedMessage pagedMessage, boolean lineUp, 
boolean originalReplicated) throws Exception {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 32aae12d74..bd6dfab33a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -2280,9 +2280,18 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
 
    @Override
    public boolean addToPage(PagingStore store, Message msg, Transaction tx, 
RouteContextList listCtx) throws Exception {
+      int credits;
       try (ArtemisCloseable closeable = closeableReadLock()) {
-         return store.page(msg, tx, listCtx);
+         credits = store.page(msg, tx, listCtx, null, true);
       }
+
+      // flow control on the TimedWriter needs to be done outside of locking
+      // it is ok to do it after the write
+      if (credits > 0) {
+         store.writeFlowControl(credits);
+      }
+
+      return credits >= 0;
    }
 
    private void installLargeMessageConfirmationOnTX(Transaction tx, long 
recordID) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index a245865fc7..c45badfe0c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -506,7 +506,7 @@ public class TransactionImpl implements Transaction {
                throw new ActiveMQIllegalStateException("Transaction is in 
invalid state " + state);
             }
          } else {
-            if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
+            if (delayed == 0 && state != State.ACTIVE && state != 
State.ROLLBACK_ONLY) {
                throw new ActiveMQIllegalStateException("Transaction is in 
invalid state " + state);
             }
          }
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index e5010424c5..0a784f21a4 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -463,11 +463,12 @@ public class PersistMultiThreadTest extends 
ActiveMQTestBase {
       }
 
       @Override
-      public boolean page(Message message,
+      public int page(Message message,
                           Transaction tx,
                           RouteContextList listCtx,
-                          Function<Message, Message> pageDecorator) throws 
Exception {
-         return false;
+                          Function<Message, Message> pageDecorator,
+                          boolean useFlowControl) throws Exception {
+         return -1;
       }
 
       @Override
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index be8ce5531a..3fc0fc3208 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -33,6 +33,7 @@ import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@@ -73,6 +74,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -195,7 +197,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       executorService = Executors.newFixedThreadPool(10);
       runAfter(scheduledExecutorService::shutdownNow);
       runAfter(executorService::shutdownNow);
-      runAfter(() -> OperationContextImpl.clearContext());
       executorFactory = new OrderedExecutorFactory(executorService);
       context = OperationContextImpl.getContext(executorFactory);
       assertNotNull(context);
@@ -311,6 +312,11 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       runAfter(OperationContextImpl::clearContext);
    }
 
+   @AfterEach
+   public void clearContext() {
+      OperationContextImpl.clearContext();
+   }
+
    // a test to validate if the Mocks are correctly setup
    @Test
    public void testValidateMocks() throws Exception {
@@ -356,7 +362,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       long id = realJournalStorageManager.generateID();
       long newID = realJournalStorageManager.generateID();
       assertEquals(1L, newID - id);
-
    }
 
    PagedMessage createPagedMessage() {
@@ -367,7 +372,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
    public void testIOCompletion() throws Exception {
       CountDownLatch latch = new CountDownLatch(1);
 
-      timer.addTask(context, createPagedMessage(), null, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), null, Mockito.mock(RouteContextList.class)));
 
       context.executeOnCompletion(new IOCallback() {
          @Override
@@ -383,7 +389,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
       allowRunning.countDown();
       assertTrue(latch.await(10, TimeUnit.SECONDS));
-
    }
 
    @Test
@@ -392,7 +397,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
       useReplication.set(true);
 
-      timer.addTask(context, createPagedMessage(), null, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), null, Mockito.mock(RouteContextList.class)));
 
       context.executeOnCompletion(new IOCallback() {
          @Override
@@ -426,7 +432,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
          }
       });
 
-      timer.addTask(context, createPagedMessage(), transaction, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), transaction, Mockito.mock(RouteContextList.class)));
 
       transaction.commit();
 
@@ -497,7 +504,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
          }
       });
 
-      timer.addTask(context, createPagedMessage(), transaction, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), transaction, Mockito.mock(RouteContextList.class)));
 
       numberOfCommitsMessageJournal.set(0); // it should have been 0 before anyway 
but since I have no real reason to require it to be zero before, I'm doing this 
just in case it ever changes
       transaction.commit();
@@ -525,7 +533,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
          }
       });
 
-      timer.addTask(context, createPagedMessage(), transaction, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), transaction, Mockito.mock(RouteContextList.class)));
 
       numberOfCommitsMessageJournal.set(0); // it should have been 0 before anyway 
but since I have no real reason to require it to be zero before, I'm doing this 
just in case it ever changes
       numberOfPreparesMessageJournal.set(0);
@@ -546,7 +555,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
       useReplication.set(true);
 
-      timer.addTask(context, createPagedMessage(), null, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), null, Mockito.mock(RouteContextList.class)));
 
       context.executeOnCompletion(new IOCallback() {
          @Override
@@ -573,7 +583,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
       useReplication.set(false);
 
-      timer.addTask(context, createPagedMessage(), null, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), null, Mockito.mock(RouteContextList.class)));
 
       context.executeOnCompletion(new IOCallback() {
          @Override
@@ -603,7 +614,8 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       Transaction tx = new TransactionImpl(realJournalStorageManager, 
Integer.MAX_VALUE);
       tx.setContainsPersistent();
 
-      timer.addTask(context, createPagedMessage(), tx, 
Mockito.mock(RouteContextList.class));
+      OperationContextImpl.setContext(context);
+      assertTrue(realJournalStorageManager.addToPage(pageStore, 
createMessage(), tx, Mockito.mock(RouteContextList.class)));
       tx.addOperation(new TransactionOperationAbstract() {
          @Override
          public void afterCommit(Transaction tx) {
@@ -617,7 +629,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       allowRunning.countDown();
 
       assertTrue(latch.await(10, TimeUnit.SECONDS));
-
    }
 
    @Test
@@ -693,7 +704,7 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
    @Test
    public void testMarkRollbackCancelDelay() throws Exception {
-      testRollback(true);
+      testRollback(false);
    }
 
    private void testRollback(boolean rollback) throws Exception {
@@ -866,9 +877,10 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       int sentNumber = 0;
       while (timeout > System.currentTimeMillis()) {
          try {
-            PagedMessage message = createPagedMessage();
-            message.getMessage().putStringProperty("testId", 
String.valueOf(sentNumber));
-            timer.addTask(context, message, tx, routeContextListMocked);
+            Message message = createMessage();
+            message.putStringProperty("testId", String.valueOf(sentNumber));
+            OperationContextImpl.setContext(context);
+            assertTrue(realJournalStorageManager.addToPage(pageStore, message, 
tx, routeContextListMocked));
             sentWrite.add(String.valueOf(sentNumber));
             sentNumber++;
             if (sentNumber % 1000 == 0) {
@@ -884,13 +896,16 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       assertTrue(runLatch.await(10, TimeUnit.SECONDS));
       assertTrue(timer.isStarted());
       timer.delay(); // calling one more delay as the last one done could 
still be missing
+
       assertEquals(0, errors.get());
 
-      // not supposed to throw any exception
       Wait.assertEquals(0, tx::getPendingDelay, 5000, 100);
+      // not supposed to throw any exceptions
       tx.commit();
       assertTrue(committed.await(10, TimeUnit.SECONDS));
 
+      Wait.assertTrue(() -> interceptedWrite.size() == sentWrite.size(), 5000, 
100);
+
       int interceptorOriginalSize = interceptedWrite.size();
       int sentOriginalSize = sentWrite.size();
 
@@ -907,7 +922,104 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
       assertEquals(interceptorOriginalSize, sentOriginalSize);
       assertEquals(sentNumber, interceptorOriginalSize);
-
    }
 
+   @Test
+   public void testLockWhileFlowControlled() throws Exception {
+      AtomicBoolean notSupposedToWrite = new AtomicBoolean(false);
+
+      AtomicInteger errors = new AtomicInteger(0);
+
+      LinkedHashSet<String> interceptedWrite = new LinkedHashSet<>();
+      LinkedHashSet<String> sentWrite = new LinkedHashSet<>();
+
+      directWriteInterceptor = m -> {
+         String messageID = m.getMessage().getStringProperty("testId");
+         if (messageID == null) {
+            logger.warn("no messageID defined on message");
+            errors.incrementAndGet();
+         }
+         if (notSupposedToWrite.get()) {
+            logger.warn("Not supposed to write message {}", 
m.getMessage().getStringProperty("testId"));
+            errors.incrementAndGet();
+         }
+         interceptedWrite.add(m.getMessage().getStringProperty("testId"));
+      };
+
+      // I don't want to mess with the Executor simulating to be on the
+      ExecutorService testExecutor = Executors.newFixedThreadPool(1);
+
+      AtomicBoolean running = new AtomicBoolean(true);
+      runAfter(() -> running.set(false));
+      runAfter(testExecutor::shutdownNow);
+
+      CountDownLatch runLatch = new CountDownLatch(1);
+      CyclicBarrier flagStart = new CyclicBarrier(2);
+
+      AtomicInteger sentNumber = new AtomicInteger(0);
+
+      // sending messages
+      testExecutor.execute(() -> {
+         try {
+            flagStart.await(10, TimeUnit.SECONDS);
+
+            RouteContextList routeContextListMocked = 
Mockito.mock(RouteContextList.class);
+
+            while (running.get()) {
+               Message message = createMessage();
+               message.putStringProperty("testId", String.valueOf(sentNumber));
+               OperationContextImpl.setContext(context);
+               assertTrue(realJournalStorageManager.addToPage(pageStore, 
message, null, routeContextListMocked));
+               sentWrite.add(String.valueOf(sentNumber.get()));
+               sentNumber.incrementAndGet();
+               if (sentNumber.get() % 1000 == 0) {
+                  logger.info("Sent {}", sentNumber);
+               }
+            }
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+            errors.incrementAndGet();
+         } finally {
+            runLatch.countDown();
+         }
+      });
+
+      flagStart.await(10, TimeUnit.SECONDS);
+
+      // getting a base credit for most messages
+      int baseCreditSize = createMessage().putStringProperty("testID", 
"00000").getEncodeSize();
+
+      // Waiting messages to stop flowing
+      Wait.assertTrue(() -> timer.getAvailablePermits() < baseCreditSize, 
5_000, 10);
+
+      // this is simulating certain operations that will need to get a 
writeLock (example replication start)
+      // we still must be able to get a writeLock while paging is flow 
controlled
+      // otherwise we could starve or deadlock in some scenarios
+      realJournalStorageManager.writeLock();
+      realJournalStorageManager.writeUnlock();
+
+      allowRunning.countDown(); // allowing messages to flow again
+      running.set(false);
+      assertTrue(runLatch.await(10, TimeUnit.SECONDS));
+      assertTrue(timer.isStarted());
+      timer.delay(); // calling one more delay as the last one done could 
still be missing
+      assertEquals(0, errors.get());
+      Wait.assertTrue(() -> interceptedWrite.size() == sentWrite.size(), 5000, 
100);
+      int interceptorOriginalSize = interceptedWrite.size();
+      int sentOriginalSize = sentWrite.size();
+
+      interceptedWrite.forEach(s -> {
+         sentWrite.remove(s);
+      });
+      sentWrite.forEach(m -> {
+         logger.warn("message {} missed", m);
+      });
+
+      assertEquals(interceptorOriginalSize, sentOriginalSize);
+
+      assertEquals(0, sentWrite.size());
+
+      assertEquals(interceptorOriginalSize, sentOriginalSize);
+      assertEquals(sentNumber.get(), interceptorOriginalSize);
+   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to