This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b31e11b IGNITE-12780 Fix deadlock between db-checkpoint-thread and checkpoint-runner - Fixes #7816.
b31e11b is described below
commit b31e11b1ff33ede81cf5bc810f7e5485b21f004c
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Mon Jun 1 11:50:22 2020 +0300
IGNITE-12780 Fix deadlock between db-checkpoint-thread and checkpoint-runner - Fixes #7816.
Signed-off-by: Alexey Goncharuk <[email protected]>
---
.../GridCacheDatabaseSharedManager.java | 20 ++--
.../db/IgniteSequentialNodeCrashRecoveryTest.java | 128 ++++++++++++++++++++-
2 files changed, 137 insertions(+), 11 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 498cbeea5..46b05a3 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2315,7 +2315,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
", walArchive=" + persistenceCfg.getWalArchivePath() + "]");
}
- AtomicReference<IgniteCheckedException> applyError = new
AtomicReference<>();
+ AtomicReference<Throwable> applyError = new AtomicReference<>();
StripedExecutor exec =
cctx.kernalContext().getStripedExecutorService();
@@ -2510,7 +2510,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
*/
private void awaitApplyComplete(
StripedExecutor exec,
- AtomicReference<IgniteCheckedException> applyError
+ AtomicReference<Throwable> applyError
) throws IgniteCheckedException {
try {
// Await completion apply tasks in all stripes.
@@ -2521,8 +2521,11 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
}
// Checking error after all task applied.
- if (applyError.get() != null)
- throw applyError.get();
+ Throwable error = applyError.get();
+
+ if (error != null)
+ throw error instanceof IgniteCheckedException
+ ? (IgniteCheckedException)error : new
IgniteCheckedException(error);
}
/**
@@ -2800,7 +2803,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
long start = U.currentTimeMillis();
- AtomicReference<IgniteCheckedException> applyError = new
AtomicReference<>();
+ AtomicReference<Throwable> applyError = new AtomicReference<>();
AtomicLong applied = new AtomicLong();
@@ -3102,7 +3105,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
AtomicInteger cpPagesCnt = new AtomicInteger();
// Shared refernce for tracking exception during write pages.
- AtomicReference<IgniteCheckedException> writePagesError = new
AtomicReference<>();
+ AtomicReference<Throwable> writePagesError = new AtomicReference<>();
for (int stripeIdx = 0; stripeIdx < exec.stripesCount(); stripeIdx++) {
exec.execute(stripeIdx, () -> {
@@ -3145,10 +3148,13 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
pagesWritten++;
}
}
- catch (IgniteCheckedException e) {
+ catch (Throwable e) {
U.error(log, "Failed to write page to pageStore: " + res);
writePagesError.compareAndSet(null, e);
+
+ if (e instanceof Error)
+ throw (Error)e;
}
cpPagesCnt.addAndGet(pagesWritten);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
index 216d1c2..b6853a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.nio.file.OpenOption;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -37,23 +38,38 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Ignore;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
@@ -69,6 +85,9 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
/** */
private FailureHandler failureHnd;
+ /** */
+ private DiscoverySpi discoverySpi;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -84,6 +103,9 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
if (fileIoFactory != null)
dsCfg.setFileIOFactory(fileIoFactory);
+ if (discoverySpi != null)
+ cfg.setDiscoverySpi(discoverySpi);
+
cfg
.setDataStorageConfiguration(dsCfg)
.setConsistentId(igniteInstanceName);
@@ -114,7 +136,6 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
* @throws Exception if failed.
*/
@Test
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-12780")
public void testCrashOnCheckpointAfterLogicalRecovery() throws Exception {
IgniteEx g = startGrid(0);
@@ -129,7 +150,7 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
{
IgniteCache<Object, Object> cache = g.cache("cache");
- // Now that checkpoints are disabled, put some data to the
cache.
+ // Now that checkpoints are disabled, put some data to the cache.
GridTestUtils.runMultiThreaded(() -> {
for (int i = 0; i < 400; i++)
cache.put(i % 100, Thread.currentThread().getName());
@@ -140,9 +161,15 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
stopGrid(0);
- CheckpointFailingIoFactory f =
(CheckpointFailingIoFactory)(fileIoFactory = new
CheckpointFailingIoFactory(false));
+ CheckpointFailingIoFactory f = (CheckpointFailingIoFactory)
+ (fileIoFactory = new CheckpointFailingIoFactory(false));
+
StopLatchFailureHandler fh = (StopLatchFailureHandler)(failureHnd =
new StopLatchFailureHandler());
+ //Blocking first exchange to prevent checkpoint on node start(reason =
'node started').
+ BlockingDiscoverySpi ds = (BlockingDiscoverySpi)
+ (discoverySpi = new BlockingDiscoverySpi((m) -> m instanceof
ChangeGlobalStateMessage));
+
// Now start the node. Since the checkpoint was disabled, logical
recovery will be performed.
g = startGrid(0);
@@ -154,6 +181,8 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
f.startFailing();
+ ds.clearBlock();
+
triggerCheckpoint(g);
assertTrue("Failed to wait for checkpoint failure", fh.waitFailed());
@@ -164,6 +193,7 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
assertFalse(dirtyAfterLoad.isEmpty());
fileIoFactory = new CheckingIoFactory(dirtyAfterLoad);
+ discoverySpi = null;
g = startGrid(0);
@@ -206,6 +236,17 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
GridCacheDatabaseSharedManager dbMgr =
(GridCacheDatabaseSharedManager)g.context()
.cache().context().database();
+ dbMgr.checkpointReadLock();
+ try {
+ //Moving free list pages to offheap.
+ for (CacheGroupContext group : g.context().cache().cacheGroups()) {
+
((GridCacheOffheapManager)group.offheap()).onMarkCheckpointBegin(new
DummyCheckpointContext());
+ }
+ }
+ finally {
+ dbMgr.checkpointReadUnlock();
+ }
+
// Capture a set of dirty pages.
PageMemoryImpl pageMem =
(PageMemoryImpl)dbMgr.dataRegion("default").pageMemory();
@@ -251,6 +292,85 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
}
}
+    /**
+     * Stub checkpoint context that the test hands to {@code onMarkCheckpointBegin} of each cache group's
+     * offheap manager. Every boolean accessor answers {@code false}; every object accessor answers {@code null}.
+     */
+    private static class DummyCheckpointContext implements DbCheckpointListener.Context {
+        // ---- Boolean accessors: always false. ----
+
+        /** {@inheritDoc} */
+        @Override public boolean hasPages() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean nextSnapshot() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needToSnapshot(String cacheOrGrpName) {
+            return false;
+        }
+
+        // ---- Object accessors: nothing is supplied. ----
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture<?> finishedStateFut() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public PartitionAllocationMap partitionStatMap() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Executor executor() {
+            return null;
+        }
+    }
+
+    /**
+     * TCP discovery SPI that holds back custom discovery messages accepted by a predicate: when such a
+     * message reaches {@link #startMessageProcess}, the calling thread waits until {@link #clearBlock()}
+     * is invoked or {@link #RELEASE_TIMEOUT_MS} elapses.
+     */
+    protected static class BlockingDiscoverySpi extends TcpDiscoverySpi {
+        /** Maximum time in milliseconds a matching message is held back. */
+        private static final long RELEASE_TIMEOUT_MS = 20_000;
+
+        /** Discovery custom message filter; {@code null} once blocking has been cleared. */
+        private volatile IgnitePredicate<DiscoveryCustomMessage> blockPred;
+
+        /**
+         * @param blockPred Predicate selecting the custom discovery messages to hold back.
+         */
+        public BlockingDiscoverySpi(IgnitePredicate<DiscoveryCustomMessage> blockPred) {
+            this.blockPred = blockPred;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (!(msg instanceof TcpDiscoveryCustomEventMessage))
+                return;
+
+            // Read the volatile once so the check and the apply use the same predicate.
+            IgnitePredicate<DiscoveryCustomMessage> pred = blockPred;
+
+            if (pred == null)
+                return;
+
+            DiscoveryCustomMessage customMsg = extractCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+
+            // A payload that could not be unwrapped is never blocked and is never passed to the predicate as null.
+            if (customMsg == null || !pred.apply(customMsg))
+                return;
+
+            try {
+                if (!GridTestUtils.waitForCondition(() -> blockPred == null, RELEASE_TIMEOUT_MS))
+                    U.warn(log, "Discovery custom message was not released in time: " + customMsg);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                // Keep the interrupt status visible to the code that owns this thread.
+                Thread.currentThread().interrupt();
+
+                U.error(log, "Fail to await release", e);
+            }
+        }
+
+        /**
+         * Unmarshals the custom message carried by a discovery event message.
+         *
+         * @param msg Discovery custom event message.
+         * @return Delegate of the wrapped message, or {@code null} if unmarshalling failed or the payload
+         *      is not a {@link CustomMessageWrapper}.
+         */
+        private @Nullable DiscoveryCustomMessage extractCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            DiscoverySpiCustomMessage msgObj = null;
+
+            try {
+                msgObj = msg.message(marshaller(), U.resolveClassLoader(ignite().configuration()));
+            }
+            catch (Throwable e) {
+                U.error(log, "Failed to unmarshal discovery custom message.", e);
+            }
+
+            // A failed unmarshal leaves msgObj null, and the payload is not guaranteed to be a wrapper:
+            // report both cases as null instead of throwing NPE / ClassCastException from this thread.
+            return msgObj instanceof CustomMessageWrapper ? ((CustomMessageWrapper)msgObj).delegate() : null;
+        }
+
+        /** Unblock discovery custom messages. */
+        public void clearBlock() {
+            blockPred = null;
+        }
+    }
+
/**
*
*/