This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c9e07b095c IGNITE-19972 Added metrics of snapshot check (#12330)
2c9e07b095c is described below

commit 2c9e07b095c05dcf865ab7b837d54d00ceece593
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Mar 18 15:13:59 2026 +0300

    IGNITE-19972 Added metrics of snapshot check (#12330)
---
 .../snapshot/IgniteSnapshotManager.java            |   2 +-
 .../snapshot/IncrementalSnapshotVerify.java        |  27 +-
 .../persistence/snapshot/SnapshotCheckProcess.java | 152 ++++++++--
 .../persistence/snapshot/SnapshotChecker.java      |  47 ++-
 .../snapshot/SnapshotHandlerContext.java           |  25 +-
 .../snapshot/SnapshotHandlerRestoreTask.java       |  73 -----
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |  23 +-
 .../main/resources/META-INF/classnames.properties  |   2 -
 .../snapshot/AbstractSnapshotSelfTest.java         |  18 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   | 324 ++++++++++++++++++++-
 10 files changed, 561 insertions(+), 132 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index cdd0eb3af30..ed01716a600 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1070,7 +1070,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 );
 
                 SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, 
req.groups(), cctx.localNode(), snpOp.snapshotFileTree(),
-                    snpOp.streamerWarning(), true);
+                    snpOp.streamerWarning(), true, null, null);
 
                 snpOp.meta(meta);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
index a206415a498..094dceb1820 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -50,6 +51,7 @@ import 
org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
 
@@ -71,11 +73,26 @@ public class IncrementalSnapshotVerify implements 
Supplier<IncrementalSnapshotVe
     private LongAdder procEntriesCnt;
 
     /** */
-    public IncrementalSnapshotVerify(IgniteEx ignite, IgniteLogger log, 
SnapshotFileTree sft, int incrementalIdx) {
+    @Nullable private final Consumer<Integer> totalCnsmr;
+
+    /** */
+    @Nullable private final Consumer<Integer> checkedCnsmr;
+
+    /** */
+    public IncrementalSnapshotVerify(
+        IgniteEx ignite,
+        IgniteLogger log,
+        SnapshotFileTree sft,
+        int incrementalIdx,
+        @Nullable Consumer<Integer> totalCnsmr,
+        @Nullable Consumer<Integer> checkedCnsmr
+    ) {
         this.ignite = ignite;
         this.log = log;
         this.sft = sft;
-        this.incIdx = incrementalIdx;
+        incIdx = incrementalIdx;
+        this.totalCnsmr = totalCnsmr;
+        this.checkedCnsmr = checkedCnsmr;
     }
 
     /**
@@ -106,11 +123,15 @@ public class IncrementalSnapshotVerify implements 
Supplier<IncrementalSnapshotVe
                 ignite.context().cache().context(), sft, incIdx, 
txCaches.keySet()
             ) {
                 @Override void totalWalSegments(int segCnt) {
-                    // No-op.
+                    if (totalCnsmr != null)
+                        totalCnsmr.accept(segCnt);
                 }
 
                 @Override void processedWalSegments(int segCnt) {
                     procSegCnt.set(segCnt);
+
+                    if (checkedCnsmr != null)
+                        checkedCnsmr.accept(segCnt);
                 }
 
                 @Override void initWalEntries(LongAdder entriesCnt) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index 29bd175cb9c..1baacc7d978 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -44,9 +44,12 @@ import 
org.apache.ignite.internal.management.cache.IdleVerifyResult;
 import org.apache.ignite.internal.management.cache.PartitionKey;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -60,6 +63,9 @@ import static 
org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
 
 /** Distributed process of snapshot checking. */
 public class SnapshotCheckProcess {
+    /** */
+    public static final String SNAPSHOT_CHECK_METRIC = "snapshot-check";
+
     /** */
     private final IgniteLogger log;
 
@@ -157,6 +163,8 @@ public class SnapshotCheckProcess {
             return new GridFinishedFuture<>();
         }
         finally {
+            unregisterMetrics(ctx.req.snapshotName());
+
             if (log.isInfoEnabled())
                 log.info("Finished snapshot validation [req=" + ctx.req + ']');
         }
@@ -314,6 +322,8 @@ public class SnapshotCheckProcess {
         if (F.isEmpty(ctx.metas))
             return new GridFinishedFuture<>();
 
+        ctx.totalCounter.set(0);
+
         GridFutureAdapter<SnapshotCheckResponse> phaseFut = new 
GridFutureAdapter<>();
 
         CompletableFuture<SnapshotCheckResponse> workingFut;
@@ -346,7 +356,11 @@ public class SnapshotCheckProcess {
         CompletableFuture<SnapshotCheckResponse> resFut = new 
CompletableFuture<>();
 
         CompletableFuture<IncrementalSnapshotVerifyResult> workingFut = 
snpChecker.checkIncrementalSnapshot(
-            ctx.locFileTree.get(meta.consistentId()), 
ctx.req.incrementalIndex());
+            ctx.locFileTree.get(meta.consistentId()),
+            ctx.req.incrementalIndex(),
+            ctx.totalCounter::addAndGet,
+            ctx.checkedCounter::addAndGet
+        );
 
         workingFut.whenComplete((res, err) -> {
             if (err != null)
@@ -368,22 +382,29 @@ public class SnapshotCheckProcess {
         AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
 
         for (SnapshotMetadata meta : ctx.metas) {
-            CompletableFuture<Map<PartitionKey, PartitionHashRecord>> metaFut 
= snpChecker.checkPartitions(
-                meta,
-                ctx.locFileTree.get(meta.consistentId()),
-                ctx.req.groups(),
-                false,
-                ctx.req.fullCheck()
-            );
-
-            metaFut.whenComplete((res, err) -> {
-                if (err != null)
-                    exceptions.put(meta.consistentId(), err);
-                else if (!F.isEmpty(res))
-                    perMetaResults.put(meta.consistentId(), res);
-
-                if (metasProcessed.decrementAndGet() == 0)
-                    composedFut.complete(new 
SnapshotCheckResponse(perMetaResults, exceptions));
+            // Run asynchronously to calculate the metric 'total partitions' 
faster.
+            kctx.pools().getSnapshotExecutorService().submit(() -> {
+                CompletableFuture<Map<PartitionKey, PartitionHashRecord>> 
metaFut = snpChecker.checkPartitions(
+                    meta,
+                    ctx.locFileTree.get(meta.consistentId()),
+                    ctx.req.groups(),
+                    false,
+                    ctx.req.fullCheck(),
+                    ctx.totalCounter::addAndGet,
+                    checkedPartId -> ctx.checkedCounter.incrementAndGet()
+                );
+
+                metaFut.whenComplete((res, err) -> {
+                    ctx.checkedSnapshotParts.incrementAndGet();
+
+                    if (err != null)
+                        exceptions.put(meta.consistentId(), err);
+                    else if (!F.isEmpty(res))
+                        perMetaResults.put(meta.consistentId(), res);
+
+                    if (metasProcessed.decrementAndGet() == 0)
+                        composedFut.complete(new 
SnapshotCheckResponse(perMetaResults, exceptions));
+                });
             });
         }
 
@@ -403,17 +424,28 @@ public class SnapshotCheckProcess {
         AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
 
         for (SnapshotMetadata meta : ctx.metas) {
-            CompletableFuture<Map<String, SnapshotHandlerResult<Object>>> 
metaFut = snpChecker.invokeCustomHandlers(meta,
-                ctx.locFileTree.get(meta.consistentId()), ctx.req.groups(), 
ctx.req.fullCheck());
-
-            metaFut.whenComplete((res, err) -> {
-                if (err != null)
-                    exceptions.put(meta.consistentId(), err);
-                else if (!F.isEmpty(res))
-                    perMetaResults.put(meta.consistentId(), res);
-
-                if (metasProcessed.decrementAndGet() == 0)
-                    composedFut.complete(new 
SnapshotCheckResponse(perMetaResults, exceptions));
+            // Run asynchronously to calculate the metric 'total partitions' 
faster.
+            kctx.pools().getSnapshotExecutorService().submit(() -> {
+                CompletableFuture<Map<String, SnapshotHandlerResult<Object>>> 
metaFut = snpChecker.invokeCustomHandlers(
+                    meta,
+                    ctx.locFileTree.get(meta.consistentId()),
+                    ctx.req.groups(),
+                    ctx.req.fullCheck(),
+                    ctx.totalCounter::addAndGet,
+                    processedPart -> ctx.checkedCounter.incrementAndGet()
+                );
+
+                metaFut.whenComplete((res, err) -> {
+                    ctx.checkedSnapshotParts.incrementAndGet();
+
+                    if (err != null)
+                        exceptions.put(meta.consistentId(), err);
+                    else if (!F.isEmpty(res))
+                        perMetaResults.put(meta.consistentId(), res);
+
+                    if (metasProcessed.decrementAndGet() == 0)
+                        composedFut.complete(new 
SnapshotCheckResponse(perMetaResults, exceptions));
+                });
             });
         }
 
@@ -465,6 +497,8 @@ public class SnapshotCheckProcess {
         if (!baseline(kctx.localNodeId()))
             return new GridFinishedFuture<>();
 
+        registerMetrics(ctx);
+
         Collection<Integer> grpIds = F.isEmpty(req.groups()) ? null : 
F.viewReadOnly(req.groups(), CU::cacheId);
 
         GridFutureAdapter<SnapshotCheckResponse> phaseFut = new 
GridFutureAdapter<>();
@@ -548,8 +582,11 @@ public class SnapshotCheckProcess {
                 phase2PartsHashes.start(reqId, ctx.req);
         }
         catch (Throwable th) {
-            if (ctx != null)
+            if (ctx != null) {
+                unregisterMetrics(ctx.req.snapshotName());
+
                 contexts.remove(ctx.req.snapshotName());
+            }
 
             if (clusterOpFut != null)
                 clusterOpFut.onDone(th);
@@ -562,12 +599,13 @@ public class SnapshotCheckProcess {
      *
      * @return Metadatas to process on current node.
      */
-    private @Nullable List<SnapshotMetadata> assingMetas(Map<ClusterNode, 
List<SnapshotMetadata>> clusterMetas) {
+    private List<SnapshotMetadata> assingMetas(Map<ClusterNode, 
List<SnapshotMetadata>> clusterMetas) {
         ClusterNode locNode = kctx.cluster().get().localNode();
         List<SnapshotMetadata> locMetas = clusterMetas.get(locNode);
 
+        // Might be empty due to a cache's node filter.
         if (F.isEmpty(locMetas))
-            return null;
+            return Collections.emptyList();
 
         Set<String> onlineNodesConstIdsStr = new 
HashSet<>(clusterMetas.size());
         // The nodes are sorted with lesser order.
@@ -681,6 +719,42 @@ public class SnapshotCheckProcess {
         return null;
     }
 
+    /** Registers the snapshot check metrics on current node; {@code startTime} is fixed at registration, not re-read. */
+    private void registerMetrics(SnapshotCheckContext ctx) {
+        MetricRegistryImpl mreg = kctx.metric().registry(MetricUtils.metricName(SNAPSHOT_CHECK_METRIC, ctx.req.snapshotName()));
+        assert !mreg.iterator().hasNext();
+        assert ctx.req.requestId() != null;
+
+        long startTime = U.currentTimeMillis();
+        mreg.register("startTime", () -> startTime,
+            "The system time of the start of the cluster snapshot check operation on current node.");
+
+        if (ctx.req.incrementalIndex() > 0) {
+            mreg.register("incrementIndex", ctx.req::incrementalIndex,
+                "The index of incremental snapshot of the snapshot check operation.");
+            mreg.register("totalWalSegments", ctx.totalCounter::get,
+                "The total number of WAL segments in the incremental snapshot to check on current node.");
+            mreg.register("processedWalSegments", ctx.checkedCounter::get,
+                "The number of checked WAL segments in the incremental snapshot on current node.");
+        }
+        else {
+            mreg.register("checkPartitions", ctx.req::fullCheck, "Shows whether full validation of snapshot partitions is enabled.");
+            mreg.register("totalPartitions", ctx.totalCounter::get, "The total number of partitions to check on current node.");
+            mreg.register("processedPartitions", ctx.checkedCounter::get, "The number of checked partitions on current node.");
+
+            // Node can hold and process another nodes' snapshot data.
+            mreg.register("snapshotPartsToProcess", () -> ctx.metas == null ? -1 : ctx.metas.size(),
+                "Number of parts (nodes data) of snapshot to check on current node.");
+            mreg.register("processedSnapshotParts", ctx.checkedSnapshotParts::get,
+                "Number of checked snapshot parts (nodes data) on current node.");
+        }
+    }
+
+    /** Removes the metric registry named by {@link #SNAPSHOT_CHECK_METRIC} and the given snapshot name. */
+    private void unregisterMetrics(String snpName) {
+        kctx.metric().remove(MetricUtils.metricName(SNAPSHOT_CHECK_METRIC, snpName));
+    }
+
     /** Operation context. */
     private static final class SnapshotCheckContext {
         /** Request. */
@@ -691,7 +765,7 @@ public class SnapshotCheckProcess {
          * Metadatas to process on this node. Also indicates the snapshot 
parts to check on this node.
          * @see #partitionsHashesFuture(SnapshotCheckContext)
          */
-        @Nullable private List<SnapshotMetadata> metas;
+        @Nullable private volatile List<SnapshotMetadata> metas;
 
         /** Map of snapshot pathes per consistent id for {@link #metas}. */
         @GridToStringInclude
@@ -700,9 +774,23 @@ public class SnapshotCheckProcess {
         /** All the snapshot metadatas. */
         @Nullable private Map<ClusterNode, List<SnapshotMetadata>> 
clusterMetas;
 
+        /** Common counter of total work units to process on current node. */
+        @GridToStringExclude
+        private final AtomicInteger totalCounter = new AtomicInteger(-1);
+
+        /** Common counter of checked work units on current node. */
+        @GridToStringExclude
+        private final AtomicInteger checkedCounter = new AtomicInteger(0);
+
+        /** Number of checked snapshot parts (nodes data/consistent ids) on 
current node. {@code Null} for incremental snapshots. */
+        @GridToStringExclude
+        @Nullable private final AtomicInteger checkedSnapshotParts;
+
         /** Creates operation context. */
         private SnapshotCheckContext(SnapshotCheckProcessRequest req) {
             this.req = req;
+
+            checkedSnapshotParts = req.incrementalIndex() > 0 ? null : new 
AtomicInteger(0);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 9c16c0be0a7..4f81025d0a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -72,12 +73,14 @@ public class SnapshotChecker {
     /** */
     public CompletableFuture<IncrementalSnapshotVerifyResult> 
checkIncrementalSnapshot(
         SnapshotFileTree sft,
-        int incIdx
+        int incIdx,
+        @Nullable Consumer<Integer> totalCnsmr,
+        @Nullable Consumer<Integer> checkedCnsmr
     ) {
         assert incIdx > 0;
 
         return CompletableFuture.supplyAsync(
-            new IncrementalSnapshotVerify(kctx.grid(), log, sft, incIdx),
+            new IncrementalSnapshotVerify(kctx.grid(), log, sft, incIdx, 
totalCnsmr, checkedCnsmr),
             executor
         );
     }
@@ -160,12 +163,31 @@ public class SnapshotChecker {
         SnapshotMetadata meta,
         SnapshotFileTree sft,
         @Nullable Collection<String> grps,
-        boolean check
+        boolean check,
+        @Nullable Consumer<Integer> totalCnsmr,
+        @Nullable Consumer<Integer> processedCnsmr
     ) {
         // The handlers use or may use the same snapshot pool. If it is 
configured with 1 thread, launching waiting task in
         // the same pool might block it.
-        return CompletableFuture.supplyAsync(
-            new SnapshotHandlerRestoreTask(kctx.grid(), log, sft, grps, check)
+        return CompletableFuture.supplyAsync(() -> {
+                try {
+                    SnapshotHandlerContext hndCnt = new SnapshotHandlerContext(
+                        meta,
+                        grps,
+                        kctx.cluster().get().localNode(),
+                        sft,
+                        false,
+                        check,
+                        totalCnsmr == null ? null : (hndCls, totalCnt) -> 
totalCnsmr.accept(totalCnt),
+                        processedCnsmr == null ? null : (hndCls, partId) -> 
processedCnsmr.accept(partId)
+                    );
+
+                    return 
kctx.cache().context().snapshotMgr().handlers().invokeAll(SnapshotHandlerType.RESTORE,
 hndCnt);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
         );
     }
 
@@ -175,12 +197,21 @@ public class SnapshotChecker {
         SnapshotFileTree sft,
         @Nullable Collection<String> grps,
         boolean forCreation,
-        boolean checkParts
+        boolean checkParts,
+        @Nullable Consumer<Integer> totalCnsmr,
+        @Nullable Consumer<Integer> checkedPartCnsmr
     ) {
-        // Await in the default executor to avoid blocking the snapshot 
executor if it has just one thread.
         return CompletableFuture.supplyAsync(() -> {
             SnapshotHandlerContext hctx = new SnapshotHandlerContext(
-                meta, grps, kctx.cluster().get().localNode(), sft, false, 
checkParts);
+                meta,
+                grps,
+                kctx.cluster().get().localNode(),
+                sft,
+                false,
+                checkParts,
+                totalCnsmr == null ? null : (hndCls, unitsToWork) -> 
totalCnsmr.accept(unitsToWork),
+                checkedPartCnsmr == null ? null : (hndCls, partId) -> 
checkedPartCnsmr.accept(partId)
+            );
 
             try {
                 return new 
SnapshotPartitionsVerifyHandler(kctx.cache().context()).invoke(hctx);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
index 6f84c968184..90516ac1374 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.util.Collection;
+import java.util.function.BiConsumer;
 import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import org.jetbrains.annotations.Nullable;
@@ -44,6 +45,12 @@ public class SnapshotHandlerContext {
     /** If {@code true}, calculates and compares partition hashes. Otherwise, 
only basic snapshot validation is launched.*/
     private final boolean check;
 
+    /** Consumer of total work units per handler type. {@code Null} if not provided by the caller. */
+    @Nullable private final BiConsumer<Class<? extends SnapshotHandler<?>>, Integer> totalCnsmr;
+
+    /** Consumer of processed work unit per handler type. {@code Null} if not provided by the caller. */
+    @Nullable private final BiConsumer<Class<? extends SnapshotHandler<?>>, Integer> progressCnsmr;
+
     /**
      * @param metadata Snapshot metadata.
      * @param grps The names of the cache groups on which the operation is 
performed.
@@ -52,6 +59,8 @@ public class SnapshotHandlerContext {
      * @param sft Snapshot file tree.
      * @param streamerWrn {@code True} if concurrent streaming updates 
occurred during snapshot operation.
      * @param check If {@code true}, calculates and compares partition hashes. 
Otherwise, only basic snapshot validation is launched.
+     * @param totalCnsmr Consumer of total work units per handler type.
+     * @param progressCnsmr Consumer of processed work unit per handler type.
      */
     public SnapshotHandlerContext(
         SnapshotMetadata metadata,
@@ -59,7 +68,9 @@ public class SnapshotHandlerContext {
         ClusterNode locNode,
         SnapshotFileTree sft,
         boolean streamerWrn,
-        boolean check
+        boolean check,
+        @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer> 
totalCnsmr,
+        @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer> 
progressCnsmr
     ) {
         this.metadata = metadata;
         this.grps = grps;
@@ -67,6 +78,8 @@ public class SnapshotHandlerContext {
         this.sft = sft;
         this.streamerWrn = streamerWrn;
         this.check = check;
+        this.totalCnsmr = totalCnsmr;
+        this.progressCnsmr = progressCnsmr;
     }
 
     /**
@@ -109,4 +122,14 @@ public class SnapshotHandlerContext {
     public boolean check() {
         return check;
     }
+
+    /** @return Consumer of total work units per handler type, or {@code null} if not provided; callers must check. */
+    public @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer> totalConsumer() {
+        return totalCnsmr;
+    }
+
+    /** @return Consumer of processed work unit per handler type, or {@code null} if not provided; callers must check. */
+    public @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer> progressConsumer() {
+        return progressCnsmr;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
deleted file mode 100644
index dccb0bc7c84..00000000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Supplier;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteEx;
-import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
-
-/**
- * Snapshot restore operation handling task.
- */
-public class SnapshotHandlerRestoreTask implements Supplier<Map<String, 
SnapshotHandlerResult<Object>>> {
-    /** */
-    private final IgniteEx ignite;
-
-    /** */
-    private final SnapshotFileTree sft;
-
-    /** */
-    private final Collection<String> rqGrps;
-
-    /** */
-    private final boolean check;
-
-    /** */
-    SnapshotHandlerRestoreTask(
-        IgniteEx ignite,
-        IgniteLogger log,
-        SnapshotFileTree sft,
-        Collection<String> grps,
-        boolean check
-    ) {
-        this.ignite = ignite;
-        this.sft = sft;
-        this.rqGrps = grps;
-        this.check = check;
-    }
-
-    /** */
-    @Override public Map<String, SnapshotHandlerResult<Object>> get() {
-        try {
-            IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
-            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
-
-            return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
-                new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(), 
sft, false, check));
-        }
-        catch (IgniteCheckedException | IOException e) {
-            throw new IgniteException(e);
-        }
-    }
-}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 8619b53fc02..597e95cc58c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
@@ -158,6 +159,12 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
                     continue;
 
                 partFiles.add(part);
+
+                if (opCtx.totalConsumer() != null)
+                    opCtx.totalConsumer().accept(getClass(), 1);
+
+                if (!opCtx.check() && opCtx.progressConsumer() != null)
+                    opCtx.progressConsumer().accept(getClass(), partId);
             }
 
             if (parts.isEmpty())
@@ -175,14 +182,16 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
         }
 
         if (!opCtx.check()) {
-            log.info("Snapshot data integrity check skipped [snpName=" + 
meta.snapshotName() + ']');
+            if (log.isInfoEnabled())
+                log.info("Snapshot data integrity check skipped [snpName=" + 
meta.snapshotName() + ']');
 
             return Collections.emptyMap();
         }
 
         return meta.dump()
             ? checkDumpFiles(opCtx, partFiles)
-            : checkSnapshotFiles(opCtx.snapshotFileTree(), grpDirs, meta, 
partFiles, isPunchHoleEnabled(opCtx, grpDirs.keySet()));
+            : checkSnapshotFiles(opCtx.snapshotFileTree(), grpDirs, meta, 
partFiles, isPunchHoleEnabled(opCtx, grpDirs.keySet()),
+                opCtx.progressConsumer() == null ? null : partId -> 
opCtx.progressConsumer().accept(getClass(), partId));
     }
 
     /** */
@@ -191,9 +200,11 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
         Map<Integer, List<File>> grpDirs,
         SnapshotMetadata meta,
         Set<File> partFiles,
-        boolean punchHoleEnabled
+        boolean punchHoleEnabled,
+        @Nullable Consumer<Integer> checkedCnsmr
     ) throws IgniteCheckedException {
-        Map<PartitionKey, PartitionHashRecord> res = new ConcurrentHashMap<>();
+        Map<PartitionKey, PartitionHashRecord> res = new 
ConcurrentHashMap<>(partFiles.size(), 1.0f);
+
         ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> 
ByteBuffer.allocateDirect(meta.pageSize())
             .order(ByteOrder.nativeOrder()));
 
@@ -313,6 +324,10 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
                     catch (IOException e) {
                         throw new IgniteCheckedException(e);
                     }
+                    finally {
+                        if (checkedCnsmr != null)
+                            checkedCnsmr.accept(partId);
+                    }
 
                     return null;
                 }
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 5f22afc2a72..cb6e27862f6 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1305,8 +1305,6 @@ 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnap
 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage
 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage
 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerRestoreTask
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerRestoreTask$SnapshotHandlerRestoreJob
 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult
 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType
 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 715b771d611..b74dfddcae4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -88,6 +88,7 @@ import 
org.apache.ignite.internal.processors.marshaller.MappedName;
 import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -180,19 +181,22 @@ public abstract class AbstractSnapshotSelfTest extends 
GridCommonAbstractTest {
     /** Parameters. */
     @Parameterized.Parameters(name = "encryption={0}, onlyPrimay={1}")
     public static Collection<Object[]> params() {
-        boolean[] encVals = DISK_PAGE_COMPRESSION != 
DiskPageCompression.DISABLED
-            ? new boolean[] {false}
-            : new boolean[] {false, true};
-
         List<Object[]> res = new ArrayList<>();
 
-        for (boolean enc: encVals)
-            for (boolean onlyPrimary: new boolean[] {true, false})
-                res.add(new Object[] { enc, onlyPrimary});
+        for (boolean enc : encryptionParameters())
+            for (boolean onlyPrimary : new boolean[] {true, false})
+                res.add(new Object[] {enc, onlyPrimary});
 
         return res;
     }
 
+    /**
+     * @return Encryption flag values for the test parameters: encrypted runs are generated only if disk page
+     * compression is disabled.
+     */
+    protected static Collection<Boolean> encryptionParameters() {
+        if (DISK_PAGE_COMPRESSION == DiskPageCompression.DISABLED)
+            return F.asList(false, true);
+
+        return F.asList(false);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 93f175a15a0..306d44224a4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
@@ -47,6 +48,7 @@ import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -76,6 +78,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
@@ -94,21 +97,31 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.spi.metric.IntMetric;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.ListeningTestLogger;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runners.Parameterized;
 
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_THREAD_POOL_SIZE;
 import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
 import static 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckProcess.SNAPSHOT_CHECK_METRIC;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CHECK_SNAPSHOT_METAS;
 import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CHECK_SNAPSHOT_PARTS;
 import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
 import static org.apache.ignite.testframework.GridTestUtils.assertContains;
 import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.junit.Assume.assumeFalse;
 
@@ -116,6 +129,9 @@ import static org.junit.Assume.assumeFalse;
  * Cluster-wide snapshot check procedure tests.
  */
 public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
+    /** Number of partitions of the test transactional cache (set via its rendezvous affinity function). */
+    protected static final int CACHE_PARTS_CNT = 32;
+
     /** Map of intermediate compute task results collected prior performing 
reduce operation on them. */
     private final Map<Class<?>, Map<PartitionKey, List<PartitionHashRecord>>> 
jobResults = new ConcurrentHashMap<>();
 
@@ -125,12 +141,44 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
     /** Optional cache name to be created on demand. */
     private static final String OPTIONAL_CACHE_NAME = "CacheName";
 
+    /** Logger to set as the grid logger of the started nodes; not used if {@code null}. */
+    private ListeningTestLogger listeningLog;
+
+    /** Snapshot thread pool size of the started nodes (test parameter). */
+    @Parameterized.Parameter(2)
+    public int snpThrdPoolSz;
+
+    /** @return Test parameters: encryption flag, only-primary flag and snapshot thread pool size. */
+    @Parameterized.Parameters(name = "encryption={0}, onlyPrimary={1}, snpThrdPoolSz={2}")
+    public static Collection<Object[]> params() {
+        Collection<Integer> snpPoolSizes = F.asList(DFLT_SNAPSHOT_THREAD_POOL_SIZE, 1);
+
+        return cartesianProduct(encryptionParameters(), F.asList(false, true), snpPoolSizes);
+    }
+
     /** Cleanup data of task execution results if need. */
     @Before
     public void beforeCheck() {
         jobResults.clear();
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setSnapshotThreadPoolSize(snpThrdPoolSz);
+
+        // Keep the default grid logger unless a test installed its listening logger.
+        if (listeningLog == null)
+            return cfg;
+
+        cfg.setGridLogger(listeningLog);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+        CacheConfiguration<K, V> cfg = super.txCacheConfig(ccfg);
+
+        // Fix the number of cache partitions to CACHE_PARTS_CNT.
+        cfg.setAffinity(new RendezvousAffinityFunction(false, CACHE_PARTS_CNT));
+
+        return cfg;
+    }
+
     /** @throws Exception If fails. */
     @Test
     public void testClusterSnapshotCheck() throws Exception {
@@ -553,6 +601,279 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
         assertNotContains(log, b.toString(), "Failed to read page (CRC 
validation failed)");
     }
 
+    /** Checks the snapshot check metrics when all the nodes are alive. */
+    @Test
+    public void testSnapshotCheckMetricsEntireTopology() throws Exception {
+        boolean entireTop = true;
+        boolean incremental = false;
+
+        doTestSnapshotMetricsAllRuns(entireTop, incremental);
+    }
+
+    /** Checks the snapshot check metrics when node 0 is stopped before the checks. */
+    @Test
+    public void testSnapshotCheckMetricsLesserTopology() throws Exception {
+        boolean entireTop = false;
+        boolean incremental = false;
+
+        doTestSnapshotMetricsAllRuns(entireTop, incremental);
+    }
+
+    /** Checks the incremental snapshot check metrics. Skipped if encryption is enabled. */
+    @Test
+    public void testIncrementalSnapshotCheckMetrics() throws Exception {
+        assumeFalse(encryption);
+
+        boolean entireTop = true;
+        boolean incremental = true;
+
+        doTestSnapshotMetricsAllRuns(entireTop, incremental);
+    }
+
+    /**
+     * Checks the snapshot check metrics for all the supported combinations of the operation initiator, partitions
+     * hashes check, invocation of all the snapshot handlers and snapshot restoration.
+     *
+     * @param entireTop If {@code false}, node 0 is stopped before the checks.
+     * @param incremental If {@code true}, an incremental snapshot is created and checked.
+     * @throws Exception If failed.
+     */
+    private void doTestSnapshotMetricsAllRuns(boolean entireTop, boolean incremental) throws Exception {
+        assert entireTop || !incremental : "Incremental snapshot supports only entire topology";
+
+        listeningLog = new ListeningTestLogger(log);
+
+        prepareGridsAndSnapshot(3, 2, 1, false);
+
+        if (incremental) {
+            try (IgniteDataStreamer<Integer, Integer> ds = grid(0).dataStreamer(DEFAULT_CACHE_NAME)) {
+                for (int i = 0; i < 100; ++i)
+                    ds.addData(i, i);
+            }
+
+            grid(1).snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get();
+        }
+
+        // The snapshot is checked or restored with the original cache destroyed.
+        grid(1).destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        if (!entireTop)
+            stopGrid(0);
+
+        int[] stoppedNodes = entireTop ? new int[0] : new int[] {0};
+
+        // From a baseline, non-baseline and client node
+        // (0,1 - baseline servers, 2 - non-baseline server, 3 - client).
+        for (int initiator : F.asList(1, 2, 3)) {
+            // Check partitions hashes.
+            for (boolean fullCheck : F.asList(true, false)) {
+                // Invoke all the snapshot handlers.
+                for (boolean allHandlers : F.asList(false, true)) {
+                    // Restore snapshot instead of just checking.
+                    for (boolean restore : F.asList(false, true)) {
+                        // The all-handlers-task currently fails on a client because it has no snapshot handlers.
+                        // Thus, a 'handlers mismatch configuration' error occurs. Also, snapshot restoration isn't
+                        // allowed from clients.
+                        if (initiator > 2 && (allHandlers || restore))
+                            continue;
+
+                        // On the restoration, all the snapshot checking handlers are always invoked for a
+                        // non-incremental snapshot. Incremental snapshot doesn't support snapshot handlers and
+                        // doesn't check partitions.
+                        if ((restore && !allHandlers) || (incremental && (allHandlers || fullCheck)))
+                            continue;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("Testing snapshot metrics with the parameters: entireTop=" + entireTop
+                                + ", initiator=" + initiator + ", fullCheck=" + fullCheck
+                                + ", allHandlers=" + allHandlers + ", restore=" + restore
+                                + ", incremental=" + incremental);
+                        }
+
+                        doTestSnapshotCheckMetricsCertainRun(initiator, incremental, fullCheck, allHandlers, restore,
+                            stoppedNodes);
+
+                        // Drop the restored cache before the next run.
+                        if (restore) {
+                            grid(1).destroyCache(DEFAULT_CACHE_NAME);
+
+                            awaitPartitionMapExchange();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts a snapshot check or restoration, holds it on the coordinator and verifies the snapshot check metrics of
+     * every alive node twice: after the snapshot metadata check phase (initial values) and after the partitions check
+     * phase. Then, waits for the metrics to be removed once the operation completes.
+     *
+     * @param initiator Index of the node starting the operation.
+     * @param incremental If {@code true}, the incremental snapshot with index 1 is checked.
+     * @param fullCheck If {@code true}, partitions hashes are checked.
+     * @param allHandlers If {@code true}, all the snapshot handlers are invoked.
+     * @param restore If {@code true}, the snapshot is restored instead of being just checked.
+     * @param stoppedNodes Indexes of the already stopped nodes.
+     * @throws Exception If failed.
+     */
+    private void doTestSnapshotCheckMetricsCertainRun(
+        int initiator,
+        boolean incremental,
+        boolean fullCheck,
+        boolean allHandlers,
+        boolean restore,
+        int... stoppedNodes
+    ) throws Exception {
+        assert !restore || allHandlers;
+        assert !incremental || (!allHandlers && stoppedNodes.length == 0 && !fullCheck);
+
+        Set<Integer> stoppedNodes0 = IntStream.of(stoppedNodes).boxed().collect(Collectors.toSet());
+
+        // Stopped nodes leave G.allGrids() while the indexes of the alive nodes stay unchanged. Thus, iterate over the
+        // whole initial topology and skip the stopped nodes, otherwise the nodes with the highest indexes are missed.
+        int nodesCnt = G.allGrids().size() + stoppedNodes0.size();
+
+        // The coordinator is expected to be the alive node with the lowest index.
+        int coordId = IntStream.range(0, nodesCnt).filter(i -> !stoppedNodes0.contains(i)).findFirst().orElse(0);
+
+        assert U.isLocalNodeCoordinator(grid(coordId).context().discovery());
+
+        Set<Object> baseline = grid(coordId).cluster().currentBaselineTopology().stream()
+            .map(BaselineNode::consistentId)
+            .collect(Collectors.toSet());
+
+        String mregName = metricName(SNAPSHOT_CHECK_METRIC, SNAPSHOT_NAME);
+
+        IgniteInternalFuture<SnapshotPartitionsVerifyResult> fut1 = null;
+        IgniteFuture<?> fut2 = null;
+
+        try {
+            // Block the finishing message of the snapshot metadata check phase.
+            discoSpi(grid(coordId)).block(msg -> msg instanceof FullMessage
+                && ((FullMessage<?>)msg).type() == CHECK_SNAPSHOT_METAS.ordinal());
+
+            long startTs = System.currentTimeMillis();
+
+            if (restore) {
+                fut2 = snp(grid(initiator)).restoreSnapshot(SNAPSHOT_NAME, null, null, incremental ? 1 : 0, fullCheck);
+
+                assertFalse(fut2.isDone());
+            }
+            else {
+                fut1 = snp(grid(initiator)).checkSnapshot(SNAPSHOT_NAME, null, null, allHandlers, incremental ? 1 : 0,
+                    fullCheck);
+
+                assertFalse(fut1.isDone());
+            }
+
+            discoSpi(grid(coordId)).waitBlocked(getTestTimeout());
+
+            for (int i = 0; i < nodesCnt; ++i) {
+                if (stoppedNodes0.contains(i))
+                    continue;
+
+                MetricRegistry mreg = grid(i).context().metric().registry(mregName);
+
+                // Clients and non-baseline nodes don't have data and don't run snapshots.
+                if (!baseline.contains(grid(i).localNode().consistentId())) {
+                    assertFalse(mreg.iterator().hasNext());
+
+                    continue;
+                }
+
+                assertTrue(mreg.iterator().hasNext());
+
+                // The start time may fall into the same millisecond the operation was started in.
+                assertTrue(mreg.<LongMetric>findMetric("startTime").value() >= startTs);
+
+                if (incremental) {
+                    assertEquals(1, mreg.<IntMetric>findMetric("incrementIndex").value());
+
+                    // Full-snapshot metrics aren't expected.
+                    assertNull(mreg.findMetric("checkPartitions"));
+                    assertNull(mreg.findMetric("processedPartitions"));
+                    assertNull(mreg.findMetric("snapshotPartsToProcess"));
+                    assertNull(mreg.findMetric("processedSnapshotParts"));
+                    assertNull(mreg.findMetric("totalPartitions"));
+
+                    // Initial values: the metrics aren't set yet.
+                    assertEquals(-1, mreg.<IntMetric>findMetric("totalWalSegments").value());
+                    assertEquals(0, mreg.<IntMetric>findMetric("processedWalSegments").value());
+                }
+                else {
+                    assertNull(mreg.findMetric("incrementIndex"));
+                    assertEquals(fullCheck, mreg.<BooleanMetric>findMetric("checkPartitions").value());
+
+                    // Incremental snapshot metrics aren't expected.
+                    assertNull(mreg.findMetric("totalWalSegments"));
+                    assertNull(mreg.findMetric("processedWalSegments"));
+
+                    // Initial values: the metrics aren't set yet.
+                    assertEquals(0, mreg.<IntMetric>findMetric("processedPartitions").value());
+                    assertEquals(-1, mreg.<IntMetric>findMetric("snapshotPartsToProcess").value());
+                    assertEquals(0, mreg.<IntMetric>findMetric("processedSnapshotParts").value());
+                    assertEquals(-1, mreg.<IntMetric>findMetric("totalPartitions").value());
+                }
+            }
+
+            // Next, block the finishing message of the partitions check phase.
+            discoSpi(grid(coordId)).blockNextAndRelease(msg -> msg instanceof FullMessage
+                && ((FullMessage<?>)msg).type() == CHECK_SNAPSHOT_PARTS.ordinal());
+
+            discoSpi(grid(coordId)).waitBlocked(getTestTimeout());
+
+            int totalSnpPartsDetected = 0;
+            boolean moreThanOneSnpPartProcessed = false;
+
+            for (int i = 0; i < nodesCnt; ++i) {
+                if (stoppedNodes0.contains(i))
+                    continue;
+
+                MetricRegistry mreg = grid(i).context().metric().registry(mregName);
+
+                // Clients and non-baseline nodes don't have data and don't run snapshots.
+                if (!baseline.contains(grid(i).localNode().consistentId())) {
+                    assertFalse(mreg.iterator().hasNext());
+
+                    continue;
+                }
+
+                assertTrue(mreg.iterator().hasNext());
+
+                if (incremental) {
+                    int walSegmentsDetected = mreg.<IntMetric>findMetric("totalWalSegments").value();
+
+                    assertTrue(walSegmentsDetected > 0);
+                    assertEquals(walSegmentsDetected, mreg.<IntMetric>findMetric("processedWalSegments").value());
+                }
+                else {
+                    int snpPartsDetected = mreg.<IntMetric>findMetric("processedSnapshotParts").value();
+
+                    assertTrue(snpPartsDetected > 0);
+
+                    totalSnpPartsDetected += snpPartsDetected;
+
+                    assertEquals(snpPartsDetected, mreg.<IntMetric>findMetric("snapshotPartsToProcess").value());
+
+                    if (snpPartsDetected > 1)
+                        moreThanOneSnpPartProcessed = true;
+
+                    SnapshotFileTree sft = snapshotFileTree(grid(i), SNAPSHOT_NAME);
+
+                    // File#list() returns null if the directory is missing or can't be read.
+                    String[] cacheDirFiles = new File(sft.nodeStorage(), "cache-" + DEFAULT_CACHE_NAME).list();
+
+                    assertNotNull(cacheDirFiles);
+
+                    long partsFilesCnt = Stream.of(cacheDirFiles).filter(fn -> fn.endsWith(".bin")).count();
+
+                    // Partitions + metastorage.
+                    assertEquals(snpPartsDetected * (partsFilesCnt + 1),
+                        mreg.<IntMetric>findMetric("totalPartitions").value());
+
+                    assertEquals(mreg.<IntMetric>findMetric("totalPartitions").value(),
+                        mreg.<IntMetric>findMetric("processedPartitions").value());
+                }
+            }
+
+            if (!incremental) {
+                assertEquals(baseline.size(), totalSnpPartsDetected);
+
+                // With stopped nodes, some alive node is expected to process more than one snapshot part.
+                assertTrue(stoppedNodes0.isEmpty() || moreThanOneSnpPartProcessed);
+            }
+        }
+        finally {
+            discoSpi(grid(coordId)).unblock();
+
+            // Both futures are null if the operation failed to start: don't mask the original failure with an NPE.
+            if (fut1 != null) {
+                assert fut2 == null;
+
+                fut1.get(getTestTimeout());
+            }
+            else if (fut2 != null)
+                fut2.get(getTestTimeout());
+        }
+
+        // The metrics must be removed once the operation completes.
+        for (int i = 0; i < nodesCnt; ++i) {
+            if (stoppedNodes0.contains(i))
+                continue;
+
+            int i0 = i;
+
+            assertTrue(waitForCondition(() -> {
+                MetricRegistry mreg = grid(i0).context().metric().registry(mregName);
+
+                return !mreg.iterator().hasNext();
+            }, getTestTimeout()));
+        }
+    }
+
     /** */
     @Test
     public void testCheckFromLesserTopology() throws Exception {
@@ -1074,7 +1395,8 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
         }
 
         try (IgniteDataStreamer<Integer, Integer> ds = 
grid(0).dataStreamer(DEFAULT_CACHE_NAME)) {
-            for (int i = 0; i < 100; ++i)
+            // Ensure all the partitions are created: several records per 
partition.
+            for (int i = 0; i < CACHE_PARTS_CNT * 4; ++i)
                 ds.addData(i, i);
         }
 

Reply via email to