This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 37967efadd8 IGNITE-26604 Refactor SnapshotMetadataVerificationTask (#12418)
37967efadd8 is described below

commit 37967efadd841a6cb052534771cb465dac778c58
Author: Daniil <[email protected]>
AuthorDate: Fri Oct 24 11:48:39 2025 +0300

    IGNITE-26604 Refactor SnapshotMetadataVerificationTask (#12418)
---
 .../persistence/snapshot/SnapshotChecker.java      |  49 +++-
 .../snapshot/SnapshotMetadataVerificationTask.java | 300 ++++++++-------------
 2 files changed, 161 insertions(+), 188 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 8ccffd74ca8..721d6886def 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -17,8 +17,12 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.IgniteCheckedException;
@@ -59,8 +63,10 @@ public class SnapshotChecker {
         int incIdx,
         @Nullable Collection<Integer> grpIds
     ) {
-        return CompletableFuture.supplyAsync(() ->
-            new SnapshotMetadataVerificationTask(kctx.grid(), log, sft, 
incIdx, grpIds).execute(), executor);
+        return CompletableFuture.supplyAsync(
+            new SnapshotMetadataVerificationTask(kctx.grid(), log, sft, 
incIdx, grpIds),
+            executor
+        );
     }
 
     /** */
@@ -106,8 +112,43 @@ public class SnapshotChecker {
     }
 
     /** */
-    public Map<ClusterNode, Exception> reduceMetasResults(SnapshotFileTree 
sft, Map<ClusterNode, List<SnapshotMetadata>> metas) {
-        return new SnapshotMetadataVerificationTask(kctx.grid(), log, sft, 0, 
null).reduce(metas);
+    public Map<ClusterNode, Exception> reduceMetasResults(SnapshotFileTree 
sft, Map<ClusterNode, List<SnapshotMetadata>> results) {
+        Map<ClusterNode, Exception> exs = new HashMap<>();
+
+        SnapshotMetadata first = null;
+        Set<String> baselineMetasLeft = Collections.emptySet();
+
+        for (Map.Entry<ClusterNode, List<SnapshotMetadata>> res : 
results.entrySet()) {
+            List<SnapshotMetadata> metas = res.getValue();
+
+            for (SnapshotMetadata meta : metas) {
+                if (first == null) {
+                    first = meta;
+
+                    baselineMetasLeft = new HashSet<>(meta.baselineNodes());
+                }
+
+                baselineMetasLeft.remove(meta.consistentId());
+
+                if (!first.sameSnapshot(meta)) {
+                    exs.put(res.getKey(),
+                        new IgniteException("An error occurred during 
comparing snapshot metadata from cluster nodes " +
+                            "[first=" + first + ", meta=" + meta + ", nodeId=" 
+ res.getKey().id() + ']'));
+                }
+            }
+        }
+
+        if (first == null && exs.isEmpty()) {
+            throw new IllegalArgumentException("Snapshot does not exists 
[snapshot=" + sft.name()
+                + ", baseDir=" + sft.root() + ", consistentId=" + 
sft.consistentId() + ']');
+        }
+
+        if (!F.isEmpty(baselineMetasLeft) && F.isEmpty(exs)) {
+            throw new IgniteException("No snapshot metadatas found for the 
baseline nodes " +
+                "with consistent ids: " + String.join(", ", 
baselineMetasLeft));
+        }
+
+        return exs;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
index a00f07e8a96..643838ac3c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
@@ -20,17 +20,13 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
@@ -40,9 +36,21 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.reader.Ignite
 import org.apache.ignite.internal.util.typedef.F;
 
 /** Snapshot task to verify snapshot metadata on the baseline nodes for given 
snapshot name. */
-public class SnapshotMetadataVerificationTask {
+public class SnapshotMetadataVerificationTask implements 
Supplier<List<SnapshotMetadata>> {
     /** */
-    private final MetadataVerificationJob job;
+    private final IgniteEx ignite;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final SnapshotFileTree sft;
+
+    /** */
+    private final int incrementIdx;
+
+    /** */
+    private final Collection<Integer> grpIds;
 
     /** */
     public SnapshotMetadataVerificationTask(
@@ -52,229 +60,153 @@ public class SnapshotMetadataVerificationTask {
         int incrementIdx,
         Collection<Integer> grpIds
     ) {
-        job = new MetadataVerificationJob(ignite, log, sft, incrementIdx, 
grpIds);
+        this.ignite = ignite;
+        this.sft = sft;
+        this.incrementIdx = incrementIdx;
+        this.grpIds = grpIds;
+        this.log = log;
     }
 
     /** */
-    public List<SnapshotMetadata> execute() {
-        return job.execute();
-    }
-
-    /** Job that verifies snapshot on an Ignite node. */
-    private static class MetadataVerificationJob {
-        /** */
-        private final IgniteEx ignite;
-
-        /** */
-        private final IgniteLogger log;
-
-        /** */
-        private final SnapshotFileTree sft;
-
-        /** */
-        private final int incrementIdx;
-
-        /** */
-        private final Collection<Integer> grpIds;
-
-        /** */
-        private MetadataVerificationJob(
-            IgniteEx ignite,
-            IgniteLogger log,
-            SnapshotFileTree sft,
-            int incrementIdx,
-            Collection<Integer> grpIds
-        ) {
-            this.ignite = ignite;
-            this.sft = sft;
-            this.incrementIdx = incrementIdx;
-            this.grpIds = grpIds;
-            this.log = log;
-        }
-
-        /** */
-        public List<SnapshotMetadata> execute() {
-            IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
+    @Override public List<SnapshotMetadata> get() {
+        IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
 
-            List<SnapshotMetadata> snpMeta = snpMgr.readSnapshotMetadatas(sft);
+        List<SnapshotMetadata> snpMeta = snpMgr.readSnapshotMetadatas(sft);
 
-            for (SnapshotMetadata meta : snpMeta)
-                checkMeta(meta);
+        for (SnapshotMetadata meta : snpMeta)
+            checkMeta(meta);
 
-            if (incrementIdx > 0) {
-                List<SnapshotMetadata> metas = snpMeta.stream()
-                    .filter(m -> m.consistentId().equals(sft.consistentId()))
-                    .collect(Collectors.toList());
+        if (incrementIdx > 0) {
+            List<SnapshotMetadata> metas = snpMeta.stream()
+                .filter(m -> m.consistentId().equals(sft.consistentId()))
+                .collect(Collectors.toList());
 
-                if (metas.size() != 1) {
-                    throw new IgniteException("Failed to find single snapshot 
metafile on local node [locNodeId="
-                        + ignite.localNode().consistentId() + ", metas=" + 
snpMeta + ", snpName=" + sft.name()
-                        + ", snpPath=" + sft.root() + "]. Incremental 
snapshots requires exactly one meta file " +
-                        "per node because they don't support restoring on a 
different topology.");
-                }
-
-                checkIncrementalSnapshots(metas.get(0), sft, incrementIdx);
+            if (metas.size() != 1) {
+                throw new IgniteException("Failed to find single snapshot 
metafile on local node [locNodeId="
+                    + ignite.localNode().consistentId() + ", metas=" + snpMeta 
+ ", snpName=" + sft.name()
+                    + ", snpPath=" + sft.root() + "]. Incremental snapshots 
requires exactly one meta file " +
+                    "per node because they don't support restoring on a 
different topology.");
             }
 
-            return snpMeta;
+            checkIncrementalSnapshots(metas.get(0), sft, incrementIdx);
         }
 
-        /** */
-        private void checkMeta(SnapshotMetadata meta) {
-            byte[] snpMasterKeyDigest = meta.masterKeyDigest();
-            byte[] masterKeyDigest = 
ignite.context().config().getEncryptionSpi().masterKeyDigest();
-
-            if (masterKeyDigest == null && snpMasterKeyDigest != null) {
-                throw new IllegalStateException("Snapshot '" + 
meta.snapshotName() + "' has encrypted caches " +
-                    "while encryption is disabled. To restore this snapshot, 
start Ignite with configured " +
-                    "encryption and the same master key.");
-            }
-
-            if (snpMasterKeyDigest != null && 
!Arrays.equals(snpMasterKeyDigest, masterKeyDigest)) {
-                throw new IllegalStateException("Snapshot '" + 
meta.snapshotName() + "' has different master " +
-                    "key digest. To restore this snapshot, start Ignite with 
the same master key.");
-            }
+        return snpMeta;
+    }
 
-            Collection<Integer> grpIds = new HashSet<>(F.isEmpty(this.grpIds) 
? meta.cacheGroupIds() : this.grpIds);
+    /** */
+    private void checkMeta(SnapshotMetadata meta) {
+        byte[] snpMasterKeyDigest = meta.masterKeyDigest();
+        byte[] masterKeyDigest = 
ignite.context().config().getEncryptionSpi().masterKeyDigest();
+
+        if (masterKeyDigest == null && snpMasterKeyDigest != null) {
+            throw new IllegalStateException("Snapshot '" + meta.snapshotName() 
+ "' has encrypted caches " +
+                "while encryption is disabled. To restore this snapshot, start 
Ignite with configured " +
+                "encryption and the same master key.");
+        }
 
-            if (meta.hasCompressedGroups() && 
grpIds.stream().anyMatch(meta::isGroupWithCompression)) {
-                try {
-                    
ignite.context().compress().checkPageCompressionSupported();
-                }
-                catch (NullPointerException | IgniteCheckedException e) {
-                    String grpWithCompr = 
grpIds.stream().filter(meta::isGroupWithCompression)
-                        .map(String::valueOf).collect(Collectors.joining(", 
"));
+        if (snpMasterKeyDigest != null && !Arrays.equals(snpMasterKeyDigest, 
masterKeyDigest)) {
+            throw new IllegalStateException("Snapshot '" + meta.snapshotName() 
+ "' has different master " +
+                "key digest. To restore this snapshot, start Ignite with the 
same master key.");
+        }
 
-                    String msg = "Requested cache groups [" + grpWithCompr + 
"] for check " +
-                        "from snapshot '" + meta.snapshotName() + "' are 
compressed while " +
-                        "disk page compression is disabled. To check these 
groups please " +
-                        "start Ignite with ignite-compress module in 
classpath";
+        Collection<Integer> grpIds = new HashSet<>(F.isEmpty(this.grpIds) ? 
meta.cacheGroupIds() : this.grpIds);
 
-                    throw new IllegalStateException(msg);
-                }
+        if (meta.hasCompressedGroups() && 
grpIds.stream().anyMatch(meta::isGroupWithCompression)) {
+            try {
+                ignite.context().compress().checkPageCompressionSupported();
             }
+            catch (NullPointerException | IgniteCheckedException e) {
+                String grpWithCompr = 
grpIds.stream().filter(meta::isGroupWithCompression)
+                    .map(String::valueOf).collect(Collectors.joining(", "));
 
-            grpIds.removeAll(meta.partitions().keySet());
+                String msg = "Requested cache groups [" + grpWithCompr + "] 
for check " +
+                    "from snapshot '" + meta.snapshotName() + "' are 
compressed while " +
+                    "disk page compression is disabled. To check these groups 
please " +
+                    "start Ignite with ignite-compress module in classpath";
 
-            if (!grpIds.isEmpty() && !new 
HashSet<>(meta.cacheGroupIds()).containsAll(grpIds)) {
-                throw new IllegalArgumentException("Cache group(s) was not 
found in the snapshot [groups=" + grpIds +
-                    ", snapshot=" + sft.name() + ']');
+                throw new IllegalStateException(msg);
             }
         }
 
-        /** Checks that all incremental snapshots are present, contain correct 
metafile and WAL segments. */
-        public void checkIncrementalSnapshots(SnapshotMetadata fullMeta, 
SnapshotFileTree sft, int incIdx) {
-            try {
-                GridCacheSharedContext<Object, Object> ctx = 
ignite.context().cache().context();
-
-                IgniteSnapshotManager snpMgr = ctx.snapshotMgr();
-
-                // Incremental snapshot must contain ClusterSnapshotRecord.
-                long startSeg = fullMeta.snapshotRecordPointer().index();
+        grpIds.removeAll(meta.partitions().keySet());
 
-                for (int inc = 1; inc <= incIdx; inc++) {
-                    IncrementalSnapshotFileTree ift = 
sft.incrementalSnapshotFileTree(inc);
-
-                    if (!ift.root().exists()) {
-                        throw new IllegalArgumentException("No incremental 
snapshot found " +
-                            "[snpName=" + sft.name() + ", snpPath=" + 
sft.root() + ", incrementIndex=" + inc + ']');
-                    }
+        if (!grpIds.isEmpty() && !new 
HashSet<>(meta.cacheGroupIds()).containsAll(grpIds)) {
+            throw new IllegalArgumentException("Cache group(s) was not found 
in the snapshot [groups=" + grpIds +
+                ", snapshot=" + sft.name() + ']');
+        }
+    }
 
-                    IncrementalSnapshotMetadata incMeta = 
snpMgr.readIncrementalSnapshotMetadata(ift.meta());
+    /** Checks that all incremental snapshots are present, contain correct 
metafile and WAL segments. */
+    private void checkIncrementalSnapshots(SnapshotMetadata fullMeta, 
SnapshotFileTree sft, int incIdx) {
+        try {
+            GridCacheSharedContext<Object, Object> ctx = 
ignite.context().cache().context();
 
-                    if (!incMeta.matchBaseSnapshot(fullMeta)) {
-                        throw new IllegalArgumentException("Incremental 
snapshot doesn't match full snapshot " +
-                            "[incMeta=" + incMeta + ", fullMeta=" + fullMeta + 
']');
-                    }
+            IgniteSnapshotManager snpMgr = ctx.snapshotMgr();
 
-                    if (incMeta.incrementIndex() != inc) {
-                        throw new IgniteException(
-                            "Incremental snapshot meta has wrong index 
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
-                    }
+            // Incremental snapshot must contain ClusterSnapshotRecord.
+            long startSeg = fullMeta.snapshotRecordPointer().index();
 
-                    checkWalSegments(incMeta, startSeg, ift);
+            for (int inc = 1; inc <= incIdx; inc++) {
+                IncrementalSnapshotFileTree ift = 
sft.incrementalSnapshotFileTree(inc);
 
-                    // Incremental snapshots must not cross each other.
-                    startSeg = incMeta.incrementalSnapshotPointer().index() + 
1;
+                if (!ift.root().exists()) {
+                    throw new IllegalArgumentException("No incremental 
snapshot found " +
+                        "[snpName=" + sft.name() + ", snpPath=" + sft.root() + 
", incrementIndex=" + inc + ']');
                 }
-            }
-            catch (IgniteCheckedException | IOException e) {
-                throw new IgniteException(e);
-            }
-        }
 
-        /** Check that incremental snapshot contains all required WAL 
segments. Throws {@link IgniteException} in case of any errors. */
-        private void checkWalSegments(IncrementalSnapshotMetadata meta, long 
startWalSeg, IncrementalSnapshotFileTree ift) {
-            IgniteWalIteratorFactory factory = new 
IgniteWalIteratorFactory(log);
+                IncrementalSnapshotMetadata incMeta = 
snpMgr.readIncrementalSnapshotMetadata(ift.meta());
 
-            List<FileDescriptor> walSeg = factory.resolveWalFiles(
-                new IgniteWalIteratorFactory.IteratorParametersBuilder()
-                    .filesOrDirs(ift.walCompactedSegments()));
-
-            if (walSeg.isEmpty())
-                throw new IgniteException("No WAL segments found for 
incremental snapshot [dir=" + ift.wal() + ']');
-
-            long actFirstSeg = walSeg.get(0).idx();
+                if (!incMeta.matchBaseSnapshot(fullMeta)) {
+                    throw new IllegalArgumentException("Incremental snapshot 
doesn't match full snapshot " +
+                        "[incMeta=" + incMeta + ", fullMeta=" + fullMeta + 
']');
+                }
 
-            if (actFirstSeg != startWalSeg) {
-                throw new IgniteException("Missed WAL segment 
[expectFirstSegment=" + startWalSeg
-                    + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta 
+ ']');
-            }
+                if (incMeta.incrementIndex() != inc) {
+                    throw new IgniteException(
+                        "Incremental snapshot meta has wrong index 
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
+                }
 
-            long expLastSeg = meta.incrementalSnapshotPointer().index();
-            long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
+                checkWalSegments(incMeta, startSeg, ift);
 
-            if (actLastSeg != expLastSeg) {
-                throw new IgniteException("Missed WAL segment 
[expectLastSegment=" + startWalSeg
-                    + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta 
+ ']');
+                // Incremental snapshots must not cross each other.
+                startSeg = incMeta.incrementalSnapshotPointer().index() + 1;
             }
-
-            List<?> walSegGaps = factory.hasGaps(walSeg);
-
-            if (!walSegGaps.isEmpty())
-                throw new IgniteException("Missed WAL segments [misses=" + 
walSegGaps + ", meta=" + meta + ']');
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
         }
     }
 
-    /** */
-    public Map<ClusterNode, Exception> reduce(Map<ClusterNode, 
List<SnapshotMetadata>> results) throws IgniteException {
-        Map<ClusterNode, Exception> exs = new HashMap<>();
-
-        SnapshotMetadata first = null;
-        Set<String> baselineMetasLeft = Collections.emptySet();
-
-        for (Map.Entry<ClusterNode, List<SnapshotMetadata>> res : 
results.entrySet()) {
-            List<SnapshotMetadata> metas = res.getValue();
+    /** Check that incremental snapshot contains all required WAL segments. 
Throws {@link IgniteException} in case of any errors. */
+    private void checkWalSegments(IncrementalSnapshotMetadata meta, long 
startWalSeg, IncrementalSnapshotFileTree ift) {
+        IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
 
-            for (SnapshotMetadata meta : metas) {
-                if (first == null) {
-                    first = meta;
+        List<FileDescriptor> walSeg = factory.resolveWalFiles(
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .filesOrDirs(ift.walCompactedSegments()));
 
-                    baselineMetasLeft = new HashSet<>(meta.baselineNodes());
-                }
+        if (walSeg.isEmpty())
+            throw new IgniteException("No WAL segments found for incremental 
snapshot [dir=" + ift.wal() + ']');
 
-                baselineMetasLeft.remove(meta.consistentId());
+        long actFirstSeg = walSeg.get(0).idx();
 
-                if (!first.sameSnapshot(meta)) {
-                    exs.put(res.getKey(),
-                        new IgniteException("An error occurred during 
comparing snapshot metadata from cluster nodes " +
-                            "[first=" + first + ", meta=" + meta + ", nodeId=" 
+ res.getKey().id() + ']'));
-                }
-            }
+        if (actFirstSeg != startWalSeg) {
+            throw new IgniteException("Missed WAL segment 
[expectFirstSegment=" + startWalSeg
+                + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta + 
']');
         }
 
-        if (first == null && exs.isEmpty()) {
-            SnapshotFileTree sft = job.sft;
+        long expLastSeg = meta.incrementalSnapshotPointer().index();
+        long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
 
-            throw new IllegalArgumentException("Snapshot does not exists 
[snapshot=" + sft.name()
-                + ", baseDir=" + sft.root() + ", consistentId=" + 
sft.consistentId() + ']');
+        if (actLastSeg != expLastSeg) {
+            throw new IgniteException("Missed WAL segment [expectLastSegment=" 
+ startWalSeg
+                + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta + 
']');
         }
 
-        if (!F.isEmpty(baselineMetasLeft) && F.isEmpty(exs)) {
-            throw new IgniteException("No snapshot metadatas found for the 
baseline nodes " +
-                "with consistent ids: " + String.join(", ", 
baselineMetasLeft));
-        }
+        List<?> walSegGaps = factory.hasGaps(walSeg);
 
-        return exs;
+        if (!walSegGaps.isEmpty())
+            throw new IgniteException("Missed WAL segments [misses=" + 
walSegGaps + ", meta=" + meta + ']');
     }
 }

Reply via email to