This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f06c1480758 IGNITE-22319 Fixed node crashing if a snapshot restore cancelled due to network issues (#11361)
f06c1480758 is described below
commit f06c1480758cfc6ec6a47b0828c881aefffa3b04
Author: Nikita Amelchev <[email protected]>
AuthorDate: Wed Jul 31 10:07:05 2024 +0300
IGNITE-22319 Fixed node crashing if a snapshot restore cancelled due to network issues (#11361)
---
.../snapshot/IgniteSnapshotManager.java | 94 +++++++++-------------
.../snapshot/SnapshotRestoreProcess.java | 2 +-
.../IgniteSnapshotRestoreFromRemoteTest.java | 55 +++++++++++++
3 files changed, 95 insertions(+), 56 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index be0d9afd6c9..dc893677f01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -164,7 +164,6 @@ import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.InitMessage;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -722,6 +721,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
busyLock.block();
try {
+ snpRmtMgr.stop();
+
restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is
stopping."));
// Try stop all snapshot processing if not yet.
@@ -730,8 +731,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
locSnpTasks.clear();
- snpRmtMgr.stop();
-
synchronized (snpOpMux) {
if (clusterSnpFut != null) {
clusterSnpFut.onDone(new
NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -3739,93 +3738,78 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** {@code true} if the node is stopping. */
private boolean stopping;
- /**
- * @param next New task for scheduling.
- */
- public synchronized void
submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+ /** @param next New task for scheduling. */
+ public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier
next) {
assert next != null;
- if (stopping) {
- next.acceptException(new
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+ synchronized (this) {
+ if (stopping) {
+ next.acceptException(new
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
- return;
- }
+ return;
+ }
- RemoteSnapshotFilesRecevier curr = active;
+ if (active != null && !active.isDone()) {
+ queue.offer(next);
- if (curr == null || curr.isDone()) {
- next.listen(this::scheduleNext);
+ return;
+ }
active = next;
- next.init();
+ active.listen(this::scheduleNext);
}
- else
- queue.offer(next);
+
+ next.init();
}
/** Schedule next async receiver. */
- private synchronized void scheduleNext() {
+ private void scheduleNext() {
RemoteSnapshotFilesRecevier next = queue.poll();
- if (next == null)
+ while (next != null && next.isDone())
+ next = queue.poll();
+
+ if (next == null) {
+ active = null;
+
return;
+ }
submit(next);
}
/** Stopping handler. */
- public synchronized void stop() {
- stopping = true;
+ public void stop() {
+ synchronized (this) {
+ stopping = true;
+ }
- if (active != null)
- active.acceptException(new
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+ IgniteException ex = new
IgniteException(SNP_NODE_STOPPING_ERR_MSG);
RemoteSnapshotFilesRecevier r;
while ((r = queue.poll()) != null)
- r.acceptException(new
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
-
- Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
- GridCompoundFuture<Void, Void> stopFut = new
GridCompoundFuture<>();
+ r.acceptException(ex);
- try {
- for (IgniteInternalFuture<Void> fut : futs)
- stopFut.add(fut);
-
- stopFut.markInitialized().get();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ if (active != null)
+ active.acceptException(ex);
}
- /**
- * @param nodeId A node left the cluster.
- */
+ /** @param nodeId A node left the cluster. */
public void onNodeLeft(UUID nodeId) {
- Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
ClusterTopologyCheckedException ex = new
ClusterTopologyCheckedException("The node from which a snapshot has been " +
"requested left the grid");
- futs.forEach(t -> {
- if (t.rmtNodeId.equals(nodeId))
- t.acceptException(ex);
+ queue.forEach(r -> {
+ if (r.stopChecker.getAsBoolean() || r.rmtNodeId.equals(nodeId))
+ r.acceptException(ex);
});
- }
-
- /**
- * @return The set of currently scheduled tasks, some of them may be
already completed.
- */
- private Set<RemoteSnapshotFilesRecevier> activeTasks() {
- Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);
- RemoteSnapshotFilesRecevier active0 = active;
-
- if (active0 != null)
- futs.add(active0);
+ RemoteSnapshotFilesRecevier task = active;
- return futs;
+ if (task != null && !task.isDone() &&
(task.stopChecker.getAsBoolean() || task.rmtNodeId.equals(nodeId)))
+ task.acceptException(ex);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 16511cb3c4e..c02deccd103 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -1201,7 +1201,7 @@ public class SnapshotRestoreProcess {
catch (Exception ex) {
opCtx0.errHnd.accept(ex);
- return new GridFinishedFuture<>(ex);
+ retFut.onDone(ex);
}
return retFut;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
index 40bd60290e6..cbb5767e9c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
@@ -30,6 +30,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -64,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_ST
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
/** */
public class IgniteSnapshotRestoreFromRemoteTest extends
IgniteClusterSnapshotRestoreBaseTest {
@@ -323,6 +326,58 @@ public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRe
ensureCacheAbsent(dfltCacheCfg);
}
+ /** @throws Exception If failed. */
+ @Test
+ public void testRestoreConnectionLost() throws Exception {
+ IgniteEx coord = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ // Start a new node without snapshot working directory.
+ IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);
+
+ emptyNode.cluster().state(ClusterState.ACTIVE);
+
+ emptyNode.cache(DEFAULT_CACHE_NAME).destroy();
+
+ awaitPartitionMapExchange();
+
+ CountDownLatch restoreStarted = new CountDownLatch(1);
+ CountDownLatch nodeStopped = new CountDownLatch(1);
+
+ IgniteSnapshotManager mgr = snp(coord);
+
+ mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID,
SnapshotSender>() {
+ @Override public SnapshotSender apply(String s, UUID uuid) {
+ return new DelegateSnapshotSender(log,
mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+ @Override public void sendPart0(File part, String
cacheDirName, GroupPartitionId pair, Long length) {
+ delegate.sendPart0(part, cacheDirName, pair, length);
+
+ restoreStarted.countDown();
+
+ try {
+ nodeStopped.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ });
+
+ // Restore all cache groups.
+ IgniteFuture<Void> fut =
emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+ restoreStarted.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ coord.close();
+
+ nodeStopped.countDown();
+
+ assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+ }
+
/**
* @param snpParts Snapshot parts.
* @param toNodes List of toNodes to copy parts to.