This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fb9372c33bf IGNITE-26493 Refactor IncrementalSnapshotVerificationTask
(#12386)
fb9372c33bf is described below
commit fb9372c33bf200dddda5c88e4fe8436babff448b
Author: Daniil <[email protected]>
AuthorDate: Mon Oct 13 11:45:33 2025 +0300
IGNITE-26493 Refactor IncrementalSnapshotVerificationTask (#12386)
---
.../IncrementalSnapshotVerificationTask.java | 430 +++++++++------------
.../persistence/snapshot/SnapshotChecker.java | 21 +-
2 files changed, 208 insertions(+), 243 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
index 27bf90db3f2..d5f744b450c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
@@ -30,14 +30,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKey;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
@@ -55,301 +54,250 @@ import org.apache.ignite.transactions.TransactionState;
import static
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
/** */
-public class IncrementalSnapshotVerificationTask {
+public class IncrementalSnapshotVerificationTask implements
Supplier<IncrementalSnapshotVerificationTaskResult> {
/** */
- private final VerifyIncrementalSnapshotJob job;
+ private final IgniteEx ignite;
/** */
private final IgniteLogger log;
/** */
- public IncrementalSnapshotVerificationTask(IgniteEx ignite, IgniteLogger
log, SnapshotFileTree sft, int incrementalIdx) {
- job = new VerifyIncrementalSnapshotJob(ignite, log, sft,
incrementalIdx);
- this.log = log;
- }
+ private final SnapshotFileTree sft;
/** */
- public IdleVerifyResult reduce(Map<ClusterNode,
IncrementalSnapshotVerificationTaskResult> results) throws IgniteException {
- IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
-
- for (Map.Entry<ClusterNode, IncrementalSnapshotVerificationTaskResult>
nodeRes: results.entrySet()) {
- IncrementalSnapshotVerificationTaskResult res = nodeRes.getValue();
-
- if (!F.isEmpty(res.partiallyCommittedTxs()))
- bldr.addPartiallyCommited(nodeRes.getKey(),
res.partiallyCommittedTxs());
-
- bldr.addPartitionHashes(res.partHashRes());
-
- if (log.isDebugEnabled())
- log.debug("Handle VerifyIncrementalSnapshotJob result [node="
+ nodeRes.getKey() + ", taskRes=" + res + ']');
-
- bldr.addIncrementalHashRecords(nodeRes.getKey(), res.txHashRes());
- }
-
- return bldr.build();
- }
+ private final int incIdx;
/** */
- public IncrementalSnapshotVerificationTaskResult execute() {
- return job.execute0();
- }
+ private LongAdder procEntriesCnt;
/** */
- private static class VerifyIncrementalSnapshotJob {
- /** */
- private final IgniteEx ignite;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final SnapshotFileTree sft;
-
- /** */
- private final int incIdx;
-
- /** */
- private LongAdder procEntriesCnt;
-
- /**
- * @param ignite Ignite instance.
- * @param log Ignite logger.
- * @param sft Snapshot file tree
- * @param incIdx Incremental snapshot index.
- */
- private VerifyIncrementalSnapshotJob(
- IgniteEx ignite,
- IgniteLogger log,
- SnapshotFileTree sft,
- int incIdx
- ) {
- this.ignite = ignite;
- this.log = log;
- this.sft = sft;
- this.incIdx = incIdx;
- }
+ public IncrementalSnapshotVerificationTask(IgniteEx ignite, IgniteLogger
log, SnapshotFileTree sft, int incrementalIdx) {
+ this.ignite = ignite;
+ this.log = log;
+ this.sft = sft;
+ this.incIdx = incrementalIdx;
+ }
- /**
- * @return Map containing calculated transactions hash for every
remote node in the cluster.
- */
- public IncrementalSnapshotVerificationTaskResult execute0() throws
IgniteException {
- try {
- if (log.isInfoEnabled()) {
- log.info("Verify incremental snapshot procedure has been
initiated " +
- "[snpName=" + sft.name() + ", incrementIndex=" +
incIdx + ", consId=" + sft.consistentId() + ']');
- }
+ /**
+     * @return Verification result containing transaction hashes calculated
for every remote node in the cluster.
+ */
+ @Override public IncrementalSnapshotVerificationTaskResult get() throws
IgniteException {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Verify incremental snapshot procedure has been
initiated " +
+ "[snpName=" + sft.name() + ", incrementIndex=" + incIdx +
", consId=" + sft.consistentId() + ']');
+ }
- if (incIdx <= 0)
- return new IncrementalSnapshotVerificationTaskResult();
+ if (incIdx <= 0)
+ return new IncrementalSnapshotVerificationTaskResult();
- BaselineTopology blt =
ignite.context().state().clusterState().baselineTopology();
+ BaselineTopology blt =
ignite.context().state().clusterState().baselineTopology();
- Map<String, Short> cstIdsMap =
blt.consistentIdMapping().entrySet().stream()
- .collect(Collectors.toMap(e -> e.getKey().toString(),
Map.Entry::getValue));
+ Map<String, Short> cstIdsMap =
blt.consistentIdMapping().entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().toString(),
Map.Entry::getValue));
- checkBaseline(cstIdsMap.keySet());
+ checkBaseline(cstIdsMap.keySet());
- Map<Integer, StoredCacheData> txCaches = readTxCachesData();
+ Map<Integer, StoredCacheData> txCaches = readTxCachesData();
- AtomicLong procSegCnt = new AtomicLong();
+ AtomicLong procSegCnt = new AtomicLong();
- IncrementalSnapshotProcessor proc = new
IncrementalSnapshotProcessor(
- ignite.context().cache().context(), sft, incIdx,
txCaches.keySet()
- ) {
- @Override void totalWalSegments(int segCnt) {
- // No-op.
- }
+ IncrementalSnapshotProcessor proc = new
IncrementalSnapshotProcessor(
+ ignite.context().cache().context(), sft, incIdx,
txCaches.keySet()
+ ) {
+ @Override void totalWalSegments(int segCnt) {
+ // No-op.
+ }
- @Override void processedWalSegments(int segCnt) {
- procSegCnt.set(segCnt);
- }
+ @Override void processedWalSegments(int segCnt) {
+ procSegCnt.set(segCnt);
+ }
- @Override void initWalEntries(LongAdder entriesCnt) {
- procEntriesCnt = entriesCnt;
- }
- };
+ @Override void initWalEntries(LongAdder entriesCnt) {
+ procEntriesCnt = entriesCnt;
+ }
+ };
- short locNodeId = cstIdsMap.get(sft.consistentId());
+ short locNodeId = cstIdsMap.get(sft.consistentId());
- Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
- Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes =
new HashMap<>();
- Map<Short, HashHolder> nodesTxHash = new HashMap<>();
+ Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
+ Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes = new
HashMap<>();
+ Map<Short, HashHolder> nodesTxHash = new HashMap<>();
- Set<GridCacheVersion> partiallyCommittedTxs = new HashSet<>();
- // Hashes in this map calculated based on WAL records only,
not part-X.bin data.
- Map<PartitionKey, HashHolder> partMap = new HashMap<>();
- List<Exception> exceptions = new ArrayList<>();
+ Set<GridCacheVersion> partiallyCommittedTxs = new HashSet<>();
+ // Hashes in this map calculated based on WAL records only, not
part-X.bin data.
+ Map<PartitionKey, HashHolder> partMap = new HashMap<>();
+ List<Exception> exceptions = new ArrayList<>();
- Function<Short, HashHolder> hashHolderBuilder = (k) -> new
HashHolder();
+ Function<Short, HashHolder> hashHolderBuilder = (k) -> new
HashHolder();
- BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = (xid,
participatingNodes) -> {
- for (short nodeId: participatingNodes) {
- if (nodeId != locNodeId) {
- HashHolder hash =
nodesTxHash.computeIfAbsent(nodeId, hashHolderBuilder);
+ BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = (xid,
participatingNodes) -> {
+ for (short nodeId: participatingNodes) {
+ if (nodeId != locNodeId) {
+ HashHolder hash = nodesTxHash.computeIfAbsent(nodeId,
hashHolderBuilder);
- hash.increment(xid.hashCode(), 0);
- }
+ hash.increment(xid.hashCode(), 0);
}
- };
+ }
+ };
- // CacheId -> CacheGrpId.
- Map<Integer, Integer> cacheGrpId = txCaches.values().stream()
- .collect(Collectors.toMap(
- StoredCacheData::cacheId,
- cacheData ->
CU.cacheGroupId(cacheData.config().getName(), cacheData.config().getGroupName())
- ));
+ // CacheId -> CacheGrpId.
+ Map<Integer, Integer> cacheGrpId = txCaches.values().stream()
+ .collect(Collectors.toMap(
+ StoredCacheData::cacheId,
+ cacheData -> CU.cacheGroupId(cacheData.config().getName(),
cacheData.config().getGroupName())
+ ));
- LongAdder procTxCnt = new LongAdder();
+ LongAdder procTxCnt = new LongAdder();
- proc.process(dataEntry -> {
- if (dataEntry.op() == GridCacheOperation.READ ||
!exceptions.isEmpty())
- return;
+ proc.process(dataEntry -> {
+ if (dataEntry.op() == GridCacheOperation.READ ||
!exceptions.isEmpty())
+ return;
- if (log.isTraceEnabled())
- log.trace("Checking data entry [entry=" + dataEntry +
']');
+ if (log.isTraceEnabled())
+ log.trace("Checking data entry [entry=" + dataEntry + ']');
- if (!activeDhtTxs.contains(dataEntry.writeVersion()))
- partiallyCommittedTxs.add(dataEntry.nearXidVersion());
+ if (!activeDhtTxs.contains(dataEntry.writeVersion()))
+ partiallyCommittedTxs.add(dataEntry.nearXidVersion());
- StoredCacheData cacheData =
txCaches.get(dataEntry.cacheId());
+ StoredCacheData cacheData = txCaches.get(dataEntry.cacheId());
- PartitionKey partKey = new PartitionKey(
- cacheGrpId.get(dataEntry.cacheId()),
- dataEntry.partitionId(),
- CU.cacheOrGroupName(cacheData.config()));
+ PartitionKey partKey = new PartitionKey(
+ cacheGrpId.get(dataEntry.cacheId()),
+ dataEntry.partitionId(),
+ CU.cacheOrGroupName(cacheData.config()));
- HashHolder hash = partMap.computeIfAbsent(partKey, (k) ->
new HashHolder());
+ HashHolder hash = partMap.computeIfAbsent(partKey, (k) -> new
HashHolder());
- try {
- int valHash = dataEntry.key().hashCode();
+ try {
+ int valHash = dataEntry.key().hashCode();
- if (dataEntry.value() != null)
- valHash +=
Arrays.hashCode(dataEntry.value().valueBytes(null));
+ if (dataEntry.value() != null)
+ valHash +=
Arrays.hashCode(dataEntry.value().valueBytes(null));
- int verHash = dataEntry.writeVersion().hashCode();
+ int verHash = dataEntry.writeVersion().hashCode();
- hash.increment(valHash, verHash);
- }
- catch (IgniteCheckedException ex) {
- exceptions.add(ex);
+ hash.increment(valHash, verHash);
+ }
+ catch (IgniteCheckedException ex) {
+ exceptions.add(ex);
+ }
+ }, txRec -> {
+ if (!exceptions.isEmpty())
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Checking tx record [txRec=" + txRec + ']');
+
+ if (txRec.state() == TransactionState.PREPARED) {
+                // Collect only primary nodes. In some cases backup nodes
are included into TxRecord#participatingNodes()
+                // but actually don't even start the transaction, for
example, if the node participates only as a backup
+                // of read-only keys.
+ Set<Short> primParticipatingNodes =
txRec.participatingNodes().keySet();
+
+ if (primParticipatingNodes.contains(locNodeId)) {
+ txPrimParticipatingNodes.put(txRec.nearXidVersion(),
primParticipatingNodes);
+ activeDhtTxs.add(txRec.writeVersion());
}
- }, txRec -> {
- if (!exceptions.isEmpty())
- return;
-
- if (log.isDebugEnabled())
- log.debug("Checking tx record [txRec=" + txRec + ']');
-
- if (txRec.state() == TransactionState.PREPARED) {
- // Collect only primary nodes. For some cases backup
nodes is included into TxRecord#participationNodes()
- // but actually doesn't even start transaction, for
example, if the node participates only as a backup
- // of reading only keys.
- Set<Short> primParticipatingNodes =
txRec.participatingNodes().keySet();
-
- if (primParticipatingNodes.contains(locNodeId)) {
-
txPrimParticipatingNodes.put(txRec.nearXidVersion(), primParticipatingNodes);
- activeDhtTxs.add(txRec.writeVersion());
- }
- else {
- for (Collection<Short> backups:
txRec.participatingNodes().values()) {
- if (backups.contains(ALL_NODES) ||
backups.contains(locNodeId))
- activeDhtTxs.add(txRec.writeVersion());
- }
+ else {
+ for (Collection<Short> backups:
txRec.participatingNodes().values()) {
+ if (backups.contains(ALL_NODES) ||
backups.contains(locNodeId))
+ activeDhtTxs.add(txRec.writeVersion());
}
}
- else if (txRec.state() == TransactionState.COMMITTED) {
- activeDhtTxs.remove(txRec.writeVersion());
-
- Set<Short> participatingNodes =
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
-
- // Legal cases:
- // 1. This node is a transaction near node, but not
primary or backup node.
- // 2. This node participated in the transaction
multiple times (e.g., primary for one key and backup for other key).
- // 3. A transaction is included into previous
incremental snapshot.
- if (participatingNodes == null)
- return;
+ }
+ else if (txRec.state() == TransactionState.COMMITTED) {
+ activeDhtTxs.remove(txRec.writeVersion());
- procTxCnt.increment();
+ Set<Short> participatingNodes =
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
- calcTxHash.accept(txRec.nearXidVersion(),
participatingNodes);
- }
- else if (txRec.state() == TransactionState.ROLLED_BACK) {
- activeDhtTxs.remove(txRec.writeVersion());
-
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
- }
- });
+ // Legal cases:
+ // 1. This node is a transaction near node, but not
primary or backup node.
+ // 2. This node participated in the transaction multiple
times (e.g., primary for one key and backup for other key).
+ // 3. A transaction is included into previous incremental
snapshot.
+ if (participatingNodes == null)
+ return;
- // All active transactions that didn't log COMMITTED or
ROLL_BACK records are considered committed.
- // It is possible as incremental snapshot started after
transaction left IgniteTxManager#activeTransactions() collection,
- // but completed before the final TxRecord was written.
- for (Map.Entry<GridCacheVersion, Set<Short>> tx:
txPrimParticipatingNodes.entrySet())
- calcTxHash.accept(tx.getKey(), tx.getValue());
+ procTxCnt.increment();
- Map<Object, TransactionsHashRecord> txHashRes =
nodesTxHash.entrySet().stream()
- .map(e -> new TransactionsHashRecord(
- sft.consistentId(),
- blt.compactIdMapping().get(e.getKey()),
- e.getValue().hash
- ))
- .collect(Collectors.toMap(
- TransactionsHashRecord::remoteConsistentId,
- Function.identity()
- ));
-
- Map<PartitionKey, PartitionHashRecord> partHashRes =
partMap.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> new PartitionHashRecord(
- e.getKey(),
- false,
- sft.consistentId(),
- null,
- 0,
- null,
- new VerifyPartitionContext(e.getValue())
- )
- ));
-
- if (log.isInfoEnabled()) {
- log.info("Verify incremental snapshot procedure finished "
+
- "[snpName=" + sft.name() + ", incrementIndex=" +
incIdx + ", consId=" + sft.consistentId() +
- ", txCnt=" + procTxCnt.sum() + ", dataEntries=" +
procEntriesCnt.sum() +
- ", walSegments=" + procSegCnt.get() + ']');
+ calcTxHash.accept(txRec.nearXidVersion(),
participatingNodes);
}
-
- return new IncrementalSnapshotVerificationTaskResult(
- txHashRes,
- partHashRes,
- partiallyCommittedTxs,
- exceptions);
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
+ else if (txRec.state() == TransactionState.ROLLED_BACK) {
+ activeDhtTxs.remove(txRec.writeVersion());
+ txPrimParticipatingNodes.remove(txRec.nearXidVersion());
+ }
+ });
+
+        // All active transactions that didn't log COMMITTED or ROLLED_BACK
records are considered committed.
+ // It is possible as incremental snapshot started after
transaction left IgniteTxManager#activeTransactions() collection,
+ // but completed before the final TxRecord was written.
+ for (Map.Entry<GridCacheVersion, Set<Short>> tx:
txPrimParticipatingNodes.entrySet())
+ calcTxHash.accept(tx.getKey(), tx.getValue());
+
+ Map<Object, TransactionsHashRecord> txHashRes =
nodesTxHash.entrySet().stream()
+ .map(e -> new TransactionsHashRecord(
+ sft.consistentId(),
+ blt.compactIdMapping().get(e.getKey()),
+ e.getValue().hash
+ ))
+ .collect(Collectors.toMap(
+ TransactionsHashRecord::remoteConsistentId,
+ Function.identity()
+ ));
+
+ Map<PartitionKey, PartitionHashRecord> partHashRes =
partMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new PartitionHashRecord(
+ e.getKey(),
+ false,
+ sft.consistentId(),
+ null,
+ 0,
+ null,
+ new VerifyPartitionContext(e.getValue())
+ )
+ ));
+
+ if (log.isInfoEnabled()) {
+ log.info("Verify incremental snapshot procedure finished " +
+ "[snpName=" + sft.name() + ", incrementIndex=" + incIdx +
", consId=" + sft.consistentId() +
+ ", txCnt=" + procTxCnt.sum() + ", dataEntries=" +
procEntriesCnt.sum() +
+ ", walSegments=" + procSegCnt.get() + ']');
}
+
+ return new IncrementalSnapshotVerificationTaskResult(
+ txHashRes,
+ partHashRes,
+ partiallyCommittedTxs,
+ exceptions);
}
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
+ }
+ }
- /** Checks that current baseline topology matches baseline topology of
the snapshot. */
- private void checkBaseline(Collection<String> baselineCstIds) throws
IgniteCheckedException, IOException {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
+ /** Checks that current baseline topology matches baseline topology of the
snapshot. */
+ private void checkBaseline(Collection<String> baselineCstIds) throws
IgniteCheckedException, IOException {
+ IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
+ SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
- if (!F.eqNotOrdered(baselineCstIds, meta.baselineNodes())) {
- throw new IgniteCheckedException("Topologies of snapshot and
current cluster are different [snp=" +
- meta.baselineNodes() + ", current=" + baselineCstIds +
']');
- }
+ if (!F.eqNotOrdered(baselineCstIds, meta.baselineNodes())) {
+ throw new IgniteCheckedException("Topologies of snapshot and
current cluster are different [snp=" +
+ meta.baselineNodes() + ", current=" + baselineCstIds + ']');
}
+ }
- /** @return Collection of snapshotted transactional caches, key is a
cache ID. */
- private Map<Integer, StoredCacheData> readTxCachesData() {
- return GridLocalConfigManager.readCachesData(
- sft,
- ignite.context().marshallerContext().jdkMarshaller(),
- ignite.configuration())
- .values().stream()
- .filter(data -> data.config().getAtomicityMode() ==
CacheAtomicityMode.TRANSACTIONAL)
- .collect(Collectors.toMap(StoredCacheData::cacheId,
Function.identity()));
- }
+ /** @return Collection of snapshotted transactional caches, key is a cache
ID. */
+ private Map<Integer, StoredCacheData> readTxCachesData() {
+ return GridLocalConfigManager.readCachesData(
+ sft,
+ ignite.context().marshallerContext().jdkMarshaller(),
+ ignite.configuration())
+ .values().stream()
+ .filter(data -> data.config().getAtomicityMode() ==
CacheAtomicityMode.TRANSACTIONAL)
+ .collect(Collectors.toMap(StoredCacheData::cacheId,
Function.identity()));
}
/** Holder for calculated hashes. */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 2ad7244ee52..8ccffd74ca8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKey;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
+import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
/** */
@@ -70,7 +71,7 @@ public class SnapshotChecker {
assert incIdx > 0;
return CompletableFuture.supplyAsync(
- () -> new IncrementalSnapshotVerificationTask(kctx.grid(), log,
sft, incIdx).execute(),
+ new IncrementalSnapshotVerificationTask(kctx.grid(), log, sft,
incIdx),
executor
);
}
@@ -85,7 +86,23 @@ public class SnapshotChecker {
if (!operationErrors.isEmpty())
return
IdleVerifyResult.builder().exceptions(operationErrors).build();
- return new IncrementalSnapshotVerificationTask(kctx.grid(), log, sft,
incIdx).reduce(results);
+ IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
+
+ for (Map.Entry<ClusterNode, IncrementalSnapshotVerificationTaskResult>
nodeRes: results.entrySet()) {
+ IncrementalSnapshotVerificationTaskResult res = nodeRes.getValue();
+
+ if (!F.isEmpty(res.partiallyCommittedTxs()))
+ bldr.addPartiallyCommited(nodeRes.getKey(),
res.partiallyCommittedTxs());
+
+ bldr.addPartitionHashes(res.partHashRes());
+
+ if (log.isDebugEnabled())
+ log.debug("Handle VerifyIncrementalSnapshotJob result [node="
+ nodeRes.getKey() + ", taskRes=" + res + ']');
+
+ bldr.addIncrementalHashRecords(nodeRes.getKey(), res.txHashRes());
+ }
+
+ return bldr.build();
}
/** */