This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch IGNITE-22662__snapshot_refactoring
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to 
refs/heads/IGNITE-22662__snapshot_refactoring by this push:
     new 0abc9bae944 IGNITE-22662 : Incremental snapshot check as DP (#11495)
0abc9bae944 is described below

commit 0abc9bae944f6e510b048ef1d9c7eb2c8e79cd3a
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Oct 9 16:08:16 2024 +0300

    IGNITE-22662 : Incremental snapshot check as DP (#11495)
---
 .../snapshot/IgniteSnapshotManager.java            |  40 +-
 ...lt.java => IncrementalSnapshotCheckResult.java} |  30 +-
 .../IncrementalSnapshotVerificationTask.java       | 426 ------------------
 .../persistence/snapshot/SnapshotCheckProcess.java | 147 ++++---
 .../snapshot/SnapshotCheckProcessRequest.java      |  12 +
 .../persistence/snapshot/SnapshotChecker.java      | 483 ++++++++++++++++++---
 .../snapshot/SnapshotMetadataVerificationTask.java | 127 +-----
 .../processors/cache/verify/IdleVerifyUtility.java |  10 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   | 130 +++++-
 9 files changed, 697 insertions(+), 708 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index bb78b31bbde..2d16637af6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
+import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -469,7 +471,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     private final boolean sequentialWrite =
         IgniteSystemProperties.getBoolean(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE, 
DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE);
 
-    /** Snapshot validator. */
+    /** Snapshot checker. */
     private final SnapshotChecker snpChecker;
 
     /**
@@ -495,7 +497,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         // Manage remote snapshots.
         snpRmtMgr = new SequentialRemoteSnapshotManager();
 
-        snpChecker = new SnapshotChecker(ctx, marsh, 
ctx.pools().getSnapshotExecutorService(), U.resolveClassLoader(ctx.config()));
+        snpChecker = new SnapshotChecker(ctx, marsh, 
ctx.pools().getSnapshotExecutorService());
     }
 
     /**
@@ -1902,8 +1904,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 ", incIdx=" + incIdx + ", grps=" + grps + ", validateParts=" + 
check + ']');
         }
 
-        if (check && incIdx < 1)
-            return checkSnpProc.start(name, snpPath, grps, 
includeCustomHandlers);
+        if (check && (incIdx < 1 || !includeCustomHandlers))
+            return checkSnpProc.start(name, snpPath, grps, incIdx, 
includeCustomHandlers);
 
         GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new 
GridFutureAdapter<>();
 
@@ -1926,12 +1928,9 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
             if (f0.error() == null && F.isEmpty(metasRes.exceptions())) {
                 Map<ClusterNode, List<SnapshotMetadata>> metas = 
metasRes.meta();
 
-                Class<? extends AbstractSnapshotVerificationTask> cls;
-
-                if (includeCustomHandlers)
-                    cls = SnapshotHandlerRestoreTask.class;
-                else
-                    cls = incIdx > 0 ? 
IncrementalSnapshotVerificationTask.class : SnapshotPartitionsVerifyTask.class;
+                Class<? extends AbstractSnapshotVerificationTask> cls = 
includeCustomHandlers
+                    ? SnapshotHandlerRestoreTask.class
+                    : SnapshotPartitionsVerifyTask.class;
 
                 kctx0.task().execute(
                         cls,
@@ -1994,8 +1993,18 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
      * @param smf File denoting to snapshot metafile.
      * @return Snapshot metadata instance.
      */
-    private SnapshotMetadata readSnapshotMetadata(File smf) throws 
IgniteCheckedException, IOException {
-        return snpChecker.readSnapshotMetadata(smf);
+    SnapshotMetadata readSnapshotMetadata(File smf) throws 
IgniteCheckedException, IOException {
+        SnapshotMetadata meta = readFromFile(smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - 
SNAPSHOT_METAFILE_EXT.length());
+
+        if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
+            throw new IgniteException(
+                "Error reading snapshot metadata [smfName=" + smfName + ", 
consId=" + U.maskForFileName(meta.consistentId())
+            );
+        }
+
+        return meta;
     }
 
     /**
@@ -2004,7 +2013,12 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
      * @param <T> Type of metadata.
      */
     public <T> T readFromFile(File smf) throws IgniteCheckedException, 
IOException {
-        return snpChecker.readFromFile(smf);
+        if (!smf.exists())
+            throw new IgniteCheckedException("Snapshot metafile cannot be read 
due to it doesn't exist: " + smf);
+
+        try (InputStream in = new 
BufferedInputStream(Files.newInputStream(smf.toPath()))) {
+            return marsh.unmarshal(in, 
U.resolveClassLoader(cctx.gridConfig()));
+        }
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTaskResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java
similarity index 71%
rename from 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTaskResult.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java
index 6dcd1515160..e50a9a79b9b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTaskResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java
@@ -17,21 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
-import org.apache.ignite.internal.dto.IgniteDataTransferObject;
 import org.apache.ignite.internal.management.cache.PartitionKeyV2;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
 import 
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
-/** Represents single job result for {@link 
IncrementalSnapshotVerificationTask}. */
-class IncrementalSnapshotVerificationTaskResult extends 
IgniteDataTransferObject {
+/** */
+class IncrementalSnapshotCheckResult implements Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -51,12 +47,12 @@ class IncrementalSnapshotVerificationTaskResult extends 
IgniteDataTransferObject
     private Collection<Exception> exceptions;
 
     /** */
-    public IncrementalSnapshotVerificationTaskResult() {
+    public IncrementalSnapshotCheckResult() {
         // No-op.
     }
 
     /** */
-    IncrementalSnapshotVerificationTaskResult(
+    IncrementalSnapshotCheckResult(
         Map<Object, TransactionsHashRecord> txHashRes,
         Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes,
         Collection<GridCacheVersion> partiallyCommittedTxs,
@@ -87,20 +83,4 @@ class IncrementalSnapshotVerificationTaskResult extends 
IgniteDataTransferObject
     public Collection<Exception> exceptions() {
         return exceptions;
     }
-
-    /** {@inheritDoc} */
-    @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
-        U.writeMap(out, txHashRes);
-        U.writeMap(out, partHashRes);
-        U.writeCollection(out, partiallyCommittedTxs);
-        U.writeCollection(out, exceptions);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void readExternalData(byte protoVer, ObjectInput in) 
throws IOException, ClassNotFoundException {
-        txHashRes = U.readMap(in);
-        partHashRes = U.readMap(in);
-        partiallyCommittedTxs = U.readCollection(in);
-        exceptions = U.readCollection(in);
-    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
deleted file mode 100644
index a6779e8e7e7..00000000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
-import org.apache.ignite.internal.management.cache.PartitionKeyV2;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
-import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
-import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
-import 
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cluster.BaselineTopology;
-import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.marshaller.MarshallerUtils;
-import org.apache.ignite.transactions.TransactionState;
-import org.jetbrains.annotations.Nullable;
-
-import static 
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
-import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
-
-/** */
-@GridInternal
-public class IncrementalSnapshotVerificationTask extends 
AbstractSnapshotVerificationTask {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public SnapshotPartitionsVerifyTaskResult 
reduce(List<ComputeJobResult> results) throws IgniteException {
-        Map<Object, Map<Object, TransactionsHashRecord>> nodeTxHashMap = new 
HashMap<>();
-
-        List<List<TransactionsHashRecord>> txHashConflicts = new ArrayList<>();
-        Map<PartitionKeyV2, List<PartitionHashRecordV2>> partHashes = new 
HashMap<>();
-        Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs = 
new HashMap<>();
-
-        Map<ClusterNode, Exception> errors = new HashMap<>();
-
-        for (ComputeJobResult nodeRes: results) {
-            if (nodeRes.getException() != null) {
-                errors.put(nodeRes.getNode(), nodeRes.getException());
-
-                continue;
-            }
-
-            IncrementalSnapshotVerificationTaskResult res = nodeRes.getData();
-
-            if (!F.isEmpty(res.exceptions())) {
-                errors.put(nodeRes.getNode(), F.first(res.exceptions()));
-
-                continue;
-            }
-
-            if (!F.isEmpty(res.partiallyCommittedTxs()))
-                partiallyCommittedTxs.put(nodeRes.getNode(), 
res.partiallyCommittedTxs());
-
-            for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> entry: 
res.partHashRes().entrySet())
-                partHashes.computeIfAbsent(entry.getKey(), v -> new 
ArrayList<>()).add(entry.getValue());
-
-            if (log.isDebugEnabled())
-                log.debug("Handle VerifyIncrementalSnapshotJob result [node=" 
+ nodeRes.getNode() + ", taskRes=" + res + ']');
-
-            nodeTxHashMap.put(nodeRes.getNode().consistentId(), 
res.txHashRes());
-
-            Iterator<Map.Entry<Object, TransactionsHashRecord>> resIt = 
res.txHashRes().entrySet().iterator();
-
-            while (resIt.hasNext()) {
-                Map.Entry<Object, TransactionsHashRecord> nodeTxHash = 
resIt.next();
-
-                Map<Object, TransactionsHashRecord> prevNodeTxHash = 
nodeTxHashMap.get(nodeTxHash.getKey());
-
-                if (prevNodeTxHash != null) {
-                    TransactionsHashRecord hash = nodeTxHash.getValue();
-                    TransactionsHashRecord prevHash = 
prevNodeTxHash.remove(hash.localConsistentId());
-
-                    if (prevHash == null || prevHash.transactionHash() != 
hash.transactionHash())
-                        txHashConflicts.add(F.asList(hash, prevHash));
-
-                    resIt.remove();
-                }
-            }
-        }
-
-        // Add all missed pairs to conflicts.
-        nodeTxHashMap.values().stream()
-            .flatMap(e -> e.values().stream())
-            .forEach(e -> txHashConflicts.add(F.asList(e, null)));
-
-        return new SnapshotPartitionsVerifyTaskResult(
-            metas,
-            errors.isEmpty() ?
-                new IdleVerifyResultV2(partHashes, txHashConflicts, 
partiallyCommittedTxs)
-                : new IdleVerifyResultV2(errors));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected VerifyIncrementalSnapshotJob createJob(String name, 
String consId, SnapshotPartitionsVerifyTaskArg args) {
-        return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(), 
args.incrementIndex(), consId);
-    }
-
-    /** */
-    private static class VerifyIncrementalSnapshotJob extends 
AbstractSnapshotVerificationJob {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0L;
-
-        /** Incremental snapshot index. */
-        private final int incIdx;
-
-        /** */
-        private LongAdder procEntriesCnt;
-
-        /**
-         * @param snpName Snapshot name.
-         * @param snpPath Snapshot directory path.
-         * @param incIdx Incremental snapshot index.
-         * @param consId Consistent id of the related node.
-         */
-        public VerifyIncrementalSnapshotJob(
-            String snpName,
-            @Nullable String snpPath,
-            int incIdx,
-            String consId
-        ) {
-            super(snpName, snpPath, consId, null, true);
-
-            this.incIdx = incIdx;
-        }
-
-        /**
-         * @return Map containing calculated transactions hash for every 
remote node in the cluster.
-         */
-        @Override public IncrementalSnapshotVerificationTaskResult execute() 
throws IgniteException {
-            try {
-                if (log.isInfoEnabled()) {
-                    log.info("Verify incremental snapshot procedure has been 
initiated " +
-                        "[snpName=" + snpName + ", incrementIndex=" + incIdx + 
", consId=" + consId + ']');
-                }
-
-                if (incIdx <= 0)
-                    return new IncrementalSnapshotVerificationTaskResult();
-
-                BaselineTopology blt = 
ignite.context().state().clusterState().baselineTopology();
-
-                checkBaseline(blt);
-
-                Map<Integer, StoredCacheData> txCaches = readTxCachesData();
-
-                AtomicLong procSegCnt = new AtomicLong();
-
-                IncrementalSnapshotProcessor proc = new 
IncrementalSnapshotProcessor(
-                    ignite.context().cache().context(), snpName, snpPath, 
incIdx, txCaches.keySet()
-                ) {
-                    @Override void totalWalSegments(int segCnt) {
-                        // No-op.
-                    }
-
-                    @Override void processedWalSegments(int segCnt) {
-                        procSegCnt.set(segCnt);
-                    }
-
-                    @Override void initWalEntries(LongAdder entriesCnt) {
-                        procEntriesCnt = entriesCnt;
-                    }
-                };
-
-                short locNodeId = blt.consistentIdMapping().get(consId);
-
-                Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
-                Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes = 
new HashMap<>();
-                Map<Short, HashHolder> nodesTxHash = new HashMap<>();
-
-                Set<GridCacheVersion> partiallyCommittedTxs = new HashSet<>();
-                // Hashes in this map calculated based on WAL records only, 
not part-X.bin data.
-                Map<PartitionKeyV2, HashHolder> partMap = new HashMap<>();
-                List<Exception> exceptions = new ArrayList<>();
-
-                Function<Short, HashHolder> hashHolderBuilder = (k) -> new 
HashHolder();
-
-                BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = (xid, 
participatingNodes) -> {
-                    for (short nodeId: participatingNodes) {
-                        if (nodeId != locNodeId) {
-                            HashHolder hash = 
nodesTxHash.computeIfAbsent(nodeId, hashHolderBuilder);
-
-                            hash.increment(xid.hashCode(), 0);
-                        }
-                    }
-                };
-
-                // CacheId -> CacheGrpId.
-                Map<Integer, Integer> cacheGrpId = txCaches.values().stream()
-                    .collect(Collectors.toMap(
-                        StoredCacheData::cacheId,
-                        cacheData -> 
CU.cacheGroupId(cacheData.config().getName(), cacheData.config().getGroupName())
-                    ));
-
-                LongAdder procTxCnt = new LongAdder();
-
-                proc.process(dataEntry -> {
-                    if (dataEntry.op() == GridCacheOperation.READ || 
!exceptions.isEmpty())
-                        return;
-
-                    if (log.isTraceEnabled())
-                        log.trace("Checking data entry [entry=" + dataEntry + 
']');
-
-                    if (!activeDhtTxs.contains(dataEntry.writeVersion()))
-                        partiallyCommittedTxs.add(dataEntry.nearXidVersion());
-
-                    StoredCacheData cacheData = 
txCaches.get(dataEntry.cacheId());
-
-                    PartitionKeyV2 partKey = new PartitionKeyV2(
-                        cacheGrpId.get(dataEntry.cacheId()),
-                        dataEntry.partitionId(),
-                        CU.cacheOrGroupName(cacheData.config()));
-
-                    HashHolder hash = partMap.computeIfAbsent(partKey, (k) -> 
new HashHolder());
-
-                    try {
-                        int valHash = dataEntry.key().hashCode();
-
-                        if (dataEntry.value() != null)
-                            valHash += 
Arrays.hashCode(dataEntry.value().valueBytes(null));
-
-                        int verHash = dataEntry.writeVersion().hashCode();
-
-                        hash.increment(valHash, verHash);
-                    }
-                    catch (IgniteCheckedException ex) {
-                        exceptions.add(ex);
-                    }
-                }, txRec -> {
-                    if (!exceptions.isEmpty())
-                        return;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Checking tx record [txRec=" + txRec + ']');
-
-                    if (txRec.state() == TransactionState.PREPARED) {
-                        // Collect only primary nodes. For some cases backup 
nodes is included into TxRecord#participationNodes()
-                        // but actually doesn't even start transaction, for 
example, if the node participates only as a backup
-                        // of reading only keys.
-                        Set<Short> primParticipatingNodes = 
txRec.participatingNodes().keySet();
-
-                        if (primParticipatingNodes.contains(locNodeId)) {
-                            
txPrimParticipatingNodes.put(txRec.nearXidVersion(), primParticipatingNodes);
-                            activeDhtTxs.add(txRec.writeVersion());
-                        }
-                        else {
-                            for (Collection<Short> backups: 
txRec.participatingNodes().values()) {
-                                if (backups.contains(ALL_NODES) || 
backups.contains(locNodeId))
-                                    activeDhtTxs.add(txRec.writeVersion());
-                            }
-                        }
-                    }
-                    else if (txRec.state() == TransactionState.COMMITTED) {
-                        activeDhtTxs.remove(txRec.writeVersion());
-
-                        Set<Short> participatingNodes = 
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
-
-                        // Legal cases:
-                        // 1. This node is a transaction near node, but not 
primary or backup node.
-                        // 2. This node participated in the transaction 
multiple times (e.g., primary for one key and backup for other key).
-                        // 3. A transaction is included into previous 
incremental snapshot.
-                        if (participatingNodes == null)
-                            return;
-
-                        procTxCnt.increment();
-
-                        calcTxHash.accept(txRec.nearXidVersion(), 
participatingNodes);
-                    }
-                    else if (txRec.state() == TransactionState.ROLLED_BACK) {
-                        activeDhtTxs.remove(txRec.writeVersion());
-                        
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
-                    }
-                });
-
-                // All active transactions that didn't log COMMITTED or 
ROLL_BACK records are considered committed.
-                // It is possible as incremental snapshot started after 
transaction left IgniteTxManager#activeTransactions() collection,
-                // but completed before the final TxRecord was written.
-                for (Map.Entry<GridCacheVersion, Set<Short>> tx: 
txPrimParticipatingNodes.entrySet())
-                    calcTxHash.accept(tx.getKey(), tx.getValue());
-
-                Map<Object, TransactionsHashRecord> txHashRes = 
nodesTxHash.entrySet().stream()
-                    .map(e -> new TransactionsHashRecord(
-                        consId,
-                        blt.compactIdMapping().get(e.getKey()),
-                        e.getValue().hash
-                    ))
-                    .collect(Collectors.toMap(
-                        TransactionsHashRecord::remoteConsistentId,
-                        Function.identity()
-                    ));
-
-                Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes = 
partMap.entrySet().stream()
-                    .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        e -> new PartitionHashRecordV2(
-                            e.getKey(),
-                            false,
-                            consId,
-                            null,
-                            0,
-                            null,
-                            new VerifyPartitionContext(e.getValue())
-                        )
-                    ));
-
-                if (log.isInfoEnabled()) {
-                    log.info("Verify incremental snapshot procedure finished " 
+
-                        "[snpName=" + snpName + ", incrementIndex=" + incIdx + 
", consId=" + consId +
-                        ", txCnt=" + procTxCnt.sum() + ", dataEntries=" + 
procEntriesCnt.sum() +
-                        ", walSegments=" + procSegCnt.get() + ']');
-                }
-
-                return new IncrementalSnapshotVerificationTaskResult(
-                    txHashRes,
-                    partHashRes,
-                    partiallyCommittedTxs,
-                    exceptions);
-            }
-            catch (IgniteCheckedException | IOException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** Checks that current baseline topology matches baseline topology of 
the snapshot. */
-        private void checkBaseline(BaselineTopology blt) throws 
IgniteCheckedException, IOException {
-            IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
-
-            File snpDir = snpMgr.snapshotLocalDir(snpName, snpPath);
-            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir, 
ignite.localNode().consistentId().toString());
-
-            if (!F.eqNotOrdered(blt.consistentIds(), meta.baselineNodes())) {
-                throw new IgniteCheckedException("Topologies of snapshot and 
current cluster are different [snp=" +
-                    meta.baselineNodes() + ", current=" + blt.consistentIds() 
+ ']');
-            }
-        }
-
-        /** @return Collection of snapshotted transactional caches, key is a 
cache ID. */
-        private Map<Integer, StoredCacheData> readTxCachesData() throws 
IgniteCheckedException, IOException {
-            File snpDir = 
ignite.context().cache().context().snapshotMgr().snapshotLocalDir(snpName, 
snpPath);
-
-            String folderName = 
ignite.context().pdsFolderResolver().resolveFolders().folderName();
-
-            return GridLocalConfigManager.readCachesData(
-                    new File(snpDir, databaseRelativePath(folderName)),
-                    MarshallerUtils.jdkMarshaller(ignite.name()),
-                    ignite.configuration())
-                .values().stream()
-                .filter(data -> data.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL)
-                .collect(Collectors.toMap(StoredCacheData::cacheId, 
Function.identity()));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            VerifyIncrementalSnapshotJob job = (VerifyIncrementalSnapshotJob)o;
-
-            return snpName.equals(job.snpName) && Objects.equals(incIdx, 
job.incIdx) && Objects.equals(snpPath, job.snpPath)
-                && consId.equals(job.consId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return Objects.hash(snpName, incIdx, snpPath, consId);
-        }
-    }
-
-    /** Holder for calculated hashes. */
-    public static class HashHolder {
-        /** */
-        public int hash;
-
-        /** */
-        public int verHash;
-
-        /** */
-        public void increment(int hash, int verHash) {
-            this.hash += hash;
-            this.verHash += verHash;
-        }
-    }
-}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index 9d4a47040c8..e9f64247e82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -147,14 +148,25 @@ public class SnapshotCheckProcess {
 
         assert results.values().stream().noneMatch(res -> res != null && 
res.metas != null);
 
-        if (ctx.req.allRestoreHandlers()) {
+        SnapshotChecker checker = 
kctx.cache().context().snapshotMgr().checker();
+
+        if (ctx.req.incrementalIndex() > 0) {
+            IdleVerifyResultV2 chkRes = checker.reduceIncrementalResults(
+                mapResults(results, ctx.req.nodes(), 
SnapshotCheckResponse::incrementalResult),
+                mapErrors(errors)
+            );
+
+            clusterOpFut.onDone(new 
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes));
+        }
+        else if (ctx.req.allRestoreHandlers()) {
             try {
                 if (!errors.isEmpty())
                     throw F.firstValue(errors);
 
-                Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes 
= mapCustomHandlersResults(results, ctx.req.nodes());
+                Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes 
= mapResults(results, ctx.req.nodes(),
+                    SnapshotCheckResponse::customHandlersResults);
 
-                
kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(),
 cstRes);
+                checker.checkCustomHandlersResults(ctx.req.snapshotName(), 
cstRes);
 
                 clusterOpFut.onDone(new 
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null));
             }
@@ -166,7 +178,8 @@ public class SnapshotCheckProcess {
             Map<ClusterNode, Exception> errors0 = mapErrors(errors);
 
             if (!results.isEmpty()) {
-                Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> 
results0 = mapPartsHashes(results, ctx.req.nodes());
+                Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> 
results0 = mapResults(results, ctx.req.nodes(),
+                    SnapshotCheckResponse::partsHashes);
 
                 IdleVerifyResultV2 chkRes = 
SnapshotChecker.reduceHashesResults(results0, errors0);
 
@@ -197,16 +210,29 @@ public class SnapshotCheckProcess {
 
         // Might be already finished by asynchronous leave of a required node.
         if (!phaseFut.isDone()) {
-            CompletableFuture<? extends Map<?, ?>> workingFut = 
req.allRestoreHandlers()
-                ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta, 
req.snapshotPath(), req.groups(), true)
-                : snpMgr.checker().checkPartitions(ctx.locMeta, 
snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
-                req.groups(), false, true, false);
+            CompletableFuture<?> workingFut;
+
+            if (req.incrementalIndex() > 0) {
+                assert !req.allRestoreHandlers() : "Snapshot handlers aren't 
supported for incremental snapshot.";
+
+                workingFut = 
snpMgr.checker().checkIncrementalSnapshot(req.snapshotName(), 
req.snapshotPath(), req.incrementalIndex());
+            }
+            else {
+                workingFut = req.allRestoreHandlers()
+                    ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta, 
req.snapshotPath(), req.groups(), true)
+                    : snpMgr.checker().checkPartitions(ctx.locMeta, 
snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
+                    req.groups(), false, true, false);
+            }
 
             workingFut.whenComplete((res, err) -> {
                 if (err != null)
                     phaseFut.onDone(err);
-                else
-                    phaseFut.onDone(new SnapshotCheckResponse(res));
+                else {
+                    if (req.incrementalIndex() > 0)
+                        phaseFut.onDone(new 
SnapshotCheckResponse((IncrementalSnapshotCheckResult)res));
+                    else
+                        phaseFut.onDone(new SnapshotCheckResponse((Map<?, 
?>)res));
+                }
             });
         }
 
@@ -221,38 +247,25 @@ public class SnapshotCheckProcess {
     }
 
     /** */
-    private Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> 
mapPartsHashes(
+    private <T> Map<ClusterNode, T> mapResults(
         Map<UUID, SnapshotCheckResponse> results,
-        Collection<UUID> requiredNodes
+        Set<UUID> requiredNodes,
+        Function<SnapshotCheckResponse, T> resExtractor
     ) {
         return results.entrySet().stream()
             .filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() != 
null)
-            .collect(Collectors.toMap(e -> 
kctx.cluster().get().node(e.getKey()), e -> e.getValue().partsHashes()));
-    }
-
-    /** */
-    private Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> 
mapCustomHandlersResults(
-        Map<UUID, SnapshotCheckResponse> results,
-        Set<UUID> requiredNodes
-    ) {
-        return results.entrySet().stream()
-            .filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() != 
null)
-            .collect(Collectors.toMap(e -> 
kctx.cluster().get().node(e.getKey()), e -> 
e.getValue().customHandlersResults()));
+            .collect(Collectors.toMap(e -> 
kctx.cluster().get().node(e.getKey()), e -> resExtractor.apply(e.getValue())));
     }
 
     /**
-     * @param snpName Snapshot name of the validation process. If {@code 
null}, ignored.
-     * @param reqId  If {@code snpName} is {@code null}, is used to find the 
operation request.
-     * @return Current snapshot checking context by {@code snpName} or {@code 
reqId}.
+     * @param snpName Snapshot name. If {@code null}, ignored.
+     * @param reqId If {@code ctxId} is {@code null}, is used to find the 
operation context.
+     * @return Current snapshot checking context by {@code ctxId} or {@code 
reqId}.
      */
     private @Nullable SnapshotCheckContext context(@Nullable String snpName, 
UUID reqId) {
-        SnapshotCheckContext ctx = snpName == null
+        return snpName == null
             ? contexts.values().stream().filter(ctx0 -> 
ctx0.req.requestId().equals(reqId)).findFirst().orElse(null)
             : contexts.get(snpName);
-
-        assert ctx == null || ctx.req.requestId().equals(reqId);
-
-        return ctx;
     }
 
     /** Phase 1 beginning: prepare, collect and check local metas. */
@@ -289,6 +302,7 @@ public class SnapshotCheckProcess {
         if (!phaseFut.isDone()) {
             snpMgr.checker().checkLocalMetas(
                 snpMgr.snapshotLocalDir(req.snapshotName(), 
req.snapshotPath()),
+                req.incrementalIndex(),
                 grpIds,
                 kctx.cluster().get().localNode().consistentId()
             ).whenComplete((locMetas, err) -> {
@@ -308,9 +322,7 @@ public class SnapshotCheckProcess {
         Map<UUID, SnapshotCheckResponse> results,
         Map<UUID, Throwable> errors
     ) {
-        String snpName = snpName(results);
-
-        SnapshotCheckContext ctx = context(snpName, reqId);
+        SnapshotCheckContext ctx = context(null, reqId);
 
         // The context is not stored in the case of concurrent check of the 
same snapshot but the operation future is registered.
         GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = 
clusterOpFuts.get(reqId);
@@ -369,27 +381,13 @@ public class SnapshotCheckProcess {
         }
     }
 
-    /** Finds current snapshot name from the metas. */
-    private @Nullable String snpName(Map<UUID, SnapshotCheckResponse> results) 
{
-        for (SnapshotCheckResponse nodeRes : results.values()) {
-            if (nodeRes == null || F.isEmpty(nodeRes.metas))
-                continue;
-
-            assert nodeRes.metas.get(0) != null : "Empty snapshot metadata in 
the results";
-            assert !F.isEmpty(nodeRes.metas.get(0).snapshotName()) : "Empty 
snapshot name in a snapshot metadata.";
-
-            return nodeRes.metas.get(0).snapshotName();
-        }
-
-        return null;
-    }
-
     /**
      * Starts the snapshot validation process.
      *
      * @param snpName Snapshot name.
      * @param snpPath Snapshot directory path.
      * @param grpNames List of cache group names.
+     * @param incIdx Incremental snapshot index. If not positive, snapshot is 
not considered as incremental.
      * @param allRestoreHandlers If {@code true}, all the registered {@link 
IgniteSnapshotManager#handlers()} of type
      *                    {@link SnapshotHandlerType#RESTORE} are invoked. 
Otherwise, only snapshot metadatas and partition
      *                    hashes are validated.
@@ -398,6 +396,7 @@ public class SnapshotCheckProcess {
         String snpName,
         @Nullable String snpPath,
         @Nullable Collection<String> grpNames,
+        int incIdx,
         boolean allRestoreHandlers
     ) {
         assert !F.isEmpty(snpName);
@@ -415,6 +414,7 @@ public class SnapshotCheckProcess {
             snpName,
             snpPath,
             grpNames,
+            incIdx,
             allRestoreHandlers
         );
 
@@ -475,32 +475,65 @@ public class SnapshotCheckProcess {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
-        /** Metas for the phase 1. Is always {@code null} for the phase 2. */
+        /** @see #metas() */
         @Nullable private final List<SnapshotMetadata> metas;
 
-        /** Node's partition hashes for the phase 2. Is always {@code null} 
for the phase 1. */
+        /**
+         * @see #partsHashes()
+         * @see #customHandlersResults()
+         */
         @Nullable private final Map<?, ?> partsResults;
 
+        /** @see #incrementalResult() */
+        @Nullable private final IncrementalSnapshotCheckResult incRes;
+
         /** Ctor for the phase 1. */
         private SnapshotCheckResponse(@Nullable List<SnapshotMetadata> metas) {
             this.metas = metas;
             this.partsResults = null;
+            this.incRes = null;
         }
 
-        /** Ctor for the phase 2. */
-        private SnapshotCheckResponse(@Nullable Map<?, ?> partsResults) {
+        /** Ctor for the phase 2 for normal snapshot. */
+        private SnapshotCheckResponse(Map<?, ?> partsResults) {
             this.metas = null;
             this.partsResults = partsResults;
+            this.incRes = null;
         }
 
-        /** */
-        private Map<PartitionKeyV2, PartitionHashRecordV2> partsHashes() {
+        /** Ctor for the phase 2 for incremental snapshot. */
+        private SnapshotCheckResponse(IncrementalSnapshotCheckResult incRes) {
+            this.metas = null;
+            this.partsResults = null;
+            this.incRes = incRes;
+        }
+
+        /** Metas for the phase 1. Is always {@code null} for the phase 2. */
+        @Nullable private List<SnapshotMetadata> metas() {
+            return metas;
+        }
+
+        /**
+         * Node's partition hashes for the phase 2. Is always {@code null} for 
the phase 1 or in case of incremental
+         * snapshot.
+         */
+        private @Nullable Map<PartitionKeyV2, PartitionHashRecordV2> 
partsHashes() {
             return (Map<PartitionKeyV2, PartitionHashRecordV2>)partsResults;
         }
 
-        /** */
-        private Map<String, SnapshotHandlerResult<?>> customHandlersResults() {
+        /**
+         * Results of the custom handlers for the phase 2. Is always {@code 
null} for the phase 1 or in case of incremental
+         * snapshot.
+         *
+         * @see IgniteSnapshotManager#handlers()
+         */
+        private @Nullable Map<String, SnapshotHandlerResult<?>> 
customHandlersResults() {
             return (Map<String, SnapshotHandlerResult<?>>)partsResults;
         }
+
+        /** Incremental snapshot result for the phase 2. Is always {@code 
null} for the phase 1 or in case of normal snapshot. */
+        private @Nullable IncrementalSnapshotCheckResult incrementalResult() {
+            return incRes;
+        }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
index b907f520a61..d1b38d4d512 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
@@ -40,6 +40,10 @@ public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationReques
     @GridToStringInclude
     private final boolean allRestoreHandlers;
 
+    /** Incremental snapshot index. If not positive, snapshot is not 
considered as incremental. */
+    @GridToStringInclude
+    private final int incIdx;
+
     /**
      * Creates snapshot check process request.
      *
@@ -48,6 +52,7 @@ public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationReques
      * @param nodes Baseline node IDs that must be alive to complete the 
operation..
      * @param snpPath Snapshot directory path.
      * @param grps List of cache group names.
+     * @param incIdx Incremental snapshot index. If not positive, snapshot is 
not considered as incremental.
      * @param allRestoreHandlers If {@code true}, all the registered {@link 
IgniteSnapshotManager#handlers()} of type
      *                           {@link SnapshotHandlerType#RESTORE} are 
invoked. Otherwise, only snapshot metadatas and
      *                           partition hashes are validated.
@@ -58,6 +63,7 @@ public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationReques
         String snpName,
         String snpPath,
         @Nullable Collection<String> grps,
+        int incIdx,
         boolean allRestoreHandlers
     ) {
         super(reqId, snpName, snpPath, grps, 0, nodes);
@@ -65,6 +71,7 @@ public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationReques
         assert !F.isEmpty(nodes);
 
         this.allRestoreHandlers = allRestoreHandlers;
+        this.incIdx = incIdx;
     }
 
     /**
@@ -75,6 +82,11 @@ public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationReques
         return allRestoreHandlers;
     }
 
+    /** @return Incremental snapshot index. If not positive, snapshot is not 
considered as incremental. */
+    public int incrementalIndex() {
+        return incIdx;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SnapshotCheckProcessRequest.class, this, 
super.toString());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index feca6343058..8a12db38ffd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -16,10 +16,8 @@
  */
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.file.DirectoryStream;
@@ -31,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,10 +37,15 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.GridKernalContext;
@@ -52,6 +56,8 @@ import 
org.apache.ignite.internal.managers.encryption.GroupKey;
 import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -64,8 +70,14 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
 import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import 
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -74,10 +86,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
+import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
@@ -93,6 +108,8 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
 import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
 import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
@@ -105,12 +122,6 @@ public class SnapshotChecker {
     /** */
     @Nullable protected final GridKernalContext kctx;
 
-    /** */
-    protected final Marshaller marshaller;
-
-    /** */
-    @Nullable protected final ClassLoader marshallerClsLdr;
-
     /** */
     protected final EncryptionSpi encryptionSpi;
 
@@ -121,14 +132,10 @@ public class SnapshotChecker {
     public SnapshotChecker(
         GridKernalContext kctx,
         Marshaller marshaller,
-        ExecutorService executorSrvc,
-        @Nullable ClassLoader marshallerClsLdr
+        ExecutorService executorSrvc
     ) {
         this.kctx = kctx;
 
-        this.marshaller = marshaller;
-        this.marshallerClsLdr = marshallerClsLdr;
-
         this.encryptionSpi = kctx.config().getEncryptionSpi() == null ? new 
NoopEncryptionSpi() : kctx.config().getEncryptionSpi();
 
         this.executor = executorSrvc;
@@ -137,7 +144,7 @@ public class SnapshotChecker {
     }
 
     /** */
-    protected List<SnapshotMetadata> readSnapshotMetadatas(File snpFullPath, 
@Nullable Object nodeConstId) {
+    protected List<SnapshotMetadata> readSnapshotMetadatas(File snpFullPath, 
Object nodeConstId) {
         if (!(snpFullPath.exists() && snpFullPath.isDirectory()))
             return Collections.emptyList();
 
@@ -161,7 +168,7 @@ public class SnapshotChecker {
 
         try {
             for (File smf : smfs) {
-                SnapshotMetadata curr = readSnapshotMetadata(smf);
+                SnapshotMetadata curr = 
kctx.cache().context().snapshotMgr().readSnapshotMetadata(smf);
 
                 if (prev != null && !prev.sameSnapshot(curr)) {
                     throw new IgniteException("Snapshot metadata files are 
from different snapshots " +
@@ -192,37 +199,15 @@ public class SnapshotChecker {
         }
     }
 
-    /** */
-    public SnapshotMetadata readSnapshotMetadata(File smf)
-        throws IgniteCheckedException, IOException {
-        SnapshotMetadata meta = readFromFile(smf);
-
-        String smfName = smf.getName().substring(0, smf.getName().length() - 
SNAPSHOT_METAFILE_EXT.length());
-
-        if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
-            throw new IgniteException("Error reading snapshot metadata 
[smfName=" + smfName + ", consId="
-                + U.maskForFileName(meta.consistentId()));
-        }
-
-        return meta;
-    }
-
-    /** */
-    public <T> T readFromFile(File smf)
-        throws IOException, IgniteCheckedException {
-        if (!smf.exists())
-            throw new IgniteCheckedException("Snapshot metafile cannot be read 
due to it doesn't exist: " + smf);
-
-        try (InputStream in = new 
BufferedInputStream(Files.newInputStream(smf.toPath()))) {
-            return marshaller.unmarshal(in, marshallerClsLdr);
-        }
-    }
-
     /** Launches local metas checking. */
-    public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(File 
snpPath, @Nullable Collection<Integer> grpIds,
-        @Nullable Object locNodeCstId) {
+    public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
+        File snpDir,
+        int incIdx,
+        @Nullable Collection<Integer> grpIds,
+        Object consId
+    ) {
         return CompletableFuture.supplyAsync(() -> {
-            List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpPath, 
locNodeCstId);
+            List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpDir, 
consId);
 
             for (SnapshotMetadata meta : snpMetas) {
                 byte[] snpMasterKeyDigest = meta.masterKeyDigest();
@@ -265,10 +250,387 @@ public class SnapshotChecker {
                 }
             }
 
+            if (incIdx > 0) {
+                List<SnapshotMetadata> metas = snpMetas.stream().filter(m -> 
m.consistentId().equals(String.valueOf(consId)))
+                    .collect(Collectors.toList());
+
+                if (metas.size() != 1) {
+                    throw new IgniteException("Failed to find single snapshot 
metafile on local node [locNodeId="
+                        + consId + ", metas=" + snpMetas + ", snpName=" + 
snpDir.getName()
+                        + ", snpPath=" + snpDir.getParent() + "]. Incremental 
snapshots requires exactly one meta file " +
+                        "per node because they don't support restoring on a 
different topology.");
+                }
+
+                checkIncrementalSnapshotsExist(metas.get(0), snpDir, incIdx);
+            }
+
             return snpMetas;
         }, executor);
     }
 
+    /** Checks that all incremental snapshots are present, contain correct 
metafile and WAL segments. */
+    private void checkIncrementalSnapshotsExist(SnapshotMetadata fullMeta, 
File snpDir, int incIdx) {
+        try {
+            // Incremental snapshot must contain ClusterSnapshotRecord.
+            long startSeg = fullMeta.snapshotRecordPointer().index();
+
+            String snpName = fullMeta.snapshotName();
+
+            for (int inc = 1; inc <= incIdx; inc++) {
+                File incSnpDir = 
kctx.cache().context().snapshotMgr().incrementalSnapshotLocalDir(snpName, 
snpDir.getParent(), inc);
+
+                if (!incSnpDir.exists()) {
+                    throw new IllegalArgumentException("No incremental 
snapshot found " +
+                        "[snpName=" + snpName + ", snpPath=" + 
snpDir.getParent() + ", incrementIndex=" + inc + ']');
+                }
+
+                String metaFileName = 
snapshotMetaFileName(kctx.pdsFolderResolver().resolveFolders().folderName());
+
+                File metafile = 
incSnpDir.toPath().resolve(metaFileName).toFile();
+
+                IncrementalSnapshotMetadata incMeta = 
kctx.cache().context().snapshotMgr().readFromFile(metafile);
+
+                if (!incMeta.matchBaseSnapshot(fullMeta)) {
+                    throw new IllegalArgumentException("Incremental snapshot 
doesn't match full snapshot " +
+                        "[incMeta=" + incMeta + ", fullMeta=" + fullMeta + 
']');
+                }
+
+                if (incMeta.incrementIndex() != inc) {
+                    throw new IgniteException(
+                        "Incremental snapshot meta has wrong index 
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
+                }
+
+                checkWalSegments(incMeta, startSeg, 
incrementalSnapshotWalsDir(incSnpDir, incMeta.folderName()));
+
+                // Incremental snapshots must not cross each other.
+                startSeg = incMeta.incrementalSnapshotPointer().index() + 1;
+            }
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Check that incremental snapshot contains all required WAL segments. 
Throws {@link IgniteException} in case of any errors. */
+    private void checkWalSegments(IncrementalSnapshotMetadata meta, long 
startWalSeg, File incSnpWalDir) {
+        IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+        List<FileDescriptor> walSeg = factory.resolveWalFiles(
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .filesOrDirs(incSnpWalDir.listFiles(file ->
+                    
FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches())));
+
+        if (walSeg.isEmpty())
+            throw new IgniteException("No WAL segments found for incremental 
snapshot [dir=" + incSnpWalDir + ']');
+
+        long actFirstSeg = walSeg.get(0).idx();
+
+        if (actFirstSeg != startWalSeg) {
+            throw new IgniteException("Missed WAL segment 
[expectFirstSegment=" + startWalSeg
+                + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta + 
']');
+        }
+
+        long expLastSeg = meta.incrementalSnapshotPointer().index();
+        long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
+
+        if (actLastSeg != expLastSeg) {
+            throw new IgniteException("Missed WAL segment [expectLastSegment=" 
+ startWalSeg
+                + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta + 
']');
+        }
+
+        List<?> walSegGaps = factory.hasGaps(walSeg);
+
+        if (!walSegGaps.isEmpty())
+            throw new IgniteException("Missed WAL segments [misses=" + 
walSegGaps + ", meta=" + meta + ']');
+    }
+
+    /** */
+    public CompletableFuture<IncrementalSnapshotCheckResult> 
checkIncrementalSnapshot(
+        String snpName,
+        @Nullable String snpPath,
+        int incIdx
+    ) {
+        assert incIdx > 0;
+
+        return CompletableFuture.supplyAsync(
+            () -> {
+                String consId = 
kctx.cluster().get().localNode().consistentId().toString();
+
+                File snpDir = 
kctx.cache().context().snapshotMgr().snapshotLocalDir(snpName, snpPath);
+
+                try {
+                    if (log.isInfoEnabled()) {
+                        log.info("Verify incremental snapshot procedure has 
been initiated " +
+                            "[snpName=" + snpName + ", incrementIndex=" + 
incIdx + ", consId=" + consId + ']');
+                    }
+
+                    BaselineTopology blt = 
kctx.state().clusterState().baselineTopology();
+
+                    SnapshotMetadata meta = 
kctx.cache().context().snapshotMgr().readSnapshotMetadata(snpDir, consId);
+
+                    if (!F.eqNotOrdered(blt.consistentIds(), 
meta.baselineNodes())) {
+                        throw new IgniteCheckedException("Topologies of 
snapshot and current cluster are different [snp=" +
+                            meta.baselineNodes() + ", current=" + 
blt.consistentIds() + ']');
+                    }
+
+                    Map<Integer, StoredCacheData> txCaches = 
readTxCachesData(snpDir);
+
+                    AtomicLong procSegCnt = new AtomicLong();
+
+                    LongAdder procEntriesCnt = new LongAdder();
+
+                    IncrementalSnapshotProcessor proc = new 
IncrementalSnapshotProcessor(
+                        kctx.cache().context(), snpName, snpPath, incIdx, 
txCaches.keySet()
+                    ) {
+                        @Override void totalWalSegments(int segCnt) {
+                            // No-op.
+                        }
+
+                        @Override void processedWalSegments(int segCnt) {
+                            procSegCnt.set(segCnt);
+                        }
+
+                        @Override void initWalEntries(LongAdder entriesCnt) {
+                            procEntriesCnt.add(entriesCnt.sum());
+                        }
+                    };
+
+                    short locNodeId = blt.consistentIdMapping().get(consId);
+
+                    Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
+                    Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes 
= new HashMap<>();
+                    Map<Short, HashHolder> nodesTxHash = new HashMap<>();
+
+                    Set<GridCacheVersion> partiallyCommittedTxs = new 
HashSet<>();
+                    // Hashes in this map calculated based on WAL records 
only, not part-X.bin data.
+                    Map<PartitionKeyV2, HashHolder> partMap = new HashMap<>();
+                    List<Exception> exceptions = new ArrayList<>();
+
+                    Function<Short, HashHolder> hashHolderBuilder = (k) -> new 
HashHolder();
+
+                    BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = 
(xid, participatingNodes) -> {
+                        for (short nodeId : participatingNodes) {
+                            if (nodeId != locNodeId) {
+                                HashHolder hash = 
nodesTxHash.computeIfAbsent(nodeId, hashHolderBuilder);
+
+                                hash.increment(xid.hashCode(), 0);
+                            }
+                        }
+                    };
+
+                    // CacheId -> CacheGrpId.
+                    Map<Integer, Integer> cacheGrpId = 
txCaches.values().stream()
+                        .collect(Collectors.toMap(
+                            StoredCacheData::cacheId,
+                            cacheData -> 
CU.cacheGroupId(cacheData.config().getName(), cacheData.config().getGroupName())
+                        ));
+
+                    LongAdder procTxCnt = new LongAdder();
+
+                    proc.process(dataEntry -> {
+                        if (dataEntry.op() == GridCacheOperation.READ || 
!exceptions.isEmpty())
+                            return;
+
+                        if (log.isTraceEnabled())
+                            log.trace("Checking data entry [entry=" + 
dataEntry + ']');
+
+                        if (!activeDhtTxs.contains(dataEntry.writeVersion()))
+                            
partiallyCommittedTxs.add(dataEntry.nearXidVersion());
+
+                        StoredCacheData cacheData = 
txCaches.get(dataEntry.cacheId());
+
+                        PartitionKeyV2 partKey = new PartitionKeyV2(
+                            cacheGrpId.get(dataEntry.cacheId()),
+                            dataEntry.partitionId(),
+                            CU.cacheOrGroupName(cacheData.config()));
+
+                        HashHolder hash = partMap.computeIfAbsent(partKey, (k) 
-> new HashHolder());
+
+                        try {
+                            int valHash = dataEntry.key().hashCode();
+
+                            if (dataEntry.value() != null)
+                                valHash += 
Arrays.hashCode(dataEntry.value().valueBytes(null));
+
+                            int verHash = dataEntry.writeVersion().hashCode();
+
+                            hash.increment(valHash, verHash);
+                        }
+                        catch (IgniteCheckedException ex) {
+                            exceptions.add(ex);
+                        }
+                    }, txRec -> {
+                        if (!exceptions.isEmpty())
+                            return;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Checking tx record [txRec=" + txRec + 
']');
+
+                        if (txRec.state() == TransactionState.PREPARED) {
+                            // Collect only primary nodes. For some cases 
backup nodes is included into TxRecord#participationNodes()
+                            // but actually doesn't even start transaction, 
for example, if the node participates only as a backup
+                            // of reading only keys.
+                            Set<Short> primParticipatingNodes = 
txRec.participatingNodes().keySet();
+
+                            if (primParticipatingNodes.contains(locNodeId)) {
+                                
txPrimParticipatingNodes.put(txRec.nearXidVersion(), primParticipatingNodes);
+                                activeDhtTxs.add(txRec.writeVersion());
+                            }
+                            else {
+                                for (Collection<Short> backups : 
txRec.participatingNodes().values()) {
+                                    if (backups.contains(ALL_NODES) || 
backups.contains(locNodeId))
+                                        activeDhtTxs.add(txRec.writeVersion());
+                                }
+                            }
+                        }
+                        else if (txRec.state() == TransactionState.COMMITTED) {
+                            activeDhtTxs.remove(txRec.writeVersion());
+
+                            Set<Short> participatingNodes = 
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
+
+                            // Legal cases:
+                            // 1. This node is a transaction near node, but 
not primary or backup node.
+                            // 2. This node participated in the transaction 
multiple times (e.g., primary for one key and
+                            //    backup for other key).
+                            // 3. A transaction is included into previous 
incremental snapshot.
+                            if (participatingNodes == null)
+                                return;
+
+                            procTxCnt.increment();
+
+                            calcTxHash.accept(txRec.nearXidVersion(), 
participatingNodes);
+                        }
+                        else if (txRec.state() == 
TransactionState.ROLLED_BACK) {
+                            activeDhtTxs.remove(txRec.writeVersion());
+                            
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
+                        }
+                    });
+
+                    // All active transactions that didn't log COMMITTED or 
ROLL_BACK records are considered committed.
+                    // It is possible as incremental snapshot started after 
transaction left IgniteTxManager#activeTransactions()
+                    // collection, but completed before the final TxRecord was 
written.
+                    for (Map.Entry<GridCacheVersion, Set<Short>> tx : 
txPrimParticipatingNodes.entrySet())
+                        calcTxHash.accept(tx.getKey(), tx.getValue());
+
+                    Map<Object, TransactionsHashRecord> txHashRes = 
nodesTxHash.entrySet().stream()
+                        .map(e -> new TransactionsHashRecord(
+                            consId,
+                            blt.compactIdMapping().get(e.getKey()),
+                            e.getValue().hash
+                        ))
+                        .collect(Collectors.toMap(
+                            TransactionsHashRecord::remoteConsistentId,
+                            Function.identity()
+                        ));
+
+                    Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes = 
partMap.entrySet().stream()
+                        .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> new PartitionHashRecordV2(
+                                e.getKey(),
+                                false,
+                                consId,
+                                null,
+                                0,
+                                null,
+                                new 
IdleVerifyUtility.VerifyPartitionContext(e.getValue().hash, 
e.getValue().verHash)
+                            )
+                        ));
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Verify incremental snapshot procedure 
finished " +
+                            "[snpName=" + snpName + ", incrementIndex=" + 
incIdx + ", consId=" + consId +
+                            ", txCnt=" + procTxCnt.sum() + ", dataEntries=" + 
procEntriesCnt.sum() +
+                            ", walSegments=" + procSegCnt.get() + ']');
+                    }
+
+                    return new IncrementalSnapshotCheckResult(
+                        txHashRes,
+                        partHashRes,
+                        partiallyCommittedTxs,
+                        exceptions
+                    );
+                }
+                catch (IgniteCheckedException | IOException e) {
+                    throw new IgniteException(e);
+                }
+            },
+            executor
+        );
+    }
+
+    /** @return Collection of snapshotted transactional caches, key is a cache 
ID. */
+    private Map<Integer, StoredCacheData> readTxCachesData(File snpDir) throws 
IgniteCheckedException, IOException {
+        String folderName = 
kctx.pdsFolderResolver().resolveFolders().folderName();
+
+        return GridLocalConfigManager.readCachesData(
+                new File(snpDir, databaseRelativePath(folderName)),
+                MarshallerUtils.jdkMarshaller(kctx.igniteInstanceName()),
+                kctx.config())
+            .values().stream()
+            .filter(data -> data.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL)
+            .collect(Collectors.toMap(StoredCacheData::cacheId, 
Function.identity()));
+    }
+
+    /** */
+    public IdleVerifyResultV2 reduceIncrementalResults(
+        Map<ClusterNode, IncrementalSnapshotCheckResult> results,
+        Map<ClusterNode, Exception> operationErrors
+    ) {
+        if (!operationErrors.isEmpty())
+            return new IdleVerifyResultV2(operationErrors);
+
+        Map<Object, Map<Object, TransactionsHashRecord>> nodeTxHashMap = new 
HashMap<>();
+        List<List<TransactionsHashRecord>> txHashConflicts = new ArrayList<>();
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> partHashes = new 
HashMap<>();
+        Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs = 
new HashMap<>();
+        Map<ClusterNode, Exception> errors = new HashMap<>();
+
+        results.forEach((node, res) -> {
+            if (res.exceptions().isEmpty() && errors.isEmpty()) {
+                if (!F.isEmpty(res.partiallyCommittedTxs()))
+                    partiallyCommittedTxs.put(node, 
res.partiallyCommittedTxs());
+
+                for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> entry : 
res.partHashRes().entrySet())
+                    partHashes.computeIfAbsent(entry.getKey(), v -> new 
ArrayList<>()).add(entry.getValue());
+
+                if (log.isDebugEnabled())
+                    log.debug("Handle VerifyIncrementalSnapshotJob result 
[node=" + node + ", taskRes=" + res + ']');
+
+                nodeTxHashMap.put(node.consistentId(), res.txHashRes());
+
+                Iterator<Map.Entry<Object, TransactionsHashRecord>> resIt = 
res.txHashRes().entrySet().iterator();
+
+                while (resIt.hasNext()) {
+                    Map.Entry<Object, TransactionsHashRecord> nodeTxHash = 
resIt.next();
+
+                    Map<Object, TransactionsHashRecord> prevNodeTxHash = 
nodeTxHashMap.get(nodeTxHash.getKey());
+
+                    if (prevNodeTxHash != null) {
+                        TransactionsHashRecord hash = nodeTxHash.getValue();
+                        TransactionsHashRecord prevHash = 
prevNodeTxHash.remove(hash.localConsistentId());
+
+                        if (prevHash == null || prevHash.transactionHash() != 
hash.transactionHash())
+                            txHashConflicts.add(F.asList(hash, prevHash));
+
+                        resIt.remove();
+                    }
+                }
+            }
+            else if (!res.exceptions().isEmpty())
+                errors.put(node, F.first(res.exceptions()));
+        });
+
+        // Add all missed pairs to conflicts.
+        nodeTxHashMap.values().stream()
+            .flatMap(e -> e.values().stream())
+            .forEach(e -> txHashConflicts.add(F.asList(e, null)));
+
+        return errors.isEmpty()
+            ? new IdleVerifyResultV2(partHashes, txHashConflicts, 
partiallyCommittedTxs)
+            : new IdleVerifyResultV2(errors);
+    }
+
     /** */
     public static IdleVerifyResultV2 reduceHashesResults(
         Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results,
@@ -298,7 +660,7 @@ public class SnapshotChecker {
         @Nullable String snpPath,
         Map<ClusterNode, List<SnapshotMetadata>> allMetas,
         @Nullable Map<ClusterNode, Exception> exceptions,
-        Object curNodeCstId
+        Object consId
     ) {
         Map<ClusterNode, Exception> mappedExceptions = F.isEmpty(exceptions) ? 
Collections.emptyMap() : new HashMap<>(exceptions);
 
@@ -333,7 +695,7 @@ public class SnapshotChecker {
 
         if (firstMeta == null && mappedExceptions.isEmpty()) {
             throw new IllegalArgumentException("Snapshot does not exists 
[snapshot=" + snpName
-                + (snpPath != null ? ", baseDir=" + snpPath : "") + ", 
consistentId=" + curNodeCstId + ']');
+                + (snpPath != null ? ", baseDir=" + snpPath : "") + ", 
consistentId=" + consId + ']');
         }
 
         if (!F.isEmpty(baselineNodes) && F.isEmpty(exceptions)) {
@@ -390,7 +752,7 @@ public class SnapshotChecker {
     }
 
     /**
-     * Calls all the registered Absence validaton handlers. Reads snapshot 
metadata.
+     * Calls all the registered custom validaton handlers. Reads snapshot 
metadata.
      *
      * @see IgniteSnapshotManager#handlers()
      */
@@ -411,7 +773,7 @@ public class SnapshotChecker {
     }
 
     /**
-     *  Reads snapshot metadata. Requires snapshot meta to work.
+     * Calls all the registered custom validaton handlers.
      *
      * @see IgniteSnapshotManager#handlers()
      */
@@ -718,7 +1080,7 @@ public class SnapshotChecker {
         File snpDir,
         SnapshotMetadata meta,
         Collection<Integer> grpIds,
-        Object nodeCstId,
+        Object consId,
         boolean procPartitionsData,
         boolean skipHash
     ) {
@@ -732,14 +1094,14 @@ public class SnapshotChecker {
 
         EncryptionSpi encSpi = meta.encryptionKey() != null ? encryptionSpi : 
null;
 
-        try (Dump dump = new Dump(snpDir, nodeCstId.toString(), true, true, 
encSpi, log)) {
+        try (Dump dump = new Dump(snpDir, consId.toString(), true, true, 
encSpi, log)) {
             String nodeFolderName = 
kctx.pdsFolderResolver().resolveFolders().folderName();
 
             Collection<PartitionHashRecordV2> partitionHashRecordV2s = 
U.doInParallel(
                 executor,
                 grpAndPartFiles.get2(),
                 part -> calculateDumpedPartitionHash(dump, 
cacheGroupName(part.getParentFile()), partId(part.getName()),
-                    skipHash, nodeCstId, nodeFolderName)
+                    skipHash, consId, nodeFolderName)
             );
 
             return 
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
 r -> r));
@@ -781,7 +1143,7 @@ public class SnapshotChecker {
         Dump.DumpedPartitionIterator iter,
         String grpName,
         int part,
-        Object consistentId
+        Object consId
     ) throws IgniteCheckedException {
         long size = 0;
 
@@ -798,7 +1160,7 @@ public class SnapshotChecker {
         return new PartitionHashRecordV2(
             new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
             false,
-            consistentId,
+            consId,
             null,
             size,
             PartitionHashRecordV2.PartitionState.OWNING,
@@ -872,4 +1234,19 @@ public class SnapshotChecker {
             return key != null && key.id() == keyId ? key : null;
         }
     }
+
+    /** Holder for calculated hashes. */
+    private static class HashHolder {
+        /** */
+        private int hash;
+
+        /** */
+        private int verHash;
+
+        /** */
+        private void increment(int hash, int verHash) {
+            this.hash += hash;
+            this.verHash += verHash;
+        }
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
index f3250b63b76..bad025849b3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
@@ -17,18 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
@@ -36,22 +30,13 @@ import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskAdapter;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static java.lang.String.valueOf;
-import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
-import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
-
 /** Snapshot task to verify snapshot metadata on the baseline nodes for given 
snapshot name. */
 @GridInternal
 public class SnapshotMetadataVerificationTask
@@ -90,10 +75,6 @@ public class SnapshotMetadataVerificationTask
         @IgniteInstanceResource
         private transient IgniteEx ignite;
 
-        /** */
-        @LoggerResource
-        private transient IgniteLogger log;
-
         /** */
         private final SnapshotMetadataVerificationTaskArg arg;
 
@@ -106,112 +87,8 @@ public class SnapshotMetadataVerificationTask
         @Override public List<SnapshotMetadata> execute() {
             IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
 
-            List<SnapshotMetadata> snpMeta;
-
-            try {
-                snpMeta = 
snpMgr.checker().checkLocalMetas(snpMgr.snapshotLocalDir(arg.snapshotName(), 
arg.snapshotPath()),
-                    arg.grpIds(), ignite.localNode().consistentId()).get();
-            }
-            catch (InterruptedException | ExecutionException e) {
-                throw new IgniteException("Failed to launch snapshot metadatas 
check of snapshot '" + arg.snapshotName() + "'.", e);
-            }
-
-            if (arg.incrementIndex() > 0) {
-                List<SnapshotMetadata> metas = snpMeta.stream()
-                    .filter(m -> 
m.consistentId().equals(valueOf(ignite.localNode().consistentId())))
-                    .collect(Collectors.toList());
-
-                if (metas.size() != 1) {
-                    throw new IgniteException("Failed to find single snapshot 
metafile on local node [locNodeId="
-                        + ignite.localNode().consistentId() + ", metas=" + 
snpMeta + ", snpName=" + arg.snapshotName()
-                        + ", snpPath=" + arg.snapshotPath() + "]. Incremental 
snapshots requires exactly one meta file " +
-                        "per node because they don't support restoring on a 
different topology.");
-                }
-
-                checkIncrementalSnapshots(metas.get(0), arg);
-            }
-
-            return snpMeta;
-        }
-
-        /** Checks that all incremental snapshots are present, contain correct 
metafile and WAL segments. */
-        public void checkIncrementalSnapshots(SnapshotMetadata fullMeta, 
SnapshotMetadataVerificationTaskArg arg) {
-            try {
-                GridCacheSharedContext<Object, Object> ctx = 
ignite.context().cache().context();
-
-                IgniteSnapshotManager snpMgr = ctx.snapshotMgr();
-
-                // Incremental snapshot must contain ClusterSnapshotRecord.
-                long startSeg = fullMeta.snapshotRecordPointer().index();
-
-                for (int inc = 1; inc <= arg.incrementIndex(); inc++) {
-                    File incSnpDir = 
snpMgr.incrementalSnapshotLocalDir(arg.snapshotName(), arg.snapshotPath(), inc);
-
-                    if (!incSnpDir.exists()) {
-                        throw new IllegalArgumentException("No incremental 
snapshot found " +
-                            "[snpName=" + arg.snapshotName() + ", snpPath=" + 
arg.snapshotPath() + ", incrementIndex=" + inc + ']');
-                    }
-
-                    String folderName = 
ctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
-
-                    String metaFileName = snapshotMetaFileName(folderName);
-
-                    File metafile = 
incSnpDir.toPath().resolve(metaFileName).toFile();
-
-                    IncrementalSnapshotMetadata incMeta = 
snpMgr.readFromFile(metafile);
-
-                    if (!incMeta.matchBaseSnapshot(fullMeta)) {
-                        throw new IllegalArgumentException("Incremental 
snapshot doesn't match full snapshot " +
-                            "[incMeta=" + incMeta + ", fullMeta=" + fullMeta + 
']');
-                    }
-
-                    if (incMeta.incrementIndex() != inc) {
-                        throw new IgniteException(
-                            "Incremental snapshot meta has wrong index 
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
-                    }
-
-                    checkWalSegments(incMeta, startSeg, 
incrementalSnapshotWalsDir(incSnpDir, incMeta.folderName()));
-
-                    // Incremental snapshots must not cross each other.
-                    startSeg = incMeta.incrementalSnapshotPointer().index() + 
1;
-                }
-            }
-            catch (IgniteCheckedException | IOException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** Check that incremental snapshot contains all required WAL 
segments. Throws {@link IgniteException} in case of any errors. */
-        private void checkWalSegments(IncrementalSnapshotMetadata meta, long 
startWalSeg, File incSnpWalDir) {
-            IgniteWalIteratorFactory factory = new 
IgniteWalIteratorFactory(log);
-
-            List<FileDescriptor> walSeg = factory.resolveWalFiles(
-                new IgniteWalIteratorFactory.IteratorParametersBuilder()
-                    .filesOrDirs(incSnpWalDir.listFiles(file ->
-                        
FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches())));
-
-            if (walSeg.isEmpty())
-                throw new IgniteException("No WAL segments found for 
incremental snapshot [dir=" + incSnpWalDir + ']');
-
-            long actFirstSeg = walSeg.get(0).idx();
-
-            if (actFirstSeg != startWalSeg) {
-                throw new IgniteException("Missed WAL segment 
[expectFirstSegment=" + startWalSeg
-                    + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta 
+ ']');
-            }
-
-            long expLastSeg = meta.incrementalSnapshotPointer().index();
-            long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
-
-            if (actLastSeg != expLastSeg) {
-                throw new IgniteException("Missed WAL segment 
[expectLastSegment=" + startWalSeg
-                    + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta 
+ ']');
-            }
-
-            List<?> walSegGaps = factory.hasGaps(walSeg);
-
-            if (!walSegGaps.isEmpty())
-                throw new IgniteException("Missed WAL segments [misses=" + 
walSegGaps + ", meta=" + meta + ']');
+            return 
snpMgr.checker().checkLocalMetas(snpMgr.snapshotLocalDir(arg.snapshotName(), 
arg.snapshotPath()),
+                arg.incrementIndex(), arg.grpIds(), 
ignite.localNode().consistentId()).join();
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index bc54704746a..4c1d1e917de 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -46,7 +46,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerificationTask.HashHolder;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
 import org.apache.ignite.internal.util.typedef.F;
@@ -381,11 +380,12 @@ public class IdleVerifyUtility {
         }
 
         /**
-         * @param hash Incremental snapshot hash holder.
+         * @param partHash Partition hash.
+         * @param partVerHash Version hash.
          */
-        public VerifyPartitionContext(HashHolder hash) {
-            this.partHash = hash.hash;
-            this.partVerHash = hash.verHash;
+        public VerifyPartitionContext(int partHash, int partVerHash) {
+            this.partHash = partHash;
+            this.partVerHash = partVerHash;
         }
 
         /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 3e3ab9ee6e5..ac041c439e5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -95,7 +95,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -116,6 +115,7 @@ import static 
org.apache.ignite.testframework.GridTestUtils.assertContains;
 import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeFalse;
 
 /**
  * Cluster-wide snapshot check procedure tests.
@@ -249,7 +249,7 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
     /** @throws Exception If fails. */
     @Test
     public void testClusterSnapshotCheckPartitionCounters() throws Exception {
-        Assume.assumeFalse("One copy of partiton created in only primary 
mode", onlyPrimary);
+        assumeFalse("One copy of partiton created in only primary mode", 
onlyPrimary);
 
         IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg.
             setAffinity(new RendezvousAffinityFunction(false, 1)),
@@ -404,7 +404,7 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
     /** @throws Exception If fails. */
     @Test
     public void testClusterSnapshotCheckFailsOnPartitionDataDiffers() throws 
Exception {
-        Assume.assumeFalse("One copy of partiton created in only primary 
mode", onlyPrimary);
+        assumeFalse("One copy of partiton created in only primary mode", 
onlyPrimary);
 
         CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
             .setAffinity(new RendezvousAffinityFunction(false, 1));
@@ -641,6 +641,128 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
         }
     }
 
+    /** Tests that concurrent full checks of normal and incremental the same 
snapshot are declined . */
+    @Test
+    public void testConcurrentTheSameSnpFullAndIncrementalChecksDeclined() 
throws Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new 
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
+                    () -> new 
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent full checks of the same incremental snapshot are 
declined. */
+    @Test
+    public void testConcurrentTheSameIncrementalFullChecksDeclined() throws 
Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new 
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    () -> new 
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent checks of different incremental snapshots are 
declined. */
+    @Test
+    public void testConcurrentDifferentIncrementalFullChecksDeclined() throws 
Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new 
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    () -> new 
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 2)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent full restoration of a normal snapshot and check 
of an incremental are declined. */
+    @Test
+    public void 
testConcurrentTheSameSnpIncrementalCheckAndFullRestoreDeclined() throws 
Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        grid(0).destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Snapshot restoration is disallowed from client nodes.
+        for (int i = 0; i < 3; ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> snp(grid(i0)).restoreSnapshot(SNAPSHOT_NAME, null, 
null, 0, true),
+                    () -> new 
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+                );
+            }
+        }
+    }
+
     /** Tests that concurrent snapshot full checks are allowed for different 
snapshots. */
     @Test
     public void testConcurrentDifferentSnpFullChecksAllowed() throws Exception 
{
@@ -807,7 +929,7 @@ public class IgniteClusterSnapshotCheckTest extends 
AbstractSnapshotSelfTest {
 
     /** Tests that snapshot full check doesn't affect a snapshot creation. */
     @Test
-    public void testConcurrentSnpCheckAndCreateAllowed() throws Exception {
+    public void testConcurrentDifferentSnpCheckAndCreateAllowed() throws 
Exception {
         prepareGridsAndSnapshot(3, 2, 2, false);
 
         for (int i = 0; i < G.allGrids().size(); ++i) {


Reply via email to