This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new a85f029 ARTEMIS-2415 JDBCJournal miss pending tasks during shutdown
new 9d96dda This closes #2741
a85f029 is described below
commit a85f0291061f299c79619f95c131e0dfa4a664e2
Author: brusdev <[email protected]>
AuthorDate: Sat Jul 20 11:50:11 2019 +0200
ARTEMIS-2415 JDBCJournal miss pending tasks during shutdown
During stop, wait up to 30 seconds for pending large-message deletion tasks.
---
.../impl/journal/JDBCJournalStorageManager.java | 2 +
.../impl/journal/JournalStorageManager.java | 10 ++-
.../impl/journal/JournalStorageManagerTest.java | 73 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 1 deletion(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 629405b..9e67f8d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -108,6 +108,8 @@ public class JDBCJournalStorageManager extends
JournalStorageManager {
idGenerator.persistCurrentID();
}
+ deletingLargeMessageTasks.await(30, TimeUnit.SECONDS);
+
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 2ae475c..df4fec3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -65,6 +65,7 @@ import
org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@@ -86,6 +87,8 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
protected ReplicationManager replicator;
+ protected final ReusableLatch deletingLargeMessageTasks = new
ReusableLatch();
+
public JournalStorageManager(final Configuration config,
final CriticalAnalyzer analyzer,
final ExecutorFactory executorFactory,
@@ -272,6 +275,8 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
idGenerator.persistCurrentID();
}
+ deletingLargeMessageTasks.await(30, TimeUnit.SECONDS);
+
final CountDownLatch latch = new CountDownLatch(1);
try {
executor.execute(new Runnable() {
@@ -517,6 +522,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
};
+ deletingLargeMessageTasks.countUp();
getContext(true).executeOnCompletion(new IOCallback() {
@Override
public void done() {
@@ -525,11 +531,13 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
} else {
executor.execute(deleteAction);
}
+
+ deletingLargeMessageTasks.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
-
+ deletingLargeMessageTasks.countDown();
}
});
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
index a058094..4e2741d 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
@@ -42,6 +42,8 @@ import
org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -49,6 +51,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.is;
@@ -212,4 +215,74 @@ public class JournalStorageManagerTest extends
ActiveMQTestBase {
}
}
+ @Test
+ public void testDeletingLargeMessagePendingTasksOnShutdown() throws
Exception {
+ if (journalType == JournalType.ASYNCIO) {
+ assumeTrue("AIO is not supported on this platform",
AIOSequentialFileFactory.isSupported());
+ }
+ final Configuration configuration =
createDefaultInVMConfig().setJournalType(journalType);
+ final ExecutorFactory executorFactory = spy(new
OrderedExecutorFactory(executor));
+ final ExecutorFactory ioExecutorFactory = new
OrderedExecutorFactory(ioExecutor);
+ final ArtemisExecutor artemisExecutor = executorFactory.getExecutor();
+ final ArtemisExecutor artemisExecutorWrapper = spy(artemisExecutor);
+
Mockito.when(executorFactory.getExecutor()).thenReturn(artemisExecutorWrapper);
+ final JournalStorageManager manager = new
JournalStorageManager(configuration, null, executorFactory, null,
ioExecutorFactory);
+ manager.start();
+ manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new
ArrayList<>());
+ final PostOffice postOffice = mock(PostOffice.class);
+ final JournalLoader journalLoader = mock(JournalLoader.class);
+ manager.loadMessageJournal(postOffice, null, null, null, null, null,
null, journalLoader);
+ final LargeServerMessage largeMessage =
manager.createLargeMessage(manager.generateID() + 1, new
CoreMessage().setDurable(true));
+ final SequentialFile file = largeMessage.getFile();
+
+ boolean fileExists = file.exists();
+ manager.getContext(true).storeLineUp();
+ Assert.assertTrue(fileExists);
+ manager.deleteLargeMessageFile(largeMessage);
+
+ final Thread currentThread = Thread.currentThread();
+ final CountDownLatch beforeLatch = new CountDownLatch(1);
+ final CountDownLatch afterStopLatch = new CountDownLatch(1);
+
+ //Simulate an executor task that begins after store done and ends after
manager stop begins.
+ artemisExecutor.execute(() -> {
+ try {
+ //Wait until thread is ready to start executing manager stop.
+ Assert.assertTrue(beforeLatch.await(30000, TimeUnit.MILLISECONDS));
+
+ //Wait until thread executing manager stop is waiting for another
thread.
+ Assert.assertTrue(Wait.waitFor(() -> currentThread.getState() ==
Thread.State.TIMED_WAITING, 30000));
+ } catch (Exception ignore) {
+ }
+ });
+
+ Mockito.doAnswer(invocationOnMock -> {
+ invocationOnMock.callRealMethod();
+
+ if (Thread.currentThread().equals(currentThread) &&
+
invocationOnMock.getArgument(0).getClass().getName().contains(JournalStorageManager.class.getName()))
{
+
+ //Simulate an executor task that ends after manager stop.
+ artemisExecutor.execute(() -> {
+ try {
+ //Wait until manager stop is executed.
+ afterStopLatch.await(30000, TimeUnit.MILLISECONDS);
+ } catch (Exception ignore) {
+ }
+ });
+ }
+
+ return null;
+ }).when(artemisExecutorWrapper).execute(Mockito.any(Runnable.class));
+
+ manager.getContext(true).done();
+
+ beforeLatch.countDown();
+ manager.stop();
+ fileExists = file.exists();
+ afterStopLatch.countDown();
+
+ Assert.assertFalse(fileExists);
+ }
+
}