This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f06c1480758 IGNITE-22319 Fixed node crashing if a snapshot restore cancelled due to network issues (#11361)
f06c1480758 is described below

commit f06c1480758cfc6ec6a47b0828c881aefffa3b04
Author: Nikita Amelchev <[email protected]>
AuthorDate: Wed Jul 31 10:07:05 2024 +0300

    IGNITE-22319 Fixed node crashing if a snapshot restore cancelled due to network issues (#11361)
---
 .../snapshot/IgniteSnapshotManager.java            | 94 +++++++++-------------
 .../snapshot/SnapshotRestoreProcess.java           |  2 +-
 .../IgniteSnapshotRestoreFromRemoteTest.java       | 55 +++++++++++++
 3 files changed, 95 insertions(+), 56 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index be0d9afd6c9..dc893677f01 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -164,7 +164,6 @@ import org.apache.ignite.internal.util.GridBusyLock;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.distributed.InitMessage;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -722,6 +721,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         busyLock.block();
 
         try {
+            snpRmtMgr.stop();
+
             restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is 
stopping."));
 
             // Try stop all snapshot processing if not yet.
@@ -730,8 +731,6 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
 
             locSnpTasks.clear();
 
-            snpRmtMgr.stop();
-
             synchronized (snpOpMux) {
                 if (clusterSnpFut != null) {
                     clusterSnpFut.onDone(new 
NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -3739,93 +3738,78 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         /** {@code true} if the node is stopping. */
         private boolean stopping;
 
-        /**
-         * @param next New task for scheduling.
-         */
-        public synchronized void 
submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+        /**  @param next New task for scheduling. */
+        public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier 
next) {
             assert next != null;
 
-            if (stopping) {
-                next.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+            synchronized (this) {
+                if (stopping) {
+                    next.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
 
-                return;
-            }
+                    return;
+                }
 
-            RemoteSnapshotFilesRecevier curr = active;
+                if (active != null && !active.isDone()) {
+                    queue.offer(next);
 
-            if (curr == null || curr.isDone()) {
-                next.listen(this::scheduleNext);
+                    return;
+                }
 
                 active = next;
 
-                next.init();
+                active.listen(this::scheduleNext);
             }
-            else
-                queue.offer(next);
+
+            next.init();
         }
 
         /** Schedule next async receiver. */
-        private synchronized void scheduleNext() {
+        private void scheduleNext() {
             RemoteSnapshotFilesRecevier next = queue.poll();
 
-            if (next == null)
+            while (next != null && next.isDone())
+                next = queue.poll();
+
+            if (next == null) {
+                active = null;
+
                 return;
+            }
 
             submit(next);
         }
 
         /** Stopping handler. */
-        public synchronized void stop() {
-            stopping = true;
+        public void stop() {
+            synchronized (this) {
+                stopping = true;
+            }
 
-            if (active != null)
-                active.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+            IgniteException ex = new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG);
 
             RemoteSnapshotFilesRecevier r;
 
             while ((r = queue.poll()) != null)
-                r.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
-
-            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
-            GridCompoundFuture<Void, Void> stopFut = new 
GridCompoundFuture<>();
+                r.acceptException(ex);
 
-            try {
-                for (IgniteInternalFuture<Void> fut : futs)
-                    stopFut.add(fut);
-
-                stopFut.markInitialized().get();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+            if (active != null)
+                active.acceptException(ex);
         }
 
-        /**
-         * @param nodeId A node left the cluster.
-         */
+        /** @param nodeId A node left the cluster. */
         public void onNodeLeft(UUID nodeId) {
-            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
             ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("The node from which a snapshot has been " +
                 "requested left the grid");
 
-            futs.forEach(t -> {
-                if (t.rmtNodeId.equals(nodeId))
-                    t.acceptException(ex);
+            queue.forEach(r -> {
+                if (r.stopChecker.getAsBoolean() || r.rmtNodeId.equals(nodeId))
+                    r.acceptException(ex);
             });
-        }
-
-        /**
-         * @return The set of currently scheduled tasks, some of them may be 
already completed.
-         */
-        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
-            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);
 
-            RemoteSnapshotFilesRecevier active0 = active;
-
-            if (active0 != null)
-                futs.add(active0);
+            RemoteSnapshotFilesRecevier task = active;
 
-            return futs;
+            if (task != null && !task.isDone() && 
(task.stopChecker.getAsBoolean() || task.rmtNodeId.equals(nodeId)))
+                task.acceptException(ex);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 16511cb3c4e..c02deccd103 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -1201,7 +1201,7 @@ public class SnapshotRestoreProcess {
         catch (Exception ex) {
             opCtx0.errHnd.accept(ex);
 
-            return new GridFinishedFuture<>(ex);
+            retFut.onDone(ex);
         }
 
         return retFut;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
index 40bd60290e6..cbb5767e9c4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -64,6 +66,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_ST
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
 import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 
 /** */
 public class IgniteSnapshotRestoreFromRemoteTest extends 
IgniteClusterSnapshotRestoreBaseTest {
@@ -323,6 +326,58 @@ public class IgniteSnapshotRestoreFromRemoteTest extends 
IgniteClusterSnapshotRe
         ensureCacheAbsent(dfltCacheCfg);
     }
 
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreConnectionLost() throws Exception {
+        IgniteEx coord = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        // Start a new node without snapshot working directory.
+        IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);
+
+        emptyNode.cluster().state(ClusterState.ACTIVE);
+
+        emptyNode.cache(DEFAULT_CACHE_NAME).destroy();
+
+        awaitPartitionMapExchange();
+
+        CountDownLatch restoreStarted = new CountDownLatch(1);
+        CountDownLatch nodeStopped = new CountDownLatch(1);
+
+        IgniteSnapshotManager mgr = snp(coord);
+
+        mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, 
SnapshotSender>() {
+            @Override public SnapshotSender apply(String s, UUID uuid) {
+                return new DelegateSnapshotSender(log, 
mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+                    @Override public void sendPart0(File part, String 
cacheDirName, GroupPartitionId pair, Long length) {
+                        delegate.sendPart0(part, cacheDirName, pair, length);
+
+                        restoreStarted.countDown();
+
+                        try {
+                            nodeStopped.await(TIMEOUT, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+            }
+        });
+
+        // Restore all cache groups.
+        IgniteFuture<Void> fut = 
emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+        restoreStarted.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+        coord.close();
+
+        nodeStopped.countDown();
+
+        assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+    }
+
     /**
      * @param snpParts Snapshot parts.
      * @param toNodes List of toNodes to copy parts to.

Reply via email to