This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new a85f029  ARTEMIS-2415 JDBCJournal miss pending tasks during shutdown
     new 9d96dda  This closes #2741
a85f029 is described below

commit a85f0291061f299c79619f95c131e0dfa4a664e2
Author: brusdev <[email protected]>
AuthorDate: Sat Jul 20 11:50:11 2019 +0200

    ARTEMIS-2415 JDBCJournal miss pending tasks during shutdown
    
    Wait deleting large message tasks during stop.
---
 .../impl/journal/JDBCJournalStorageManager.java    |  2 +
 .../impl/journal/JournalStorageManager.java        | 10 ++-
 .../impl/journal/JournalStorageManagerTest.java    | 73 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 629405b..9e67f8d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -108,6 +108,8 @@ public class JDBCJournalStorageManager extends 
JournalStorageManager {
             idGenerator.persistCurrentID();
       }
 
+      deletingLargeMessageTasks.await(30, TimeUnit.SECONDS);
+
       final CountDownLatch latch = new CountDownLatch(1);
       executor.execute(new Runnable() {
          @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 2ae475c..df4fec3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -65,6 +65,7 @@ import 
org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.jboss.logging.Logger;
 
@@ -86,6 +87,8 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
 
    protected ReplicationManager replicator;
 
+   protected final ReusableLatch deletingLargeMessageTasks = new 
ReusableLatch();
+
    public JournalStorageManager(final Configuration config,
                                 final CriticalAnalyzer analyzer,
                                 final ExecutorFactory executorFactory,
@@ -272,6 +275,8 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
             idGenerator.persistCurrentID();
       }
 
+      deletingLargeMessageTasks.await(30, TimeUnit.SECONDS);
+
       final CountDownLatch latch = new CountDownLatch(1);
       try {
          executor.execute(new Runnable() {
@@ -517,6 +522,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
 
       };
 
+      deletingLargeMessageTasks.countUp();
       getContext(true).executeOnCompletion(new IOCallback() {
          @Override
          public void done() {
@@ -525,11 +531,13 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
             } else {
                executor.execute(deleteAction);
             }
+
+            deletingLargeMessageTasks.countDown();
          }
 
          @Override
          public void onError(int errorCode, String errorMessage) {
-
+            deletingLargeMessageTasks.countDown();
          }
       });
    }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
index a058094..4e2741d 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
@@ -42,6 +42,8 @@ import 
org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.impl.JournalLoader;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -49,6 +51,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 
 import static java.util.stream.Collectors.toList;
 import static org.hamcrest.Matchers.is;
@@ -212,4 +215,74 @@ public class JournalStorageManagerTest extends 
ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testDeletingLargeMessagePendingTasksOnShutdown() throws 
Exception {
+      if (journalType == JournalType.ASYNCIO) {
+         assumeTrue("AIO is not supported on this platform", 
AIOSequentialFileFactory.isSupported());
+      }
+      final Configuration configuration = 
createDefaultInVMConfig().setJournalType(journalType);
+      final ExecutorFactory executorFactory = spy(new 
OrderedExecutorFactory(executor));
+      final ExecutorFactory ioExecutorFactory = new 
OrderedExecutorFactory(ioExecutor);
+      final ArtemisExecutor artemisExecutor = executorFactory.getExecutor();
+      final ArtemisExecutor artemisExecutorWrapper = spy(artemisExecutor);
+      
Mockito.when(executorFactory.getExecutor()).thenReturn(artemisExecutorWrapper);
+      final JournalStorageManager manager = new 
JournalStorageManager(configuration, null, executorFactory, null, 
ioExecutorFactory);
+      manager.start();
+      manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new 
ArrayList<>());
+      final PostOffice postOffice = mock(PostOffice.class);
+      final JournalLoader journalLoader = mock(JournalLoader.class);
+      manager.loadMessageJournal(postOffice, null, null, null, null, null, 
null, journalLoader);
+      final LargeServerMessage largeMessage = 
manager.createLargeMessage(manager.generateID() + 1, new 
CoreMessage().setDurable(true));
+      final SequentialFile file = largeMessage.getFile();
+
+      boolean fileExists = file.exists();
+      manager.getContext(true).storeLineUp();
+      Assert.assertTrue(fileExists);
+      manager.deleteLargeMessageFile(largeMessage);
+
+      final Thread currentThread = Thread.currentThread();
+      final CountDownLatch beforeLatch = new CountDownLatch(1);
+      final CountDownLatch afterStopLatch = new CountDownLatch(1);
+
+      //Simulate an executor task that begins after store done and ends after 
manager stop begins.
+      artemisExecutor.execute(() -> {
+         try {
+            //Wait until thread is ready to start executing manager stop.
+            Assert.assertTrue(beforeLatch.await(30000, TimeUnit.MILLISECONDS));
+
+            //Wait until thread executing manager stop is waiting for another 
thread.
+            Assert.assertTrue(Wait.waitFor(() -> currentThread.getState() == 
Thread.State.TIMED_WAITING, 30000));
+         } catch (Exception ignore) {
+         }
+      });
+
+      Mockito.doAnswer(invocationOnMock -> {
+         invocationOnMock.callRealMethod();
+
+         if (Thread.currentThread().equals(currentThread) &&
+            
invocationOnMock.getArgument(0).getClass().getName().contains(JournalStorageManager.class.getName()))
 {
+
+            //Simulate an executor task that ends after manager stop.
+            artemisExecutor.execute(() -> {
+               try {
+                  //Wait until manager stop is executed.
+                  afterStopLatch.await(30000, TimeUnit.MILLISECONDS);
+               } catch (Exception ignore) {
+               }
+            });
+         }
+
+         return null;
+      }).when(artemisExecutorWrapper).execute(Mockito.any(Runnable.class));
+
+      manager.getContext(true).done();
+
+      beforeLatch.countDown();
+      manager.stop();
+      fileExists = file.exists();
+      afterStopLatch.countDown();
+
+      Assert.assertFalse(fileExists);
+   }
+
 }

Reply via email to