This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new eb01d78245 ARTEMIS-5384 Proper cycle PagedTimeWriter during a server shutdown
eb01d78245 is described below
commit eb01d782452b795123ee74a7e2ca5a04f62a736a
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 4 17:18:21 2025 -0400
ARTEMIS-5384 Proper cycle PagedTimeWriter during a server shutdown
---
.../artemis/core/paging/impl/PageTimedWriter.java | 11 ++++--
.../artemis/core/paging/impl/PagingStoreImpl.java | 11 +++++-
.../core/paging/impl/PageTimedWriterUnitTest.java | 41 ++++++++++++++++++++++
3 files changed, 59 insertions(+), 4 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
index bbeba5e849..4725c2d7ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
@@ -191,9 +191,14 @@ public class PageTimedWriter extends ActiveMQScheduledComponent {
// In case of failure, The context should propagate an exception to the client
// We send an exception to the client even on the case of a failure
// to avoid possible locks and the client not getting the exception back
- for (PageEvent event : pendingEvents) {
- event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during ioSync for paging on " + pagingStore.getStoreName() + ": " + e.getMessage());
- }
+ executor.execute(() -> {
+ // The onError has to be called from a separate executor
+ // because this PagedWriter will be holding the lock on the storage manager
+ // and this might lead to a deadlock
+ for (PageEvent event : pendingEvents) {
+ event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during ioSync for paging on " + pagingStore.getStoreName() + ": " + e.getMessage());
+ }
+ });
} finally {
for (PageEvent event : pendingEvents) {
event.context.done();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 3954a3e576..0ff67d3244 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -636,7 +636,12 @@ public class PagingStoreImpl implements PagingStore {
public void stop() throws Exception {
synchronized (this) {
if (running) {
- cursorProvider.stop();
+ if (timedWriter != null) {
+ timedWriter.stop();
+ }
+ if (cursorProvider != null) {
+ cursorProvider.stop();
+ }
running = false;
} else {
return;
@@ -741,6 +746,10 @@ public class PagingStoreImpl implements PagingStore {
if (page != null && !(numberOfPages == 1 && page.getSize() == 0)) {
startPaging();
}
+
+ if (timedWriter != null) {
+ timedWriter.start();
+ }
}
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index 60c17a960b..9067ef2d57 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -438,6 +438,47 @@ public class PageTimedWriterUnitTest extends ArtemisTestCase {
assertEquals(numberOfMessages, pageWrites.get());
}
+ @Test
+ public void testVerifyWritesAfterStop() throws Exception {
+ int numberOfMessages = 100;
+ CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+ allowRunning.countDown();
+ useReplication.set(false);
+ allowSync.setCount(1);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ TransactionImpl newTX = new TransactionImpl(realJournalStorageManager);
+ newTX.setContainsPersistent();
+ newTX.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ super.afterCommit(tx);
+ latch.countDown();
+ }
+ });
+ pageStore.page(createMessage(), newTX, routeContextList);
+ newTX.commit();
+ }
+
+ assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
+ allowSync.countDown();
+ // issuing a stop should finish whatever was pending before
+ // instead of sending it to the limbo
+ pageStore.stop();
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(numberOfMessages, pageWrites.get());
+ }
+
+ @Test
+ public void testVerifyTimedWriterIsStopped() throws Exception {
+ allowRunning.countDown();
+ useReplication.set(false);
+ pageStore.stop();
+ assertFalse(pageStore.getPageTimedWriter().isStarted());
+ }
+
@Test
public void testTXCompletionWhileDisableReplica() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact