This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fb9372c33bf IGNITE-26493 Refactor IncrementalSnapshotVerificationTask 
(#12386)
fb9372c33bf is described below

commit fb9372c33bf200dddda5c88e4fe8436babff448b
Author: Daniil <[email protected]>
AuthorDate: Mon Oct 13 11:45:33 2025 +0300

    IGNITE-26493 Refactor IncrementalSnapshotVerificationTask (#12386)
---
 .../IncrementalSnapshotVerificationTask.java       | 430 +++++++++------------
 .../persistence/snapshot/SnapshotChecker.java      |  21 +-
 2 files changed, 208 insertions(+), 243 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
index 27bf90db3f2..d5f744b450c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
@@ -30,14 +30,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.management.cache.IdleVerifyResult;
 import org.apache.ignite.internal.management.cache.PartitionKey;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
@@ -55,301 +54,250 @@ import org.apache.ignite.transactions.TransactionState;
 import static 
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
 
 /** */
-public class IncrementalSnapshotVerificationTask {
+public class IncrementalSnapshotVerificationTask implements 
Supplier<IncrementalSnapshotVerificationTaskResult> {
     /** */
-    private final VerifyIncrementalSnapshotJob job;
+    private final IgniteEx ignite;
 
     /** */
     private final IgniteLogger log;
 
     /** */
-    public IncrementalSnapshotVerificationTask(IgniteEx ignite, IgniteLogger 
log, SnapshotFileTree sft, int incrementalIdx) {
-        job = new VerifyIncrementalSnapshotJob(ignite, log, sft, 
incrementalIdx);
-        this.log = log;
-    }
+    private final SnapshotFileTree sft;
 
     /** */
-    public IdleVerifyResult reduce(Map<ClusterNode, 
IncrementalSnapshotVerificationTaskResult> results) throws IgniteException {
-        IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
-
-        for (Map.Entry<ClusterNode, IncrementalSnapshotVerificationTaskResult> 
nodeRes: results.entrySet()) {
-            IncrementalSnapshotVerificationTaskResult res = nodeRes.getValue();
-
-            if (!F.isEmpty(res.partiallyCommittedTxs()))
-                bldr.addPartiallyCommited(nodeRes.getKey(), 
res.partiallyCommittedTxs());
-
-            bldr.addPartitionHashes(res.partHashRes());
-
-            if (log.isDebugEnabled())
-                log.debug("Handle VerifyIncrementalSnapshotJob result [node=" 
+ nodeRes.getKey() + ", taskRes=" + res + ']');
-
-            bldr.addIncrementalHashRecords(nodeRes.getKey(), res.txHashRes());
-        }
-
-        return bldr.build();
-    }
+    private final int incIdx;
 
     /** */
-    public IncrementalSnapshotVerificationTaskResult execute() {
-        return job.execute0();
-    }
+    private LongAdder procEntriesCnt;
 
     /** */
-    private static class VerifyIncrementalSnapshotJob {
-        /** */
-        private final IgniteEx ignite;
-
-        /** */
-        private final IgniteLogger log;
-
-        /** */
-        private final SnapshotFileTree sft;
-
-        /** */
-        private final int incIdx;
-
-        /** */
-        private LongAdder procEntriesCnt;
-
-        /**
-         * @param ignite Ignite instance.
-         * @param log Ignite logger.
-         * @param sft Snapshot file tree
-         * @param incIdx Incremental snapshot index.
-         */
-        private VerifyIncrementalSnapshotJob(
-            IgniteEx ignite,
-            IgniteLogger log,
-            SnapshotFileTree sft,
-            int incIdx
-        ) {
-            this.ignite = ignite;
-            this.log = log;
-            this.sft = sft;
-            this.incIdx = incIdx;
-        }
+    public IncrementalSnapshotVerificationTask(IgniteEx ignite, IgniteLogger 
log, SnapshotFileTree sft, int incrementalIdx) {
+        this.ignite = ignite;
+        this.log = log;
+        this.sft = sft;
+        this.incIdx = incrementalIdx;
+    }
 
-        /**
-         * @return Map containing calculated transactions hash for every 
remote node in the cluster.
-         */
-        public IncrementalSnapshotVerificationTaskResult execute0() throws 
IgniteException {
-            try {
-                if (log.isInfoEnabled()) {
-                    log.info("Verify incremental snapshot procedure has been 
initiated " +
-                        "[snpName=" + sft.name() + ", incrementIndex=" + 
incIdx + ", consId=" + sft.consistentId() + ']');
-                }
+    /**
+     * @return Map containing calculated transactions hash for every remote 
node in the cluster.
+     */
+    @Override public IncrementalSnapshotVerificationTaskResult get() throws 
IgniteException {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Verify incremental snapshot procedure has been 
initiated " +
+                    "[snpName=" + sft.name() + ", incrementIndex=" + incIdx + 
", consId=" + sft.consistentId() + ']');
+            }
 
-                if (incIdx <= 0)
-                    return new IncrementalSnapshotVerificationTaskResult();
+            if (incIdx <= 0)
+                return new IncrementalSnapshotVerificationTaskResult();
 
-                BaselineTopology blt = 
ignite.context().state().clusterState().baselineTopology();
+            BaselineTopology blt = 
ignite.context().state().clusterState().baselineTopology();
 
-                Map<String, Short> cstIdsMap = 
blt.consistentIdMapping().entrySet().stream()
-                    .collect(Collectors.toMap(e -> e.getKey().toString(), 
Map.Entry::getValue));
+            Map<String, Short> cstIdsMap = 
blt.consistentIdMapping().entrySet().stream()
+                .collect(Collectors.toMap(e -> e.getKey().toString(), 
Map.Entry::getValue));
 
-                checkBaseline(cstIdsMap.keySet());
+            checkBaseline(cstIdsMap.keySet());
 
-                Map<Integer, StoredCacheData> txCaches = readTxCachesData();
+            Map<Integer, StoredCacheData> txCaches = readTxCachesData();
 
-                AtomicLong procSegCnt = new AtomicLong();
+            AtomicLong procSegCnt = new AtomicLong();
 
-                IncrementalSnapshotProcessor proc = new 
IncrementalSnapshotProcessor(
-                    ignite.context().cache().context(), sft, incIdx, 
txCaches.keySet()
-                ) {
-                    @Override void totalWalSegments(int segCnt) {
-                        // No-op.
-                    }
+            IncrementalSnapshotProcessor proc = new 
IncrementalSnapshotProcessor(
+                ignite.context().cache().context(), sft, incIdx, 
txCaches.keySet()
+            ) {
+                @Override void totalWalSegments(int segCnt) {
+                    // No-op.
+                }
 
-                    @Override void processedWalSegments(int segCnt) {
-                        procSegCnt.set(segCnt);
-                    }
+                @Override void processedWalSegments(int segCnt) {
+                    procSegCnt.set(segCnt);
+                }
 
-                    @Override void initWalEntries(LongAdder entriesCnt) {
-                        procEntriesCnt = entriesCnt;
-                    }
-                };
+                @Override void initWalEntries(LongAdder entriesCnt) {
+                    procEntriesCnt = entriesCnt;
+                }
+            };
 
-                short locNodeId = cstIdsMap.get(sft.consistentId());
+            short locNodeId = cstIdsMap.get(sft.consistentId());
 
-                Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
-                Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes = 
new HashMap<>();
-                Map<Short, HashHolder> nodesTxHash = new HashMap<>();
+            Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
+            Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes = new 
HashMap<>();
+            Map<Short, HashHolder> nodesTxHash = new HashMap<>();
 
-                Set<GridCacheVersion> partiallyCommittedTxs = new HashSet<>();
-                // Hashes in this map calculated based on WAL records only, 
not part-X.bin data.
-                Map<PartitionKey, HashHolder> partMap = new HashMap<>();
-                List<Exception> exceptions = new ArrayList<>();
+            Set<GridCacheVersion> partiallyCommittedTxs = new HashSet<>();
+            // Hashes in this map calculated based on WAL records only, not 
part-X.bin data.
+            Map<PartitionKey, HashHolder> partMap = new HashMap<>();
+            List<Exception> exceptions = new ArrayList<>();
 
-                Function<Short, HashHolder> hashHolderBuilder = (k) -> new 
HashHolder();
+            Function<Short, HashHolder> hashHolderBuilder = (k) -> new 
HashHolder();
 
-                BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = (xid, 
participatingNodes) -> {
-                    for (short nodeId: participatingNodes) {
-                        if (nodeId != locNodeId) {
-                            HashHolder hash = 
nodesTxHash.computeIfAbsent(nodeId, hashHolderBuilder);
+            BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = (xid, 
participatingNodes) -> {
+                for (short nodeId: participatingNodes) {
+                    if (nodeId != locNodeId) {
+                        HashHolder hash = nodesTxHash.computeIfAbsent(nodeId, 
hashHolderBuilder);
 
-                            hash.increment(xid.hashCode(), 0);
-                        }
+                        hash.increment(xid.hashCode(), 0);
                     }
-                };
+                }
+            };
 
-                // CacheId -> CacheGrpId.
-                Map<Integer, Integer> cacheGrpId = txCaches.values().stream()
-                    .collect(Collectors.toMap(
-                        StoredCacheData::cacheId,
-                        cacheData -> 
CU.cacheGroupId(cacheData.config().getName(), cacheData.config().getGroupName())
-                    ));
+            // CacheId -> CacheGrpId.
+            Map<Integer, Integer> cacheGrpId = txCaches.values().stream()
+                .collect(Collectors.toMap(
+                    StoredCacheData::cacheId,
+                    cacheData -> CU.cacheGroupId(cacheData.config().getName(), 
cacheData.config().getGroupName())
+                ));
 
-                LongAdder procTxCnt = new LongAdder();
+            LongAdder procTxCnt = new LongAdder();
 
-                proc.process(dataEntry -> {
-                    if (dataEntry.op() == GridCacheOperation.READ || 
!exceptions.isEmpty())
-                        return;
+            proc.process(dataEntry -> {
+                if (dataEntry.op() == GridCacheOperation.READ || 
!exceptions.isEmpty())
+                    return;
 
-                    if (log.isTraceEnabled())
-                        log.trace("Checking data entry [entry=" + dataEntry + 
']');
+                if (log.isTraceEnabled())
+                    log.trace("Checking data entry [entry=" + dataEntry + ']');
 
-                    if (!activeDhtTxs.contains(dataEntry.writeVersion()))
-                        partiallyCommittedTxs.add(dataEntry.nearXidVersion());
+                if (!activeDhtTxs.contains(dataEntry.writeVersion()))
+                    partiallyCommittedTxs.add(dataEntry.nearXidVersion());
 
-                    StoredCacheData cacheData = 
txCaches.get(dataEntry.cacheId());
+                StoredCacheData cacheData = txCaches.get(dataEntry.cacheId());
 
-                    PartitionKey partKey = new PartitionKey(
-                        cacheGrpId.get(dataEntry.cacheId()),
-                        dataEntry.partitionId(),
-                        CU.cacheOrGroupName(cacheData.config()));
+                PartitionKey partKey = new PartitionKey(
+                    cacheGrpId.get(dataEntry.cacheId()),
+                    dataEntry.partitionId(),
+                    CU.cacheOrGroupName(cacheData.config()));
 
-                    HashHolder hash = partMap.computeIfAbsent(partKey, (k) -> 
new HashHolder());
+                HashHolder hash = partMap.computeIfAbsent(partKey, (k) -> new 
HashHolder());
 
-                    try {
-                        int valHash = dataEntry.key().hashCode();
+                try {
+                    int valHash = dataEntry.key().hashCode();
 
-                        if (dataEntry.value() != null)
-                            valHash += 
Arrays.hashCode(dataEntry.value().valueBytes(null));
+                    if (dataEntry.value() != null)
+                        valHash += 
Arrays.hashCode(dataEntry.value().valueBytes(null));
 
-                        int verHash = dataEntry.writeVersion().hashCode();
+                    int verHash = dataEntry.writeVersion().hashCode();
 
-                        hash.increment(valHash, verHash);
-                    }
-                    catch (IgniteCheckedException ex) {
-                        exceptions.add(ex);
+                    hash.increment(valHash, verHash);
+                }
+                catch (IgniteCheckedException ex) {
+                    exceptions.add(ex);
+                }
+            }, txRec -> {
+                if (!exceptions.isEmpty())
+                    return;
+
+                if (log.isDebugEnabled())
+                    log.debug("Checking tx record [txRec=" + txRec + ']');
+
+                if (txRec.state() == TransactionState.PREPARED) {
+                    // Collect only primary nodes. For some cases backup nodes 
is included into TxRecord#participationNodes()
+                    // but actually doesn't even start transaction, for 
example, if the node participates only as a backup
+                    // of reading only keys.
+                    Set<Short> primParticipatingNodes = 
txRec.participatingNodes().keySet();
+
+                    if (primParticipatingNodes.contains(locNodeId)) {
+                        txPrimParticipatingNodes.put(txRec.nearXidVersion(), 
primParticipatingNodes);
+                        activeDhtTxs.add(txRec.writeVersion());
                     }
-                }, txRec -> {
-                    if (!exceptions.isEmpty())
-                        return;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Checking tx record [txRec=" + txRec + ']');
-
-                    if (txRec.state() == TransactionState.PREPARED) {
-                        // Collect only primary nodes. For some cases backup 
nodes is included into TxRecord#participationNodes()
-                        // but actually doesn't even start transaction, for 
example, if the node participates only as a backup
-                        // of reading only keys.
-                        Set<Short> primParticipatingNodes = 
txRec.participatingNodes().keySet();
-
-                        if (primParticipatingNodes.contains(locNodeId)) {
-                            
txPrimParticipatingNodes.put(txRec.nearXidVersion(), primParticipatingNodes);
-                            activeDhtTxs.add(txRec.writeVersion());
-                        }
-                        else {
-                            for (Collection<Short> backups: 
txRec.participatingNodes().values()) {
-                                if (backups.contains(ALL_NODES) || 
backups.contains(locNodeId))
-                                    activeDhtTxs.add(txRec.writeVersion());
-                            }
+                    else {
+                        for (Collection<Short> backups: 
txRec.participatingNodes().values()) {
+                            if (backups.contains(ALL_NODES) || 
backups.contains(locNodeId))
+                                activeDhtTxs.add(txRec.writeVersion());
                         }
                     }
-                    else if (txRec.state() == TransactionState.COMMITTED) {
-                        activeDhtTxs.remove(txRec.writeVersion());
-
-                        Set<Short> participatingNodes = 
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
-
-                        // Legal cases:
-                        // 1. This node is a transaction near node, but not 
primary or backup node.
-                        // 2. This node participated in the transaction 
multiple times (e.g., primary for one key and backup for other key).
-                        // 3. A transaction is included into previous 
incremental snapshot.
-                        if (participatingNodes == null)
-                            return;
+                }
+                else if (txRec.state() == TransactionState.COMMITTED) {
+                    activeDhtTxs.remove(txRec.writeVersion());
 
-                        procTxCnt.increment();
+                    Set<Short> participatingNodes = 
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
 
-                        calcTxHash.accept(txRec.nearXidVersion(), 
participatingNodes);
-                    }
-                    else if (txRec.state() == TransactionState.ROLLED_BACK) {
-                        activeDhtTxs.remove(txRec.writeVersion());
-                        
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
-                    }
-                });
+                    // Legal cases:
+                    // 1. This node is a transaction near node, but not 
primary or backup node.
+                    // 2. This node participated in the transaction multiple 
times (e.g., primary for one key and backup for other key).
+                    // 3. A transaction is included into previous incremental 
snapshot.
+                    if (participatingNodes == null)
+                        return;
 
-                // All active transactions that didn't log COMMITTED or 
ROLL_BACK records are considered committed.
-                // It is possible as incremental snapshot started after 
transaction left IgniteTxManager#activeTransactions() collection,
-                // but completed before the final TxRecord was written.
-                for (Map.Entry<GridCacheVersion, Set<Short>> tx: 
txPrimParticipatingNodes.entrySet())
-                    calcTxHash.accept(tx.getKey(), tx.getValue());
+                    procTxCnt.increment();
 
-                Map<Object, TransactionsHashRecord> txHashRes = 
nodesTxHash.entrySet().stream()
-                    .map(e -> new TransactionsHashRecord(
-                        sft.consistentId(),
-                        blt.compactIdMapping().get(e.getKey()),
-                        e.getValue().hash
-                    ))
-                    .collect(Collectors.toMap(
-                        TransactionsHashRecord::remoteConsistentId,
-                        Function.identity()
-                    ));
-
-                Map<PartitionKey, PartitionHashRecord> partHashRes = 
partMap.entrySet().stream()
-                    .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        e -> new PartitionHashRecord(
-                            e.getKey(),
-                            false,
-                            sft.consistentId(),
-                            null,
-                            0,
-                            null,
-                            new VerifyPartitionContext(e.getValue())
-                        )
-                    ));
-
-                if (log.isInfoEnabled()) {
-                    log.info("Verify incremental snapshot procedure finished " 
+
-                        "[snpName=" + sft.name() + ", incrementIndex=" + 
incIdx + ", consId=" + sft.consistentId() +
-                        ", txCnt=" + procTxCnt.sum() + ", dataEntries=" + 
procEntriesCnt.sum() +
-                        ", walSegments=" + procSegCnt.get() + ']');
+                    calcTxHash.accept(txRec.nearXidVersion(), 
participatingNodes);
                 }
-
-                return new IncrementalSnapshotVerificationTaskResult(
-                    txHashRes,
-                    partHashRes,
-                    partiallyCommittedTxs,
-                    exceptions);
-            }
-            catch (IgniteCheckedException | IOException e) {
-                throw new IgniteException(e);
+                else if (txRec.state() == TransactionState.ROLLED_BACK) {
+                    activeDhtTxs.remove(txRec.writeVersion());
+                    txPrimParticipatingNodes.remove(txRec.nearXidVersion());
+                }
+            });
+
+            // All active transactions that didn't log COMMITTED or ROLL_BACK 
records are considered committed.
+            // It is possible as incremental snapshot started after 
transaction left IgniteTxManager#activeTransactions() collection,
+            // but completed before the final TxRecord was written.
+            for (Map.Entry<GridCacheVersion, Set<Short>> tx: 
txPrimParticipatingNodes.entrySet())
+                calcTxHash.accept(tx.getKey(), tx.getValue());
+
+            Map<Object, TransactionsHashRecord> txHashRes = 
nodesTxHash.entrySet().stream()
+                .map(e -> new TransactionsHashRecord(
+                    sft.consistentId(),
+                    blt.compactIdMapping().get(e.getKey()),
+                    e.getValue().hash
+                ))
+                .collect(Collectors.toMap(
+                    TransactionsHashRecord::remoteConsistentId,
+                    Function.identity()
+                ));
+
+            Map<PartitionKey, PartitionHashRecord> partHashRes = 
partMap.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new PartitionHashRecord(
+                        e.getKey(),
+                        false,
+                        sft.consistentId(),
+                        null,
+                        0,
+                        null,
+                        new VerifyPartitionContext(e.getValue())
+                    )
+                ));
+
+            if (log.isInfoEnabled()) {
+                log.info("Verify incremental snapshot procedure finished " +
+                    "[snpName=" + sft.name() + ", incrementIndex=" + incIdx + 
", consId=" + sft.consistentId() +
+                    ", txCnt=" + procTxCnt.sum() + ", dataEntries=" + 
procEntriesCnt.sum() +
+                    ", walSegments=" + procSegCnt.get() + ']');
             }
+
+            return new IncrementalSnapshotVerificationTaskResult(
+                txHashRes,
+                partHashRes,
+                partiallyCommittedTxs,
+                exceptions);
         }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+    }
 
-        /** Checks that current baseline topology matches baseline topology of 
the snapshot. */
-        private void checkBaseline(Collection<String> baselineCstIds) throws 
IgniteCheckedException, IOException {
-            IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
+    /** Checks that current baseline topology matches baseline topology of the 
snapshot. */
+    private void checkBaseline(Collection<String> baselineCstIds) throws 
IgniteCheckedException, IOException {
+        IgniteSnapshotManager snpMgr = 
ignite.context().cache().context().snapshotMgr();
 
-            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
+        SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
 
-            if (!F.eqNotOrdered(baselineCstIds, meta.baselineNodes())) {
-                throw new IgniteCheckedException("Topologies of snapshot and 
current cluster are different [snp=" +
-                    meta.baselineNodes() + ", current=" + baselineCstIds + 
']');
-            }
+        if (!F.eqNotOrdered(baselineCstIds, meta.baselineNodes())) {
+            throw new IgniteCheckedException("Topologies of snapshot and 
current cluster are different [snp=" +
+                meta.baselineNodes() + ", current=" + baselineCstIds + ']');
         }
+    }
 
-        /** @return Collection of snapshotted transactional caches, key is a 
cache ID. */
-        private Map<Integer, StoredCacheData> readTxCachesData() {
-            return GridLocalConfigManager.readCachesData(
-                    sft,
-                    ignite.context().marshallerContext().jdkMarshaller(),
-                    ignite.configuration())
-                .values().stream()
-                .filter(data -> data.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL)
-                .collect(Collectors.toMap(StoredCacheData::cacheId, 
Function.identity()));
-        }
+    /** @return Collection of snapshotted transactional caches, key is a cache 
ID. */
+    private Map<Integer, StoredCacheData> readTxCachesData() {
+        return GridLocalConfigManager.readCachesData(
+                sft,
+                ignite.context().marshallerContext().jdkMarshaller(),
+                ignite.configuration())
+            .values().stream()
+            .filter(data -> data.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL)
+            .collect(Collectors.toMap(StoredCacheData::cacheId, 
Function.identity()));
     }
 
     /** Holder for calculated hashes. */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 2ad7244ee52..8ccffd74ca8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.management.cache.IdleVerifyResult;
 import org.apache.ignite.internal.management.cache.PartitionKey;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
+import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 /** */
@@ -70,7 +71,7 @@ public class SnapshotChecker {
         assert incIdx > 0;
 
         return CompletableFuture.supplyAsync(
-            () -> new IncrementalSnapshotVerificationTask(kctx.grid(), log, 
sft, incIdx).execute(),
+            new IncrementalSnapshotVerificationTask(kctx.grid(), log, sft, 
incIdx),
             executor
         );
     }
@@ -85,7 +86,23 @@ public class SnapshotChecker {
         if (!operationErrors.isEmpty())
             return 
IdleVerifyResult.builder().exceptions(operationErrors).build();
 
-        return new IncrementalSnapshotVerificationTask(kctx.grid(), log, sft, 
incIdx).reduce(results);
+        IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
+
+        for (Map.Entry<ClusterNode, IncrementalSnapshotVerificationTaskResult> 
nodeRes: results.entrySet()) {
+            IncrementalSnapshotVerificationTaskResult res = nodeRes.getValue();
+
+            if (!F.isEmpty(res.partiallyCommittedTxs()))
+                bldr.addPartiallyCommited(nodeRes.getKey(), 
res.partiallyCommittedTxs());
+
+            bldr.addPartitionHashes(res.partHashRes());
+
+            if (log.isDebugEnabled())
+                log.debug("Handle VerifyIncrementalSnapshotJob result [node=" 
+ nodeRes.getKey() + ", taskRes=" + res + ']');
+
+            bldr.addIncrementalHashRecords(nodeRes.getKey(), res.txHashRes());
+        }
+
+        return bldr.build();
     }
 
     /** */

Reply via email to