This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f89e18d04a IGNITE-19248 Fixed snapshot restore hanging if the prepare stage fails. (#10629)
6f89e18d04a is described below

commit 6f89e18d04ae500318e16c49d446c12b2156d5a2
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Apr 7 18:37:17 2023 +0300

    IGNITE-19248 Fixed snapshot restore hanging if the prepare stage fails. (#10629)
---
 .../snapshot/SnapshotRestoreProcess.java           |  2 +
 .../snapshot/IncrementalSnapshotTest.java          | 60 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 72a483f3c2b..be6136dd10e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -865,6 +865,8 @@ public class SnapshotRestoreProcess {
         if (failure != null) {
             opCtx0.errHnd.accept(failure);
 
+            finishProcess(reqId, failure);
+
             return;
         }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
index cd3d7df1239..6195aac41de 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -29,15 +29,24 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_INCREMENTAL_SNAPSHOT_START;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -304,7 +313,7 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
 
         assertNotNull(cacheData);
 
-        cacheData.queryEntities(Collections.singletonList(new 
QueryEntity(String.class, Account.class)));
+        cacheData.queryEntities(singletonList(new QueryEntity(String.class, 
Account.class)));
 
         locCfgMgr.writeCacheData(cacheData, ccfgFile);
 
@@ -374,6 +383,53 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
 
     }
 
+    /** @throws Exception if failed. */
+    @Test
+    public void testStagesFail() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        DistributedProcess.DistributedProcessType[] stages = new 
DistributedProcess.DistributedProcessType[] {
+            RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+            RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
+            RESTORE_INCREMENTAL_SNAPSHOT_START
+        };
+
+        IgniteEx srv = startGridsWithCache(
+            2,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
+        srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        srv.destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        AtomicReference<DistributedProcess.DistributedProcessType> failStage = 
new AtomicReference<>();
+
+        TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) 
-> {
+            if (msg instanceof SingleNodeMessage) {
+                SingleNodeMessage<?> singleMsg = (SingleNodeMessage<?>)msg;
+
+                if (failStage.get().ordinal() == singleMsg.type())
+                    GridTestUtils.setFieldValue(singleMsg, "err", new 
IgniteException("Test exception."));
+            }
+
+            return false;
+        });
+
+        for (DistributedProcess.DistributedProcessType stage : stages) {
+            failStage.set(stage);
+
+            assertThrows(log,
+                () -> srv.snapshot().restoreSnapshot(SNAPSHOT_NAME, 
singleton(DEFAULT_CACHE_NAME), 1).get(),
+                IgniteException.class, "Test exception.");
+        }
+    }
+
     /** */
     private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg) 
throws Exception {
         IgniteEx srv = startGridsWithCache(

Reply via email to