This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6f89e18d04a IGNITE-19248 Fixed snapshot restore hanging if the prepare
stage fails. (#10629)
6f89e18d04a is described below
commit 6f89e18d04ae500318e16c49d446c12b2156d5a2
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Apr 7 18:37:17 2023 +0300
IGNITE-19248 Fixed snapshot restore hanging if the prepare stage fails.
(#10629)
---
.../snapshot/SnapshotRestoreProcess.java | 2 +
.../snapshot/IncrementalSnapshotTest.java | 60 +++++++++++++++++++++-
2 files changed, 60 insertions(+), 2 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 72a483f3c2b..be6136dd10e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -865,6 +865,8 @@ public class SnapshotRestoreProcess {
if (failure != null) {
opCtx0.errHnd.accept(failure);
+ finishProcess(reqId, failure);
+
return;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
index cd3d7df1239..6195aac41de 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
-import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
@@ -29,15 +29,24 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_INCREMENTAL_SNAPSHOT_START;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -304,7 +313,7 @@ public class IncrementalSnapshotTest extends
AbstractSnapshotSelfTest {
assertNotNull(cacheData);
- cacheData.queryEntities(Collections.singletonList(new
QueryEntity(String.class, Account.class)));
+ cacheData.queryEntities(singletonList(new QueryEntity(String.class,
Account.class)));
locCfgMgr.writeCacheData(cacheData, ccfgFile);
@@ -374,6 +383,53 @@ public class IncrementalSnapshotTest extends
AbstractSnapshotSelfTest {
}
+    /**
+     * Checks that a snapshot restore fails with the injected error, instead of hanging, when one of the
+     * restore stages reports a failure from a node.
+     * <p>
+     * A full and an incremental snapshot are taken on a two-node cluster and the cache is destroyed. A hook on
+     * the communication SPI of {@code grid(1)} then writes an {@link IgniteException} into the {@code err} field
+     * of every outgoing {@link SingleNodeMessage} whose type equals the ordinal of the stage currently selected
+     * for failure. The restore is attempted once per stage and must throw that exception each time.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testStagesFail() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        DistributedProcess.DistributedProcessType[] stages = new DistributedProcess.DistributedProcessType[] {
+            RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+            RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
+            RESTORE_INCREMENTAL_SNAPSHOT_START
+        };
+
+        IgniteEx srv = startGridsWithCache(
+            2,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
+        srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        srv.destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Stage whose single-node result must be turned into a failure; null until the loop below selects one.
+        AtomicReference<DistributedProcess.DistributedProcessType> failStage = new AtomicReference<>();
+
+        // The predicate always returns false: it is used only as a hook to tamper with outgoing messages.
+        TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) -> {
+            if (msg instanceof SingleNodeMessage) {
+                SingleNodeMessage<?> singleMsg = (SingleNodeMessage<?>)msg;
+
+                DistributedProcess.DistributedProcessType stage = failStage.get();
+
+                // Guard against messages sent before the first stage has been selected.
+                if (stage != null && stage.ordinal() == singleMsg.type())
+                    GridTestUtils.setFieldValue(singleMsg, "err", new IgniteException("Test exception."));
+            }
+
+            return false;
+        });
+
+        for (DistributedProcess.DistributedProcessType stage : stages) {
+            failStage.set(stage);
+
+            // Bounded wait: if the restore hangs, the wait times out and the assertion fails
+            // (the timeout error does not carry the expected message) instead of blocking the suite.
+            assertThrows(log,
+                () -> srv.snapshot().restoreSnapshot(SNAPSHOT_NAME, singleton(DEFAULT_CACHE_NAME), 1).get(TIMEOUT),
+                IgniteException.class, "Test exception.");
+        }
+    }
+
/** */
private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg)
throws Exception {
IgniteEx srv = startGridsWithCache(