This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 37967efadd8 IGNITE-26604 Refactor SnapshotMetadataVerificationTask (#12418)
37967efadd8 is described below
commit 37967efadd841a6cb052534771cb465dac778c58
Author: Daniil <[email protected]>
AuthorDate: Fri Oct 24 11:48:39 2025 +0300
IGNITE-26604 Refactor SnapshotMetadataVerificationTask (#12418)
---
.../persistence/snapshot/SnapshotChecker.java | 49 +++-
.../snapshot/SnapshotMetadataVerificationTask.java | 300 ++++++++-------------
2 files changed, 161 insertions(+), 188 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 8ccffd74ca8..721d6886def 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -17,8 +17,12 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
@@ -59,8 +63,10 @@ public class SnapshotChecker {
int incIdx,
@Nullable Collection<Integer> grpIds
) {
- return CompletableFuture.supplyAsync(() ->
- new SnapshotMetadataVerificationTask(kctx.grid(), log, sft,
incIdx, grpIds).execute(), executor);
+ return CompletableFuture.supplyAsync(
+ new SnapshotMetadataVerificationTask(kctx.grid(), log, sft,
incIdx, grpIds),
+ executor
+ );
}
/** */
@@ -106,8 +112,43 @@ public class SnapshotChecker {
}
/** */
- public Map<ClusterNode, Exception> reduceMetasResults(SnapshotFileTree
sft, Map<ClusterNode, List<SnapshotMetadata>> metas) {
- return new SnapshotMetadataVerificationTask(kctx.grid(), log, sft, 0,
null).reduce(metas);
+ public Map<ClusterNode, Exception> reduceMetasResults(SnapshotFileTree
sft, Map<ClusterNode, List<SnapshotMetadata>> results) {
+ Map<ClusterNode, Exception> exs = new HashMap<>();
+
+ SnapshotMetadata first = null;
+ Set<String> baselineMetasLeft = Collections.emptySet();
+
+ for (Map.Entry<ClusterNode, List<SnapshotMetadata>> res :
results.entrySet()) {
+ List<SnapshotMetadata> metas = res.getValue();
+
+ for (SnapshotMetadata meta : metas) {
+ if (first == null) {
+ first = meta;
+
+ baselineMetasLeft = new HashSet<>(meta.baselineNodes());
+ }
+
+ baselineMetasLeft.remove(meta.consistentId());
+
+ if (!first.sameSnapshot(meta)) {
+ exs.put(res.getKey(),
+ new IgniteException("An error occurred during
comparing snapshot metadata from cluster nodes " +
+ "[first=" + first + ", meta=" + meta + ", nodeId="
+ res.getKey().id() + ']'));
+ }
+ }
+ }
+
+ if (first == null && exs.isEmpty()) {
+ throw new IllegalArgumentException("Snapshot does not exists
[snapshot=" + sft.name()
+ + ", baseDir=" + sft.root() + ", consistentId=" +
sft.consistentId() + ']');
+ }
+
+ if (!F.isEmpty(baselineMetasLeft) && F.isEmpty(exs)) {
+ throw new IgniteException("No snapshot metadatas found for the
baseline nodes " +
+ "with consistent ids: " + String.join(", ",
baselineMetasLeft));
+ }
+
+ return exs;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
index a00f07e8a96..643838ac3c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
@@ -20,17 +20,13 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
@@ -40,9 +36,21 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.Ignite
import org.apache.ignite.internal.util.typedef.F;
/** Snapshot task to verify snapshot metadata on the baseline nodes for given
snapshot name. */
-public class SnapshotMetadataVerificationTask {
+public class SnapshotMetadataVerificationTask implements
Supplier<List<SnapshotMetadata>> {
/** */
- private final MetadataVerificationJob job;
+ private final IgniteEx ignite;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final SnapshotFileTree sft;
+
+ /** */
+ private final int incrementIdx;
+
+ /** */
+ private final Collection<Integer> grpIds;
/** */
public SnapshotMetadataVerificationTask(
@@ -52,229 +60,153 @@ public class SnapshotMetadataVerificationTask {
int incrementIdx,
Collection<Integer> grpIds
) {
- job = new MetadataVerificationJob(ignite, log, sft, incrementIdx,
grpIds);
+ this.ignite = ignite;
+ this.sft = sft;
+ this.incrementIdx = incrementIdx;
+ this.grpIds = grpIds;
+ this.log = log;
}
/** */
- public List<SnapshotMetadata> execute() {
- return job.execute();
- }
-
- /** Job that verifies snapshot on an Ignite node. */
- private static class MetadataVerificationJob {
- /** */
- private final IgniteEx ignite;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final SnapshotFileTree sft;
-
- /** */
- private final int incrementIdx;
-
- /** */
- private final Collection<Integer> grpIds;
-
- /** */
- private MetadataVerificationJob(
- IgniteEx ignite,
- IgniteLogger log,
- SnapshotFileTree sft,
- int incrementIdx,
- Collection<Integer> grpIds
- ) {
- this.ignite = ignite;
- this.sft = sft;
- this.incrementIdx = incrementIdx;
- this.grpIds = grpIds;
- this.log = log;
- }
-
- /** */
- public List<SnapshotMetadata> execute() {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
+ @Override public List<SnapshotMetadata> get() {
+ IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- List<SnapshotMetadata> snpMeta = snpMgr.readSnapshotMetadatas(sft);
+ List<SnapshotMetadata> snpMeta = snpMgr.readSnapshotMetadatas(sft);
- for (SnapshotMetadata meta : snpMeta)
- checkMeta(meta);
+ for (SnapshotMetadata meta : snpMeta)
+ checkMeta(meta);
- if (incrementIdx > 0) {
- List<SnapshotMetadata> metas = snpMeta.stream()
- .filter(m -> m.consistentId().equals(sft.consistentId()))
- .collect(Collectors.toList());
+ if (incrementIdx > 0) {
+ List<SnapshotMetadata> metas = snpMeta.stream()
+ .filter(m -> m.consistentId().equals(sft.consistentId()))
+ .collect(Collectors.toList());
- if (metas.size() != 1) {
- throw new IgniteException("Failed to find single snapshot
metafile on local node [locNodeId="
- + ignite.localNode().consistentId() + ", metas=" +
snpMeta + ", snpName=" + sft.name()
- + ", snpPath=" + sft.root() + "]. Incremental
snapshots requires exactly one meta file " +
- "per node because they don't support restoring on a
different topology.");
- }
-
- checkIncrementalSnapshots(metas.get(0), sft, incrementIdx);
+ if (metas.size() != 1) {
+ throw new IgniteException("Failed to find single snapshot
metafile on local node [locNodeId="
+ + ignite.localNode().consistentId() + ", metas=" + snpMeta
+ ", snpName=" + sft.name()
+ + ", snpPath=" + sft.root() + "]. Incremental snapshots
requires exactly one meta file " +
+ "per node because they don't support restoring on a
different topology.");
}
- return snpMeta;
+ checkIncrementalSnapshots(metas.get(0), sft, incrementIdx);
}
- /** */
- private void checkMeta(SnapshotMetadata meta) {
- byte[] snpMasterKeyDigest = meta.masterKeyDigest();
- byte[] masterKeyDigest =
ignite.context().config().getEncryptionSpi().masterKeyDigest();
-
- if (masterKeyDigest == null && snpMasterKeyDigest != null) {
- throw new IllegalStateException("Snapshot '" +
meta.snapshotName() + "' has encrypted caches " +
- "while encryption is disabled. To restore this snapshot,
start Ignite with configured " +
- "encryption and the same master key.");
- }
-
- if (snpMasterKeyDigest != null &&
!Arrays.equals(snpMasterKeyDigest, masterKeyDigest)) {
- throw new IllegalStateException("Snapshot '" +
meta.snapshotName() + "' has different master " +
- "key digest. To restore this snapshot, start Ignite with
the same master key.");
- }
+ return snpMeta;
+ }
- Collection<Integer> grpIds = new HashSet<>(F.isEmpty(this.grpIds)
? meta.cacheGroupIds() : this.grpIds);
+ /** */
+ private void checkMeta(SnapshotMetadata meta) {
+ byte[] snpMasterKeyDigest = meta.masterKeyDigest();
+ byte[] masterKeyDigest =
ignite.context().config().getEncryptionSpi().masterKeyDigest();
+
+ if (masterKeyDigest == null && snpMasterKeyDigest != null) {
+ throw new IllegalStateException("Snapshot '" + meta.snapshotName()
+ "' has encrypted caches " +
+ "while encryption is disabled. To restore this snapshot, start
Ignite with configured " +
+ "encryption and the same master key.");
+ }
- if (meta.hasCompressedGroups() &&
grpIds.stream().anyMatch(meta::isGroupWithCompression)) {
- try {
-
ignite.context().compress().checkPageCompressionSupported();
- }
- catch (NullPointerException | IgniteCheckedException e) {
- String grpWithCompr =
grpIds.stream().filter(meta::isGroupWithCompression)
- .map(String::valueOf).collect(Collectors.joining(",
"));
+ if (snpMasterKeyDigest != null && !Arrays.equals(snpMasterKeyDigest,
masterKeyDigest)) {
+ throw new IllegalStateException("Snapshot '" + meta.snapshotName()
+ "' has different master " +
+ "key digest. To restore this snapshot, start Ignite with the
same master key.");
+ }
- String msg = "Requested cache groups [" + grpWithCompr +
"] for check " +
- "from snapshot '" + meta.snapshotName() + "' are
compressed while " +
- "disk page compression is disabled. To check these
groups please " +
- "start Ignite with ignite-compress module in
classpath";
+ Collection<Integer> grpIds = new HashSet<>(F.isEmpty(this.grpIds) ?
meta.cacheGroupIds() : this.grpIds);
- throw new IllegalStateException(msg);
- }
+ if (meta.hasCompressedGroups() &&
grpIds.stream().anyMatch(meta::isGroupWithCompression)) {
+ try {
+ ignite.context().compress().checkPageCompressionSupported();
}
+ catch (NullPointerException | IgniteCheckedException e) {
+ String grpWithCompr =
grpIds.stream().filter(meta::isGroupWithCompression)
+ .map(String::valueOf).collect(Collectors.joining(", "));
- grpIds.removeAll(meta.partitions().keySet());
+ String msg = "Requested cache groups [" + grpWithCompr + "]
for check " +
+ "from snapshot '" + meta.snapshotName() + "' are
compressed while " +
+ "disk page compression is disabled. To check these groups
please " +
+ "start Ignite with ignite-compress module in classpath";
- if (!grpIds.isEmpty() && !new
HashSet<>(meta.cacheGroupIds()).containsAll(grpIds)) {
- throw new IllegalArgumentException("Cache group(s) was not
found in the snapshot [groups=" + grpIds +
- ", snapshot=" + sft.name() + ']');
+ throw new IllegalStateException(msg);
}
}
- /** Checks that all incremental snapshots are present, contain correct
metafile and WAL segments. */
- public void checkIncrementalSnapshots(SnapshotMetadata fullMeta,
SnapshotFileTree sft, int incIdx) {
- try {
- GridCacheSharedContext<Object, Object> ctx =
ignite.context().cache().context();
-
- IgniteSnapshotManager snpMgr = ctx.snapshotMgr();
-
- // Incremental snapshot must contain ClusterSnapshotRecord.
- long startSeg = fullMeta.snapshotRecordPointer().index();
+ grpIds.removeAll(meta.partitions().keySet());
- for (int inc = 1; inc <= incIdx; inc++) {
- IncrementalSnapshotFileTree ift =
sft.incrementalSnapshotFileTree(inc);
-
- if (!ift.root().exists()) {
- throw new IllegalArgumentException("No incremental
snapshot found " +
- "[snpName=" + sft.name() + ", snpPath=" +
sft.root() + ", incrementIndex=" + inc + ']');
- }
+ if (!grpIds.isEmpty() && !new
HashSet<>(meta.cacheGroupIds()).containsAll(grpIds)) {
+ throw new IllegalArgumentException("Cache group(s) was not found
in the snapshot [groups=" + grpIds +
+ ", snapshot=" + sft.name() + ']');
+ }
+ }
- IncrementalSnapshotMetadata incMeta =
snpMgr.readIncrementalSnapshotMetadata(ift.meta());
+ /** Checks that all incremental snapshots are present, contain correct
metafile and WAL segments. */
+ private void checkIncrementalSnapshots(SnapshotMetadata fullMeta,
SnapshotFileTree sft, int incIdx) {
+ try {
+ GridCacheSharedContext<Object, Object> ctx =
ignite.context().cache().context();
- if (!incMeta.matchBaseSnapshot(fullMeta)) {
- throw new IllegalArgumentException("Incremental
snapshot doesn't match full snapshot " +
- "[incMeta=" + incMeta + ", fullMeta=" + fullMeta +
']');
- }
+ IgniteSnapshotManager snpMgr = ctx.snapshotMgr();
- if (incMeta.incrementIndex() != inc) {
- throw new IgniteException(
- "Incremental snapshot meta has wrong index
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
- }
+ // Incremental snapshot must contain ClusterSnapshotRecord.
+ long startSeg = fullMeta.snapshotRecordPointer().index();
- checkWalSegments(incMeta, startSeg, ift);
+ for (int inc = 1; inc <= incIdx; inc++) {
+ IncrementalSnapshotFileTree ift =
sft.incrementalSnapshotFileTree(inc);
- // Incremental snapshots must not cross each other.
- startSeg = incMeta.incrementalSnapshotPointer().index() +
1;
+ if (!ift.root().exists()) {
+ throw new IllegalArgumentException("No incremental
snapshot found " +
+ "[snpName=" + sft.name() + ", snpPath=" + sft.root() +
", incrementIndex=" + inc + ']');
}
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
- }
- }
- /** Check that incremental snapshot contains all required WAL
segments. Throws {@link IgniteException} in case of any errors. */
- private void checkWalSegments(IncrementalSnapshotMetadata meta, long
startWalSeg, IncrementalSnapshotFileTree ift) {
- IgniteWalIteratorFactory factory = new
IgniteWalIteratorFactory(log);
+ IncrementalSnapshotMetadata incMeta =
snpMgr.readIncrementalSnapshotMetadata(ift.meta());
- List<FileDescriptor> walSeg = factory.resolveWalFiles(
- new IgniteWalIteratorFactory.IteratorParametersBuilder()
- .filesOrDirs(ift.walCompactedSegments()));
-
- if (walSeg.isEmpty())
- throw new IgniteException("No WAL segments found for
incremental snapshot [dir=" + ift.wal() + ']');
-
- long actFirstSeg = walSeg.get(0).idx();
+ if (!incMeta.matchBaseSnapshot(fullMeta)) {
+ throw new IllegalArgumentException("Incremental snapshot
doesn't match full snapshot " +
+ "[incMeta=" + incMeta + ", fullMeta=" + fullMeta +
']');
+ }
- if (actFirstSeg != startWalSeg) {
- throw new IgniteException("Missed WAL segment
[expectFirstSegment=" + startWalSeg
- + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta
+ ']');
- }
+ if (incMeta.incrementIndex() != inc) {
+ throw new IgniteException(
+ "Incremental snapshot meta has wrong index
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
+ }
- long expLastSeg = meta.incrementalSnapshotPointer().index();
- long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
+ checkWalSegments(incMeta, startSeg, ift);
- if (actLastSeg != expLastSeg) {
- throw new IgniteException("Missed WAL segment
[expectLastSegment=" + startWalSeg
- + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta
+ ']');
+ // Incremental snapshots must not cross each other.
+ startSeg = incMeta.incrementalSnapshotPointer().index() + 1;
}
-
- List<?> walSegGaps = factory.hasGaps(walSeg);
-
- if (!walSegGaps.isEmpty())
- throw new IgniteException("Missed WAL segments [misses=" +
walSegGaps + ", meta=" + meta + ']');
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
}
}
- /** */
- public Map<ClusterNode, Exception> reduce(Map<ClusterNode,
List<SnapshotMetadata>> results) throws IgniteException {
- Map<ClusterNode, Exception> exs = new HashMap<>();
-
- SnapshotMetadata first = null;
- Set<String> baselineMetasLeft = Collections.emptySet();
-
- for (Map.Entry<ClusterNode, List<SnapshotMetadata>> res :
results.entrySet()) {
- List<SnapshotMetadata> metas = res.getValue();
+ /** Check that incremental snapshot contains all required WAL segments.
Throws {@link IgniteException} in case of any errors. */
+ private void checkWalSegments(IncrementalSnapshotMetadata meta, long
startWalSeg, IncrementalSnapshotFileTree ift) {
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
- for (SnapshotMetadata meta : metas) {
- if (first == null) {
- first = meta;
+ List<FileDescriptor> walSeg = factory.resolveWalFiles(
+ new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .filesOrDirs(ift.walCompactedSegments()));
- baselineMetasLeft = new HashSet<>(meta.baselineNodes());
- }
+ if (walSeg.isEmpty())
+ throw new IgniteException("No WAL segments found for incremental
snapshot [dir=" + ift.wal() + ']');
- baselineMetasLeft.remove(meta.consistentId());
+ long actFirstSeg = walSeg.get(0).idx();
- if (!first.sameSnapshot(meta)) {
- exs.put(res.getKey(),
- new IgniteException("An error occurred during
comparing snapshot metadata from cluster nodes " +
- "[first=" + first + ", meta=" + meta + ", nodeId="
+ res.getKey().id() + ']'));
- }
- }
+ if (actFirstSeg != startWalSeg) {
+ throw new IgniteException("Missed WAL segment
[expectFirstSegment=" + startWalSeg
+ + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta +
']');
}
- if (first == null && exs.isEmpty()) {
- SnapshotFileTree sft = job.sft;
+ long expLastSeg = meta.incrementalSnapshotPointer().index();
+ long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
- throw new IllegalArgumentException("Snapshot does not exists
[snapshot=" + sft.name()
- + ", baseDir=" + sft.root() + ", consistentId=" +
sft.consistentId() + ']');
+ if (actLastSeg != expLastSeg) {
+ throw new IgniteException("Missed WAL segment [expectLastSegment="
+ startWalSeg
+ + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta +
']');
}
- if (!F.isEmpty(baselineMetasLeft) && F.isEmpty(exs)) {
- throw new IgniteException("No snapshot metadatas found for the
baseline nodes " +
- "with consistent ids: " + String.join(", ",
baselineMetasLeft));
- }
+ List<?> walSegGaps = factory.hasGaps(walSeg);
- return exs;
+ if (!walSegGaps.isEmpty())
+ throw new IgniteException("Missed WAL segments [misses=" +
walSegGaps + ", meta=" + meta + ']');
}
}