This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b31e11b  IGNITE-12780 Fix deadlock between db-checkpoint-thread and checkpoint-runner - Fixes #7816.
b31e11b is described below

commit b31e11b1ff33ede81cf5bc810f7e5485b21f004c
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Mon Jun 1 11:50:22 2020 +0300

    IGNITE-12780 Fix deadlock between db-checkpoint-thread and checkpoint-runner - Fixes #7816.
    
    Signed-off-by: Alexey Goncharuk <[email protected]>
---
 .../GridCacheDatabaseSharedManager.java            |  20 ++--
 .../db/IgniteSequentialNodeCrashRecoveryTest.java  | 128 ++++++++++++++++++++-
 2 files changed, 137 insertions(+), 11 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 498cbeea5..46b05a3 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2315,7 +2315,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 ", walArchive=" + persistenceCfg.getWalArchivePath() + "]");
         }
 
-        AtomicReference<IgniteCheckedException> applyError = new 
AtomicReference<>();
+        AtomicReference<Throwable> applyError = new AtomicReference<>();
 
         StripedExecutor exec = 
cctx.kernalContext().getStripedExecutorService();
 
@@ -2510,7 +2510,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      */
     private void awaitApplyComplete(
         StripedExecutor exec,
-        AtomicReference<IgniteCheckedException> applyError
+        AtomicReference<Throwable> applyError
     ) throws IgniteCheckedException {
         try {
             // Await completion apply tasks in all stripes.
@@ -2521,8 +2521,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
 
         // Checking error after all task applied.
-        if (applyError.get() != null)
-            throw applyError.get();
+        Throwable error = applyError.get();
+
+        if (error != null)
+            throw error instanceof IgniteCheckedException
+                ? (IgniteCheckedException)error : new 
IgniteCheckedException(error);
     }
 
     /**
@@ -2800,7 +2803,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         long start = U.currentTimeMillis();
 
-        AtomicReference<IgniteCheckedException> applyError = new 
AtomicReference<>();
+        AtomicReference<Throwable> applyError = new AtomicReference<>();
 
         AtomicLong applied = new AtomicLong();
 
@@ -3102,7 +3105,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         AtomicInteger cpPagesCnt = new AtomicInteger();
 
         // Shared refernce for tracking exception during write pages.
-        AtomicReference<IgniteCheckedException> writePagesError = new 
AtomicReference<>();
+        AtomicReference<Throwable> writePagesError = new AtomicReference<>();
 
         for (int stripeIdx = 0; stripeIdx < exec.stripesCount(); stripeIdx++) {
             exec.execute(stripeIdx, () -> {
@@ -3145,10 +3148,13 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         pagesWritten++;
                     }
                 }
-                catch (IgniteCheckedException e) {
+                catch (Throwable e) {
                     U.error(log, "Failed to write page to pageStore: " + res);
 
                     writePagesError.compareAndSet(null, e);
+
+                    if (e instanceof Error)
+                        throw (Error)e;
                 }
 
                 cpPagesCnt.addAndGet(pagesWritten);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
index 216d1c2..b6853a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.OpenOption;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -37,23 +38,38 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Ignore;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 /**
@@ -69,6 +85,9 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
     /** */
     private FailureHandler failureHnd;
 
+    /** */
+    private DiscoverySpi discoverySpi;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -84,6 +103,9 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
         if (fileIoFactory != null)
             dsCfg.setFileIOFactory(fileIoFactory);
 
+        if (discoverySpi != null)
+            cfg.setDiscoverySpi(discoverySpi);
+
         cfg
             .setDataStorageConfiguration(dsCfg)
             .setConsistentId(igniteInstanceName);
@@ -114,7 +136,6 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
      * @throws Exception if failed.
      */
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-12780")
     public void testCrashOnCheckpointAfterLogicalRecovery() throws Exception {
         IgniteEx g = startGrid(0);
 
@@ -129,7 +150,7 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
         {
             IgniteCache<Object, Object> cache = g.cache("cache");
 
-                // Now that checkpoints are disabled, put some data to the 
cache.
+            // Now that checkpoints are disabled, put some data to the cache.
             GridTestUtils.runMultiThreaded(() -> {
                 for (int i = 0; i < 400; i++)
                     cache.put(i % 100, Thread.currentThread().getName());
@@ -140,9 +161,15 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
 
         stopGrid(0);
 
-        CheckpointFailingIoFactory f = 
(CheckpointFailingIoFactory)(fileIoFactory = new 
CheckpointFailingIoFactory(false));
+        CheckpointFailingIoFactory f = (CheckpointFailingIoFactory)
+            (fileIoFactory = new CheckpointFailingIoFactory(false));
+
         StopLatchFailureHandler fh = (StopLatchFailureHandler)(failureHnd = 
new StopLatchFailureHandler());
 
+        //Blocking first exchange to prevent checkpoint on node start(reason = 
'node started').
+        BlockingDiscoverySpi ds = (BlockingDiscoverySpi)
+            (discoverySpi = new BlockingDiscoverySpi((m) -> m instanceof 
ChangeGlobalStateMessage));
+
         // Now start the node. Since the checkpoint was disabled, logical 
recovery will be performed.
         g = startGrid(0);
 
@@ -154,6 +181,8 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
 
         f.startFailing();
 
+        ds.clearBlock();
+
         triggerCheckpoint(g);
 
         assertTrue("Failed to wait for checkpoint failure", fh.waitFailed());
@@ -164,6 +193,7 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
         assertFalse(dirtyAfterLoad.isEmpty());
 
         fileIoFactory = new CheckingIoFactory(dirtyAfterLoad);
+        discoverySpi = null;
 
         g = startGrid(0);
 
@@ -206,6 +236,17 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
         GridCacheDatabaseSharedManager dbMgr = 
(GridCacheDatabaseSharedManager)g.context()
             .cache().context().database();
 
+        dbMgr.checkpointReadLock();
+        try {
+            //Moving free list pages to offheap.
+            for (CacheGroupContext group : g.context().cache().cacheGroups()) {
+                
((GridCacheOffheapManager)group.offheap()).onMarkCheckpointBegin(new 
DummyCheckpointContext());
+            }
+        }
+        finally {
+            dbMgr.checkpointReadUnlock();
+        }
+
         // Capture a set of dirty pages.
         PageMemoryImpl pageMem = 
(PageMemoryImpl)dbMgr.dataRegion("default").pageMemory();
 
@@ -251,6 +292,85 @@ public class IgniteSequentialNodeCrashRecoveryTest extends 
GridCommonAbstractTes
         }
     }
 
+    /** */
+    private static class DummyCheckpointContext implements 
DbCheckpointListener.Context {
+        /** {@inheritDoc} */
+        @Override public boolean nextSnapshot() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture<?> finishedStateFut() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public PartitionAllocationMap partitionStatMap() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needToSnapshot(String cacheOrGrpName) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Executor executor() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasPages() {
+            return false;
+        }
+    }
+
+    /** */
+    protected static class BlockingDiscoverySpi extends TcpDiscoverySpi {
+        /** Discovery custom message filter. */
+        private volatile IgnitePredicate<DiscoveryCustomMessage> blockPred;
+
+        /** **/
+        public BlockingDiscoverySpi(IgnitePredicate<DiscoveryCustomMessage> 
blockPred) {
+            this.blockPred = blockPred;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryCustomEventMessage) {
+                IgnitePredicate<DiscoveryCustomMessage> pred = blockPred;
+
+                if (pred != null && 
pred.apply(extractCustomMessage((TcpDiscoveryCustomEventMessage)msg))) {
+                    try {
+                        GridTestUtils.waitForCondition(() -> blockPred == 
null, 20_000);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        log.error("Fail to await release", e);
+                    }
+                }
+            }
+        }
+
+        /** */
+        private DiscoveryCustomMessage 
extractCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            DiscoverySpiCustomMessage msgObj = null;
+
+            try {
+                msgObj = msg.message(marshaller(), 
U.resolveClassLoader(ignite().configuration()));
+            }
+            catch (Throwable e) {
+                U.error(log, "Failed to unmarshal discovery custom message.", 
e);
+            }
+
+            return ((CustomMessageWrapper)msgObj).delegate();
+        }
+
+        /** Unblock discovery custom messages. */
+        public void clearBlock() {
+            blockPred = null;
+        }
+    }
+
     /**
      *
      */

Reply via email to