This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f2a6e55302b IGNITE-26605 Refactor SnapshotHandlerRestoreTask (#12419)
f2a6e55302b is described below
commit f2a6e55302b6bbb710ce63e29aa3cf287e3ec038
Author: Daniil <[email protected]>
AuthorDate: Thu Nov 6 18:17:32 2025 +0300
IGNITE-26605 Refactor SnapshotHandlerRestoreTask (#12419)
---
.../persistence/snapshot/SnapshotCheckProcess.java | 28 +++++-
.../persistence/snapshot/SnapshotChecker.java | 17 +---
.../snapshot/SnapshotHandlerRestoreTask.java | 108 ++++-----------------
3 files changed, 46 insertions(+), 107 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index 130b584dd32..f62fc22ff82 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -232,11 +232,37 @@ public class SnapshotCheckProcess {
});
}
- snpChecker.checkCustomHandlersResults(ctx.req.snapshotName(),
reduced);
+ Map<String, List<SnapshotHandlerResult<?>>> clusterResults = new
HashMap<>();
+ Collection<UUID> execNodes = new ArrayList<>(reduced.size());
+
+ // Checking node -> Map by consistend id.
+ for (Map.Entry<ClusterNode, Map<Object, Map<String,
SnapshotHandlerResult<?>>>> nodeRes : reduced.entrySet()) {
+ // Consistent id -> Map by handler name.
+ for (Map.Entry<Object, Map<String, SnapshotHandlerResult<?>>>
res : nodeRes.getValue().entrySet()) {
+ // Depending on the job mapping, we can get several
different results from one node.
+ execNodes.add(nodeRes.getKey().id());
+
+ Map<String, SnapshotHandlerResult<?>> nodeDataMap =
res.getValue();
+
+ assert nodeDataMap != null : "At least the default
snapshot restore handler should have been executed ";
+
+ for (Map.Entry<String, SnapshotHandlerResult<?>> entry :
nodeDataMap.entrySet()) {
+ String hndName = entry.getKey();
+
+ clusterResults.computeIfAbsent(hndName, v -> new
ArrayList<>()).add(entry.getValue());
+ }
+ }
+ }
+
+ kctx.cache().context().snapshotMgr().handlers().completeAll(
+ SnapshotHandlerType.RESTORE, ctx.req.snapshotName(),
clusterResults, execNodes, wrns -> {});
fut.onDone(new SnapshotPartitionsVerifyResult(ctx.clusterMetas,
null));
}
catch (Throwable err) {
+ log.warning("The snapshot operation will be aborted due to a
handler error " +
+ "[snapshot=" + ctx.req.snapshotName() + "].", err);
+
fut.onDone(err);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 65c312f1cef..9c16c0be0a7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -164,8 +164,8 @@ public class SnapshotChecker {
) {
// The handlers use or may use the same snapshot pool. If it is
configured with 1 thread, launching waiting task in
// the same pool might block it.
- return CompletableFuture.supplyAsync(() ->
- new SnapshotHandlerRestoreTask(kctx.grid(), log, sft, grps,
check).execute()
+ return CompletableFuture.supplyAsync(
+ new SnapshotHandlerRestoreTask(kctx.grid(), log, sft, grps, check)
);
}
@@ -190,17 +190,4 @@ public class SnapshotChecker {
}
}, executor);
}
-
- /**
- * Checks results of all the snapshot validation handlres.
- * @param snpName Snapshot name.
- * @param results Results: checking node -> snapshot part's consistend id
-> custom handler name -> handler result.
- * @see #invokeCustomHandlers(SnapshotMetadata, SnapshotFileTree,
Collection, boolean)
- */
- public void checkCustomHandlersResults(
- String snpName,
- Map<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>>
results
- ) {
- new SnapshotHandlerRestoreTask(kctx.grid(), log, null, null,
true).reduce(snpName, results);
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index 9a26cd94120..dccb0bc7c84 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -18,31 +18,30 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.UUID;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
/**
* Snapshot restore operation handling task.
*/
-public class SnapshotHandlerRestoreTask {
+public class SnapshotHandlerRestoreTask implements Supplier<Map<String,
SnapshotHandlerResult<Object>>> {
/** */
private final IgniteEx ignite;
/** */
- private final IgniteLogger log;
+ private final SnapshotFileTree sft;
/** */
- private final SnapshotHandlerRestoreJob job;
+ private final Collection<String> rqGrps;
+
+ /** */
+ private final boolean check;
/** */
SnapshotHandlerRestoreTask(
@@ -52,96 +51,23 @@ public class SnapshotHandlerRestoreTask {
Collection<String> grps,
boolean check
) {
- job = new SnapshotHandlerRestoreJob(ignite, sft, grps, check);
this.ignite = ignite;
- this.log = log;
+ this.sft = sft;
+ this.rqGrps = grps;
+ this.check = check;
}
/** */
- public Map<String, SnapshotHandlerResult<Object>> execute() {
- return job.execute0();
- }
-
- /** */
- public void reduce(
- String snapshotName,
- Map<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>>
results
- ) {
- Map<String, List<SnapshotHandlerResult<?>>> clusterResults = new
HashMap<>();
- Collection<UUID> execNodes = new ArrayList<>(results.size());
-
- // Checking node -> Map by consistend id.
- for (Map.Entry<ClusterNode, Map<Object, Map<String,
SnapshotHandlerResult<?>>>> nodeRes : results.entrySet()) {
- // Consistent id -> Map by handler name.
- for (Map.Entry<Object, Map<String, SnapshotHandlerResult<?>>> res
: nodeRes.getValue().entrySet()) {
- // Depending on the job mapping, we can get several different
results from one node.
- execNodes.add(nodeRes.getKey().id());
-
- Map<String, SnapshotHandlerResult<?>> nodeDataMap =
res.getValue();
-
- assert nodeDataMap != null : "At least the default snapshot
restore handler should have been executed ";
-
- for (Map.Entry<String, SnapshotHandlerResult<?>> entry :
nodeDataMap.entrySet()) {
- String hndName = entry.getKey();
-
- clusterResults.computeIfAbsent(hndName, v -> new
ArrayList<>()).add(entry.getValue());
- }
- }
- }
-
+ @Override public Map<String, SnapshotHandlerResult<Object>> get() {
try {
-
ignite.context().cache().context().snapshotMgr().handlers().completeAll(
- SnapshotHandlerType.RESTORE, snapshotName, clusterResults,
execNodes, wrns -> {});
- }
- catch (Exception e) {
- log.warning("The snapshot operation will be aborted due to a
handler error [snapshot=" + snapshotName + "].", e);
-
- throw new IgniteException(e);
- }
- }
-
- /** Invokes all {@link SnapshotHandlerType#RESTORE} handlers locally. */
- private static class SnapshotHandlerRestoreJob {
- /** */
- private final IgniteEx ignite;
+ IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
+ SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
- /** */
- private final SnapshotFileTree sft;
-
- /** */
- private final Collection<String> rqGrps;
-
- /** */
- private final boolean check;
-
- /**
- * @param grps Cache group names.
- * @param check If {@code true} check snapshot before restore.
- */
- SnapshotHandlerRestoreJob(
- IgniteEx ignite,
- SnapshotFileTree sft,
- Collection<String> grps,
- boolean check
- ) {
- this.ignite = ignite;
- this.sft = sft;
- this.rqGrps = grps;
- this.check = check;
+ return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
+ new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(),
sft, false, check));
}
-
- /** */
- public Map<String, SnapshotHandlerResult<Object>> execute0() {
- try {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- SnapshotMetadata meta =
snpMgr.readSnapshotMetadata(sft.meta());
-
- return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
- new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), sft, false, check));
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
- }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
}
}
}