This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch IGNITE-22662__snapshot_refactoring
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to
refs/heads/IGNITE-22662__snapshot_refactoring by this push:
new 0abc9bae944 IGNITE-22662 : Incremental snapshot check as DP (#11495)
0abc9bae944 is described below
commit 0abc9bae944f6e510b048ef1d9c7eb2c8e79cd3a
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Oct 9 16:08:16 2024 +0300
IGNITE-22662 : Incremental snapshot check as DP (#11495)
---
.../snapshot/IgniteSnapshotManager.java | 40 +-
...lt.java => IncrementalSnapshotCheckResult.java} | 30 +-
.../IncrementalSnapshotVerificationTask.java | 426 ------------------
.../persistence/snapshot/SnapshotCheckProcess.java | 147 ++++---
.../snapshot/SnapshotCheckProcessRequest.java | 12 +
.../persistence/snapshot/SnapshotChecker.java | 483 ++++++++++++++++++---
.../snapshot/SnapshotMetadataVerificationTask.java | 127 +-----
.../processors/cache/verify/IdleVerifyUtility.java | 10 +-
.../snapshot/IgniteClusterSnapshotCheckTest.java | 130 +++++-
9 files changed, 697 insertions(+), 708 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index bb78b31bbde..2d16637af6e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -469,7 +471,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
private final boolean sequentialWrite =
IgniteSystemProperties.getBoolean(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE,
DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE);
- /** Snapshot validator. */
+ /** Snapshot checker. */
private final SnapshotChecker snpChecker;
/**
@@ -495,7 +497,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
// Manage remote snapshots.
snpRmtMgr = new SequentialRemoteSnapshotManager();
- snpChecker = new SnapshotChecker(ctx, marsh,
ctx.pools().getSnapshotExecutorService(), U.resolveClassLoader(ctx.config()));
+ snpChecker = new SnapshotChecker(ctx, marsh,
ctx.pools().getSnapshotExecutorService());
}
/**
@@ -1902,8 +1904,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
", incIdx=" + incIdx + ", grps=" + grps + ", validateParts=" +
check + ']');
}
- if (check && incIdx < 1)
- return checkSnpProc.start(name, snpPath, grps,
includeCustomHandlers);
+ if (check && (incIdx < 1 || !includeCustomHandlers))
+ return checkSnpProc.start(name, snpPath, grps, incIdx,
includeCustomHandlers);
GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new
GridFutureAdapter<>();
@@ -1926,12 +1928,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
if (f0.error() == null && F.isEmpty(metasRes.exceptions())) {
Map<ClusterNode, List<SnapshotMetadata>> metas =
metasRes.meta();
- Class<? extends AbstractSnapshotVerificationTask> cls;
-
- if (includeCustomHandlers)
- cls = SnapshotHandlerRestoreTask.class;
- else
- cls = incIdx > 0 ?
IncrementalSnapshotVerificationTask.class : SnapshotPartitionsVerifyTask.class;
+ Class<? extends AbstractSnapshotVerificationTask> cls =
includeCustomHandlers
+ ? SnapshotHandlerRestoreTask.class
+ : SnapshotPartitionsVerifyTask.class;
kctx0.task().execute(
cls,
@@ -1994,8 +1993,18 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @param smf File denoting to snapshot metafile.
* @return Snapshot metadata instance.
*/
- private SnapshotMetadata readSnapshotMetadata(File smf) throws
IgniteCheckedException, IOException {
- return snpChecker.readSnapshotMetadata(smf);
+ SnapshotMetadata readSnapshotMetadata(File smf) throws
IgniteCheckedException, IOException {
+ SnapshotMetadata meta = readFromFile(smf);
+
+ String smfName = smf.getName().substring(0, smf.getName().length() -
SNAPSHOT_METAFILE_EXT.length());
+
+ if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
+ throw new IgniteException(
+ "Error reading snapshot metadata [smfName=" + smfName + ",
consId=" + U.maskForFileName(meta.consistentId())
+ );
+ }
+
+ return meta;
}
/**
@@ -2004,7 +2013,12 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @param <T> Type of metadata.
*/
public <T> T readFromFile(File smf) throws IgniteCheckedException,
IOException {
- return snpChecker.readFromFile(smf);
+ if (!smf.exists())
+ throw new IgniteCheckedException("Snapshot metafile cannot be read
due to it doesn't exist: " + smf);
+
+ try (InputStream in = new
BufferedInputStream(Files.newInputStream(smf.toPath()))) {
+ return marsh.unmarshal(in,
U.resolveClassLoader(cctx.gridConfig()));
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTaskResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java
similarity index 71%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTaskResult.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java
index 6dcd1515160..e50a9a79b9b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTaskResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java
@@ -17,21 +17,17 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.U;
-/** Represents single job result for {@link
IncrementalSnapshotVerificationTask}. */
-class IncrementalSnapshotVerificationTaskResult extends
IgniteDataTransferObject {
+/** */
+class IncrementalSnapshotCheckResult implements Serializable {
/** */
private static final long serialVersionUID = 0L;
@@ -51,12 +47,12 @@ class IncrementalSnapshotVerificationTaskResult extends
IgniteDataTransferObject
private Collection<Exception> exceptions;
/** */
- public IncrementalSnapshotVerificationTaskResult() {
+ public IncrementalSnapshotCheckResult() {
// No-op.
}
/** */
- IncrementalSnapshotVerificationTaskResult(
+ IncrementalSnapshotCheckResult(
Map<Object, TransactionsHashRecord> txHashRes,
Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes,
Collection<GridCacheVersion> partiallyCommittedTxs,
@@ -87,20 +83,4 @@ class IncrementalSnapshotVerificationTaskResult extends
IgniteDataTransferObject
public Collection<Exception> exceptions() {
return exceptions;
}
-
- /** {@inheritDoc} */
- @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
- U.writeMap(out, txHashRes);
- U.writeMap(out, partHashRes);
- U.writeCollection(out, partiallyCommittedTxs);
- U.writeCollection(out, exceptions);
- }
-
- /** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
- txHashRes = U.readMap(in);
- partHashRes = U.readMap(in);
- partiallyCommittedTxs = U.readCollection(in);
- exceptions = U.readCollection(in);
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
deleted file mode 100644
index a6779e8e7e7..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
-import org.apache.ignite.internal.management.cache.PartitionKeyV2;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
-import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
-import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
-import
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cluster.BaselineTopology;
-import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.marshaller.MarshallerUtils;
-import org.apache.ignite.transactions.TransactionState;
-import org.jetbrains.annotations.Nullable;
-
-import static
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
-
-/** */
-@GridInternal
-public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
- Map<Object, Map<Object, TransactionsHashRecord>> nodeTxHashMap = new
HashMap<>();
-
- List<List<TransactionsHashRecord>> txHashConflicts = new ArrayList<>();
- Map<PartitionKeyV2, List<PartitionHashRecordV2>> partHashes = new
HashMap<>();
- Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs =
new HashMap<>();
-
- Map<ClusterNode, Exception> errors = new HashMap<>();
-
- for (ComputeJobResult nodeRes: results) {
- if (nodeRes.getException() != null) {
- errors.put(nodeRes.getNode(), nodeRes.getException());
-
- continue;
- }
-
- IncrementalSnapshotVerificationTaskResult res = nodeRes.getData();
-
- if (!F.isEmpty(res.exceptions())) {
- errors.put(nodeRes.getNode(), F.first(res.exceptions()));
-
- continue;
- }
-
- if (!F.isEmpty(res.partiallyCommittedTxs()))
- partiallyCommittedTxs.put(nodeRes.getNode(),
res.partiallyCommittedTxs());
-
- for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> entry:
res.partHashRes().entrySet())
- partHashes.computeIfAbsent(entry.getKey(), v -> new
ArrayList<>()).add(entry.getValue());
-
- if (log.isDebugEnabled())
- log.debug("Handle VerifyIncrementalSnapshotJob result [node="
+ nodeRes.getNode() + ", taskRes=" + res + ']');
-
- nodeTxHashMap.put(nodeRes.getNode().consistentId(),
res.txHashRes());
-
- Iterator<Map.Entry<Object, TransactionsHashRecord>> resIt =
res.txHashRes().entrySet().iterator();
-
- while (resIt.hasNext()) {
- Map.Entry<Object, TransactionsHashRecord> nodeTxHash =
resIt.next();
-
- Map<Object, TransactionsHashRecord> prevNodeTxHash =
nodeTxHashMap.get(nodeTxHash.getKey());
-
- if (prevNodeTxHash != null) {
- TransactionsHashRecord hash = nodeTxHash.getValue();
- TransactionsHashRecord prevHash =
prevNodeTxHash.remove(hash.localConsistentId());
-
- if (prevHash == null || prevHash.transactionHash() !=
hash.transactionHash())
- txHashConflicts.add(F.asList(hash, prevHash));
-
- resIt.remove();
- }
- }
- }
-
- // Add all missed pairs to conflicts.
- nodeTxHashMap.values().stream()
- .flatMap(e -> e.values().stream())
- .forEach(e -> txHashConflicts.add(F.asList(e, null)));
-
- return new SnapshotPartitionsVerifyTaskResult(
- metas,
- errors.isEmpty() ?
- new IdleVerifyResultV2(partHashes, txHashConflicts,
partiallyCommittedTxs)
- : new IdleVerifyResultV2(errors));
- }
-
- /** {@inheritDoc} */
- @Override protected VerifyIncrementalSnapshotJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
- return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(),
args.incrementIndex(), consId);
- }
-
- /** */
- private static class VerifyIncrementalSnapshotJob extends
AbstractSnapshotVerificationJob {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Incremental snapshot index. */
- private final int incIdx;
-
- /** */
- private LongAdder procEntriesCnt;
-
- /**
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @param incIdx Incremental snapshot index.
- * @param consId Consistent id of the related node.
- */
- public VerifyIncrementalSnapshotJob(
- String snpName,
- @Nullable String snpPath,
- int incIdx,
- String consId
- ) {
- super(snpName, snpPath, consId, null, true);
-
- this.incIdx = incIdx;
- }
-
- /**
- * @return Map containing calculated transactions hash for every
remote node in the cluster.
- */
- @Override public IncrementalSnapshotVerificationTaskResult execute()
throws IgniteException {
- try {
- if (log.isInfoEnabled()) {
- log.info("Verify incremental snapshot procedure has been
initiated " +
- "[snpName=" + snpName + ", incrementIndex=" + incIdx +
", consId=" + consId + ']');
- }
-
- if (incIdx <= 0)
- return new IncrementalSnapshotVerificationTaskResult();
-
- BaselineTopology blt =
ignite.context().state().clusterState().baselineTopology();
-
- checkBaseline(blt);
-
- Map<Integer, StoredCacheData> txCaches = readTxCachesData();
-
- AtomicLong procSegCnt = new AtomicLong();
-
- IncrementalSnapshotProcessor proc = new
IncrementalSnapshotProcessor(
- ignite.context().cache().context(), snpName, snpPath,
incIdx, txCaches.keySet()
- ) {
- @Override void totalWalSegments(int segCnt) {
- // No-op.
- }
-
- @Override void processedWalSegments(int segCnt) {
- procSegCnt.set(segCnt);
- }
-
- @Override void initWalEntries(LongAdder entriesCnt) {
- procEntriesCnt = entriesCnt;
- }
- };
-
- short locNodeId = blt.consistentIdMapping().get(consId);
-
- Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
- Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes =
new HashMap<>();
- Map<Short, HashHolder> nodesTxHash = new HashMap<>();
-
- Set<GridCacheVersion> partiallyCommittedTxs = new HashSet<>();
- // Hashes in this map calculated based on WAL records only,
not part-X.bin data.
- Map<PartitionKeyV2, HashHolder> partMap = new HashMap<>();
- List<Exception> exceptions = new ArrayList<>();
-
- Function<Short, HashHolder> hashHolderBuilder = (k) -> new
HashHolder();
-
- BiConsumer<GridCacheVersion, Set<Short>> calcTxHash = (xid,
participatingNodes) -> {
- for (short nodeId: participatingNodes) {
- if (nodeId != locNodeId) {
- HashHolder hash =
nodesTxHash.computeIfAbsent(nodeId, hashHolderBuilder);
-
- hash.increment(xid.hashCode(), 0);
- }
- }
- };
-
- // CacheId -> CacheGrpId.
- Map<Integer, Integer> cacheGrpId = txCaches.values().stream()
- .collect(Collectors.toMap(
- StoredCacheData::cacheId,
- cacheData ->
CU.cacheGroupId(cacheData.config().getName(), cacheData.config().getGroupName())
- ));
-
- LongAdder procTxCnt = new LongAdder();
-
- proc.process(dataEntry -> {
- if (dataEntry.op() == GridCacheOperation.READ ||
!exceptions.isEmpty())
- return;
-
- if (log.isTraceEnabled())
- log.trace("Checking data entry [entry=" + dataEntry +
']');
-
- if (!activeDhtTxs.contains(dataEntry.writeVersion()))
- partiallyCommittedTxs.add(dataEntry.nearXidVersion());
-
- StoredCacheData cacheData =
txCaches.get(dataEntry.cacheId());
-
- PartitionKeyV2 partKey = new PartitionKeyV2(
- cacheGrpId.get(dataEntry.cacheId()),
- dataEntry.partitionId(),
- CU.cacheOrGroupName(cacheData.config()));
-
- HashHolder hash = partMap.computeIfAbsent(partKey, (k) ->
new HashHolder());
-
- try {
- int valHash = dataEntry.key().hashCode();
-
- if (dataEntry.value() != null)
- valHash +=
Arrays.hashCode(dataEntry.value().valueBytes(null));
-
- int verHash = dataEntry.writeVersion().hashCode();
-
- hash.increment(valHash, verHash);
- }
- catch (IgniteCheckedException ex) {
- exceptions.add(ex);
- }
- }, txRec -> {
- if (!exceptions.isEmpty())
- return;
-
- if (log.isDebugEnabled())
- log.debug("Checking tx record [txRec=" + txRec + ']');
-
- if (txRec.state() == TransactionState.PREPARED) {
- // Collect only primary nodes. For some cases backup
nodes is included into TxRecord#participationNodes()
- // but actually doesn't even start transaction, for
example, if the node participates only as a backup
- // of reading only keys.
- Set<Short> primParticipatingNodes =
txRec.participatingNodes().keySet();
-
- if (primParticipatingNodes.contains(locNodeId)) {
-
txPrimParticipatingNodes.put(txRec.nearXidVersion(), primParticipatingNodes);
- activeDhtTxs.add(txRec.writeVersion());
- }
- else {
- for (Collection<Short> backups:
txRec.participatingNodes().values()) {
- if (backups.contains(ALL_NODES) ||
backups.contains(locNodeId))
- activeDhtTxs.add(txRec.writeVersion());
- }
- }
- }
- else if (txRec.state() == TransactionState.COMMITTED) {
- activeDhtTxs.remove(txRec.writeVersion());
-
- Set<Short> participatingNodes =
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
-
- // Legal cases:
- // 1. This node is a transaction near node, but not
primary or backup node.
- // 2. This node participated in the transaction
multiple times (e.g., primary for one key and backup for other key).
- // 3. A transaction is included into previous
incremental snapshot.
- if (participatingNodes == null)
- return;
-
- procTxCnt.increment();
-
- calcTxHash.accept(txRec.nearXidVersion(),
participatingNodes);
- }
- else if (txRec.state() == TransactionState.ROLLED_BACK) {
- activeDhtTxs.remove(txRec.writeVersion());
-
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
- }
- });
-
- // All active transactions that didn't log COMMITTED or
ROLL_BACK records are considered committed.
- // It is possible as incremental snapshot started after
transaction left IgniteTxManager#activeTransactions() collection,
- // but completed before the final TxRecord was written.
- for (Map.Entry<GridCacheVersion, Set<Short>> tx:
txPrimParticipatingNodes.entrySet())
- calcTxHash.accept(tx.getKey(), tx.getValue());
-
- Map<Object, TransactionsHashRecord> txHashRes =
nodesTxHash.entrySet().stream()
- .map(e -> new TransactionsHashRecord(
- consId,
- blt.compactIdMapping().get(e.getKey()),
- e.getValue().hash
- ))
- .collect(Collectors.toMap(
- TransactionsHashRecord::remoteConsistentId,
- Function.identity()
- ));
-
- Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes =
partMap.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> new PartitionHashRecordV2(
- e.getKey(),
- false,
- consId,
- null,
- 0,
- null,
- new VerifyPartitionContext(e.getValue())
- )
- ));
-
- if (log.isInfoEnabled()) {
- log.info("Verify incremental snapshot procedure finished "
+
- "[snpName=" + snpName + ", incrementIndex=" + incIdx +
", consId=" + consId +
- ", txCnt=" + procTxCnt.sum() + ", dataEntries=" +
procEntriesCnt.sum() +
- ", walSegments=" + procSegCnt.get() + ']');
- }
-
- return new IncrementalSnapshotVerificationTaskResult(
- txHashRes,
- partHashRes,
- partiallyCommittedTxs,
- exceptions);
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
- }
- }
-
- /** Checks that current baseline topology matches baseline topology of
the snapshot. */
- private void checkBaseline(BaselineTopology blt) throws
IgniteCheckedException, IOException {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
-
- File snpDir = snpMgr.snapshotLocalDir(snpName, snpPath);
- SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir,
ignite.localNode().consistentId().toString());
-
- if (!F.eqNotOrdered(blt.consistentIds(), meta.baselineNodes())) {
- throw new IgniteCheckedException("Topologies of snapshot and
current cluster are different [snp=" +
- meta.baselineNodes() + ", current=" + blt.consistentIds()
+ ']');
- }
- }
-
- /** @return Collection of snapshotted transactional caches, key is a
cache ID. */
- private Map<Integer, StoredCacheData> readTxCachesData() throws
IgniteCheckedException, IOException {
- File snpDir =
ignite.context().cache().context().snapshotMgr().snapshotLocalDir(snpName,
snpPath);
-
- String folderName =
ignite.context().pdsFolderResolver().resolveFolders().folderName();
-
- return GridLocalConfigManager.readCachesData(
- new File(snpDir, databaseRelativePath(folderName)),
- MarshallerUtils.jdkMarshaller(ignite.name()),
- ignite.configuration())
- .values().stream()
- .filter(data -> data.config().getAtomicityMode() ==
CacheAtomicityMode.TRANSACTIONAL)
- .collect(Collectors.toMap(StoredCacheData::cacheId,
Function.identity()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- VerifyIncrementalSnapshotJob job = (VerifyIncrementalSnapshotJob)o;
-
- return snpName.equals(job.snpName) && Objects.equals(incIdx,
job.incIdx) && Objects.equals(snpPath, job.snpPath)
- && consId.equals(job.consId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(snpName, incIdx, snpPath, consId);
- }
- }
-
- /** Holder for calculated hashes. */
- public static class HashHolder {
- /** */
- public int hash;
-
- /** */
- public int verHash;
-
- /** */
- public void increment(int hash, int verHash) {
- this.hash += hash;
- this.verHash += verHash;
- }
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index 9d4a47040c8..e9f64247e82 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -147,14 +148,25 @@ public class SnapshotCheckProcess {
assert results.values().stream().noneMatch(res -> res != null &&
res.metas != null);
- if (ctx.req.allRestoreHandlers()) {
+ SnapshotChecker checker =
kctx.cache().context().snapshotMgr().checker();
+
+ if (ctx.req.incrementalIndex() > 0) {
+ IdleVerifyResultV2 chkRes = checker.reduceIncrementalResults(
+ mapResults(results, ctx.req.nodes(),
SnapshotCheckResponse::incrementalResult),
+ mapErrors(errors)
+ );
+
+ clusterOpFut.onDone(new
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes));
+ }
+ else if (ctx.req.allRestoreHandlers()) {
try {
if (!errors.isEmpty())
throw F.firstValue(errors);
- Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes
= mapCustomHandlersResults(results, ctx.req.nodes());
+ Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes
= mapResults(results, ctx.req.nodes(),
+ SnapshotCheckResponse::customHandlersResults);
-
kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(),
cstRes);
+ checker.checkCustomHandlersResults(ctx.req.snapshotName(),
cstRes);
clusterOpFut.onDone(new
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null));
}
@@ -166,7 +178,8 @@ public class SnapshotCheckProcess {
Map<ClusterNode, Exception> errors0 = mapErrors(errors);
if (!results.isEmpty()) {
- Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>>
results0 = mapPartsHashes(results, ctx.req.nodes());
+ Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>>
results0 = mapResults(results, ctx.req.nodes(),
+ SnapshotCheckResponse::partsHashes);
IdleVerifyResultV2 chkRes =
SnapshotChecker.reduceHashesResults(results0, errors0);
@@ -197,16 +210,29 @@ public class SnapshotCheckProcess {
// Might be already finished by asynchronous leave of a required node.
if (!phaseFut.isDone()) {
- CompletableFuture<? extends Map<?, ?>> workingFut =
req.allRestoreHandlers()
- ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta,
req.snapshotPath(), req.groups(), true)
- : snpMgr.checker().checkPartitions(ctx.locMeta,
snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
- req.groups(), false, true, false);
+ CompletableFuture<?> workingFut;
+
+ if (req.incrementalIndex() > 0) {
+ assert !req.allRestoreHandlers() : "Snapshot handlers aren't
supported for incremental snapshot.";
+
+ workingFut =
snpMgr.checker().checkIncrementalSnapshot(req.snapshotName(),
req.snapshotPath(), req.incrementalIndex());
+ }
+ else {
+ workingFut = req.allRestoreHandlers()
+ ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta,
req.snapshotPath(), req.groups(), true)
+ : snpMgr.checker().checkPartitions(ctx.locMeta,
snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
+ req.groups(), false, true, false);
+ }
workingFut.whenComplete((res, err) -> {
if (err != null)
phaseFut.onDone(err);
- else
- phaseFut.onDone(new SnapshotCheckResponse(res));
+ else {
+ if (req.incrementalIndex() > 0)
+ phaseFut.onDone(new
SnapshotCheckResponse((IncrementalSnapshotCheckResult)res));
+ else
+ phaseFut.onDone(new SnapshotCheckResponse((Map<?,
?>)res));
+ }
});
}
@@ -221,38 +247,25 @@ public class SnapshotCheckProcess {
}
/** */
- private Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>>
mapPartsHashes(
+ private <T> Map<ClusterNode, T> mapResults(
Map<UUID, SnapshotCheckResponse> results,
- Collection<UUID> requiredNodes
+ Set<UUID> requiredNodes,
+ Function<SnapshotCheckResponse, T> resExtractor
) {
return results.entrySet().stream()
.filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() !=
null)
- .collect(Collectors.toMap(e ->
kctx.cluster().get().node(e.getKey()), e -> e.getValue().partsHashes()));
- }
-
- /** */
- private Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>>
mapCustomHandlersResults(
- Map<UUID, SnapshotCheckResponse> results,
- Set<UUID> requiredNodes
- ) {
- return results.entrySet().stream()
- .filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() !=
null)
- .collect(Collectors.toMap(e ->
kctx.cluster().get().node(e.getKey()), e ->
e.getValue().customHandlersResults()));
+ .collect(Collectors.toMap(e ->
kctx.cluster().get().node(e.getKey()), e -> resExtractor.apply(e.getValue())));
}
/**
- * @param snpName Snapshot name of the validation process. If {@code
null}, ignored.
- * @param reqId If {@code snpName} is {@code null}, is used to find the
operation request.
- * @return Current snapshot checking context by {@code snpName} or {@code
reqId}.
+ * @param snpName Snapshot name. If {@code null}, ignored.
+ * @param reqId If {@code ctxId} is {@code null}, is used to find the
operation context.
+ * @return Current snapshot checking context by {@code ctxId} or {@code
reqId}.
*/
private @Nullable SnapshotCheckContext context(@Nullable String snpName,
UUID reqId) {
- SnapshotCheckContext ctx = snpName == null
+ return snpName == null
? contexts.values().stream().filter(ctx0 ->
ctx0.req.requestId().equals(reqId)).findFirst().orElse(null)
: contexts.get(snpName);
-
- assert ctx == null || ctx.req.requestId().equals(reqId);
-
- return ctx;
}
/** Phase 1 beginning: prepare, collect and check local metas. */
@@ -289,6 +302,7 @@ public class SnapshotCheckProcess {
if (!phaseFut.isDone()) {
snpMgr.checker().checkLocalMetas(
snpMgr.snapshotLocalDir(req.snapshotName(),
req.snapshotPath()),
+ req.incrementalIndex(),
grpIds,
kctx.cluster().get().localNode().consistentId()
).whenComplete((locMetas, err) -> {
@@ -308,9 +322,7 @@ public class SnapshotCheckProcess {
Map<UUID, SnapshotCheckResponse> results,
Map<UUID, Throwable> errors
) {
- String snpName = snpName(results);
-
- SnapshotCheckContext ctx = context(snpName, reqId);
+ SnapshotCheckContext ctx = context(null, reqId);
// The context is not stored in the case of concurrent check of the
same snapshot but the operation future is registered.
GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut =
clusterOpFuts.get(reqId);
@@ -369,27 +381,13 @@ public class SnapshotCheckProcess {
}
}
- /** Finds current snapshot name from the metas. */
- private @Nullable String snpName(Map<UUID, SnapshotCheckResponse> results)
{
- for (SnapshotCheckResponse nodeRes : results.values()) {
- if (nodeRes == null || F.isEmpty(nodeRes.metas))
- continue;
-
- assert nodeRes.metas.get(0) != null : "Empty snapshot metadata in
the results";
- assert !F.isEmpty(nodeRes.metas.get(0).snapshotName()) : "Empty
snapshot name in a snapshot metadata.";
-
- return nodeRes.metas.get(0).snapshotName();
- }
-
- return null;
- }
-
/**
* Starts the snapshot validation process.
*
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param grpNames List of cache group names.
+ * @param incIdx Incremental snapshot index. If not positive, snapshot is
not considered as incremental.
* @param allRestoreHandlers If {@code true}, all the registered {@link
IgniteSnapshotManager#handlers()} of type
* {@link SnapshotHandlerType#RESTORE} are invoked.
Otherwise, only snapshot metadatas and partition
* hashes are validated.
@@ -398,6 +396,7 @@ public class SnapshotCheckProcess {
String snpName,
@Nullable String snpPath,
@Nullable Collection<String> grpNames,
+ int incIdx,
boolean allRestoreHandlers
) {
assert !F.isEmpty(snpName);
@@ -415,6 +414,7 @@ public class SnapshotCheckProcess {
snpName,
snpPath,
grpNames,
+ incIdx,
allRestoreHandlers
);
@@ -475,32 +475,65 @@ public class SnapshotCheckProcess {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** Metas for the phase 1. Is always {@code null} for the phase 2. */
+ /** @see #metas() */
@Nullable private final List<SnapshotMetadata> metas;
- /** Node's partition hashes for the phase 2. Is always {@code null}
for the phase 1. */
+ /**
+ * @see #partsHashes()
+ * @see #customHandlersResults()
+ */
@Nullable private final Map<?, ?> partsResults;
+ /** @see #incrementalResult() */
+ @Nullable private final IncrementalSnapshotCheckResult incRes;
+
/** Ctor for the phase 1. */
private SnapshotCheckResponse(@Nullable List<SnapshotMetadata> metas) {
this.metas = metas;
this.partsResults = null;
+ this.incRes = null;
}
- /** Ctor for the phase 2. */
- private SnapshotCheckResponse(@Nullable Map<?, ?> partsResults) {
+ /** Ctor for the phase 2 for normal snapshot. */
+ private SnapshotCheckResponse(Map<?, ?> partsResults) {
this.metas = null;
this.partsResults = partsResults;
+ this.incRes = null;
}
- /** */
- private Map<PartitionKeyV2, PartitionHashRecordV2> partsHashes() {
+ /** Ctor for the phase 2 for incremental snapshot. */
+ private SnapshotCheckResponse(IncrementalSnapshotCheckResult incRes) {
+ this.metas = null;
+ this.partsResults = null;
+ this.incRes = incRes;
+ }
+
+ /** Metas for the phase 1. Is always {@code null} for the phase 2. */
+ @Nullable private List<SnapshotMetadata> metas() {
+ return metas;
+ }
+
+ /**
+ * Node's partition hashes for the phase 2. Is always {@code null} for
the phase 1 or in case of incremental
+ * snapshot.
+ */
+ private @Nullable Map<PartitionKeyV2, PartitionHashRecordV2>
partsHashes() {
return (Map<PartitionKeyV2, PartitionHashRecordV2>)partsResults;
}
- /** */
- private Map<String, SnapshotHandlerResult<?>> customHandlersResults() {
+ /**
+ * Results of the custom handlers for the phase 2. Is always {@code
null} for the phase 1 or in case of incremental
+ * snapshot.
+ *
+ * @see IgniteSnapshotManager#handlers()
+ */
+ private @Nullable Map<String, SnapshotHandlerResult<?>>
customHandlersResults() {
return (Map<String, SnapshotHandlerResult<?>>)partsResults;
}
+
+ /** Incremental snapshot result for the phase 2. Is always {@code
null} for the phase 1 or in case of normal snapshot. */
+ private @Nullable IncrementalSnapshotCheckResult incrementalResult() {
+ return incRes;
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
index b907f520a61..d1b38d4d512 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
@@ -40,6 +40,10 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
@GridToStringInclude
private final boolean allRestoreHandlers;
+ /** Incremental snapshot index. If not positive, snapshot is not
considered as incremental. */
+ @GridToStringInclude
+ private final int incIdx;
+
/**
* Creates snapshot check process request.
*
@@ -48,6 +52,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
* @param nodes Baseline node IDs that must be alive to complete the
operation..
* @param snpPath Snapshot directory path.
* @param grps List of cache group names.
+ * @param incIdx Incremental snapshot index. If not positive, snapshot is
not considered as incremental.
* @param allRestoreHandlers If {@code true}, all the registered {@link
IgniteSnapshotManager#handlers()} of type
* {@link SnapshotHandlerType#RESTORE} are
invoked. Otherwise, only snapshot metadatas and
* partition hashes are validated.
@@ -58,6 +63,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
String snpName,
String snpPath,
@Nullable Collection<String> grps,
+ int incIdx,
boolean allRestoreHandlers
) {
super(reqId, snpName, snpPath, grps, 0, nodes);
@@ -65,6 +71,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
assert !F.isEmpty(nodes);
this.allRestoreHandlers = allRestoreHandlers;
+ this.incIdx = incIdx;
}
/**
@@ -75,6 +82,11 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
return allRestoreHandlers;
}
+ /** @return Incremental snapshot index. If not positive, snapshot is not
considered as incremental. */
+ public int incrementalIndex() {
+ return incIdx;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotCheckProcessRequest.class, this,
super.toString());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index feca6343058..8a12db38ffd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -16,10 +16,8 @@
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.DirectoryStream;
@@ -31,6 +29,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,10 +37,15 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.GridKernalContext;
@@ -52,6 +56,8 @@ import
org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -64,8 +70,14 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.GridIterator;
@@ -74,10 +86,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
+import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
@@ -93,6 +108,8 @@ import static
org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
@@ -105,12 +122,6 @@ public class SnapshotChecker {
/** */
@Nullable protected final GridKernalContext kctx;
- /** */
- protected final Marshaller marshaller;
-
- /** */
- @Nullable protected final ClassLoader marshallerClsLdr;
-
/** */
protected final EncryptionSpi encryptionSpi;
@@ -121,14 +132,10 @@ public class SnapshotChecker {
public SnapshotChecker(
GridKernalContext kctx,
Marshaller marshaller,
- ExecutorService executorSrvc,
- @Nullable ClassLoader marshallerClsLdr
+ ExecutorService executorSrvc
) {
this.kctx = kctx;
- this.marshaller = marshaller;
- this.marshallerClsLdr = marshallerClsLdr;
-
this.encryptionSpi = kctx.config().getEncryptionSpi() == null ? new
NoopEncryptionSpi() : kctx.config().getEncryptionSpi();
this.executor = executorSrvc;
@@ -137,7 +144,7 @@ public class SnapshotChecker {
}
/** */
- protected List<SnapshotMetadata> readSnapshotMetadatas(File snpFullPath,
@Nullable Object nodeConstId) {
+ protected List<SnapshotMetadata> readSnapshotMetadatas(File snpFullPath,
Object nodeConstId) {
if (!(snpFullPath.exists() && snpFullPath.isDirectory()))
return Collections.emptyList();
@@ -161,7 +168,7 @@ public class SnapshotChecker {
try {
for (File smf : smfs) {
- SnapshotMetadata curr = readSnapshotMetadata(smf);
+ SnapshotMetadata curr =
kctx.cache().context().snapshotMgr().readSnapshotMetadata(smf);
if (prev != null && !prev.sameSnapshot(curr)) {
throw new IgniteException("Snapshot metadata files are
from different snapshots " +
@@ -192,37 +199,15 @@ public class SnapshotChecker {
}
}
- /** */
- public SnapshotMetadata readSnapshotMetadata(File smf)
- throws IgniteCheckedException, IOException {
- SnapshotMetadata meta = readFromFile(smf);
-
- String smfName = smf.getName().substring(0, smf.getName().length() -
SNAPSHOT_METAFILE_EXT.length());
-
- if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
- throw new IgniteException("Error reading snapshot metadata
[smfName=" + smfName + ", consId="
- + U.maskForFileName(meta.consistentId()));
- }
-
- return meta;
- }
-
- /** */
- public <T> T readFromFile(File smf)
- throws IOException, IgniteCheckedException {
- if (!smf.exists())
- throw new IgniteCheckedException("Snapshot metafile cannot be read
due to it doesn't exist: " + smf);
-
- try (InputStream in = new
BufferedInputStream(Files.newInputStream(smf.toPath()))) {
- return marshaller.unmarshal(in, marshallerClsLdr);
- }
- }
-
/** Launches local metas checking. */
- public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(File
snpPath, @Nullable Collection<Integer> grpIds,
- @Nullable Object locNodeCstId) {
+ public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
+ File snpDir,
+ int incIdx,
+ @Nullable Collection<Integer> grpIds,
+ Object consId
+ ) {
return CompletableFuture.supplyAsync(() -> {
- List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpPath,
locNodeCstId);
+ List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpDir,
consId);
for (SnapshotMetadata meta : snpMetas) {
byte[] snpMasterKeyDigest = meta.masterKeyDigest();
@@ -265,10 +250,387 @@ public class SnapshotChecker {
}
}
+ if (incIdx > 0) {
+ List<SnapshotMetadata> metas = snpMetas.stream().filter(m ->
m.consistentId().equals(String.valueOf(consId)))
+ .collect(Collectors.toList());
+
+ if (metas.size() != 1) {
+ throw new IgniteException("Failed to find single snapshot
metafile on local node [locNodeId="
+ + consId + ", metas=" + snpMetas + ", snpName=" +
snpDir.getName()
+ + ", snpPath=" + snpDir.getParent() + "]. Incremental
snapshots requires exactly one meta file " +
+ "per node because they don't support restoring on a
different topology.");
+ }
+
+ checkIncrementalSnapshotsExist(metas.get(0), snpDir, incIdx);
+ }
+
return snpMetas;
}, executor);
}
+ /** Checks that all incremental snapshots are present, contain correct
metafile and WAL segments. */
+ private void checkIncrementalSnapshotsExist(SnapshotMetadata fullMeta,
File snpDir, int incIdx) {
+ try {
+ // Incremental snapshot must contain ClusterSnapshotRecord.
+ long startSeg = fullMeta.snapshotRecordPointer().index();
+
+ String snpName = fullMeta.snapshotName();
+
+ for (int inc = 1; inc <= incIdx; inc++) {
+ File incSnpDir =
kctx.cache().context().snapshotMgr().incrementalSnapshotLocalDir(snpName,
snpDir.getParent(), inc);
+
+ if (!incSnpDir.exists()) {
+ throw new IllegalArgumentException("No incremental
snapshot found " +
+ "[snpName=" + snpName + ", snpPath=" +
snpDir.getParent() + ", incrementIndex=" + inc + ']');
+ }
+
+ String metaFileName =
snapshotMetaFileName(kctx.pdsFolderResolver().resolveFolders().folderName());
+
+ File metafile =
incSnpDir.toPath().resolve(metaFileName).toFile();
+
+ IncrementalSnapshotMetadata incMeta =
kctx.cache().context().snapshotMgr().readFromFile(metafile);
+
+ if (!incMeta.matchBaseSnapshot(fullMeta)) {
+ throw new IllegalArgumentException("Incremental snapshot
doesn't match full snapshot " +
+ "[incMeta=" + incMeta + ", fullMeta=" + fullMeta +
']');
+ }
+
+ if (incMeta.incrementIndex() != inc) {
+ throw new IgniteException(
+ "Incremental snapshot meta has wrong index
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
+ }
+
+ checkWalSegments(incMeta, startSeg,
incrementalSnapshotWalsDir(incSnpDir, incMeta.folderName()));
+
+ // Incremental snapshots must not cross each other.
+ startSeg = incMeta.incrementalSnapshotPointer().index() + 1;
+ }
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** Check that incremental snapshot contains all required WAL segments.
Throws {@link IgniteException} in case of any errors. */
+ private void checkWalSegments(IncrementalSnapshotMetadata meta, long
startWalSeg, File incSnpWalDir) {
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+ List<FileDescriptor> walSeg = factory.resolveWalFiles(
+ new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .filesOrDirs(incSnpWalDir.listFiles(file ->
+
FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches())));
+
+ if (walSeg.isEmpty())
+ throw new IgniteException("No WAL segments found for incremental
snapshot [dir=" + incSnpWalDir + ']');
+
+ long actFirstSeg = walSeg.get(0).idx();
+
+ if (actFirstSeg != startWalSeg) {
+ throw new IgniteException("Missed WAL segment
[expectFirstSegment=" + startWalSeg
+ + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta +
']');
+ }
+
+ long expLastSeg = meta.incrementalSnapshotPointer().index();
+ long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
+
+ if (actLastSeg != expLastSeg) {
+            throw new IgniteException("Missed WAL segment [expectLastSegment="
+ expLastSeg
+                + ", actualLastSegment=" + actLastSeg + ", meta=" + meta +
']');
+ }
+
+ List<?> walSegGaps = factory.hasGaps(walSeg);
+
+ if (!walSegGaps.isEmpty())
+ throw new IgniteException("Missed WAL segments [misses=" +
walSegGaps + ", meta=" + meta + ']');
+ }
+
+ /** */
+ public CompletableFuture<IncrementalSnapshotCheckResult>
checkIncrementalSnapshot(
+ String snpName,
+ @Nullable String snpPath,
+ int incIdx
+ ) {
+ assert incIdx > 0;
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ String consId =
kctx.cluster().get().localNode().consistentId().toString();
+
+ File snpDir =
kctx.cache().context().snapshotMgr().snapshotLocalDir(snpName, snpPath);
+
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Verify incremental snapshot procedure has
been initiated " +
+ "[snpName=" + snpName + ", incrementIndex=" +
incIdx + ", consId=" + consId + ']');
+ }
+
+ BaselineTopology blt =
kctx.state().clusterState().baselineTopology();
+
+ SnapshotMetadata meta =
kctx.cache().context().snapshotMgr().readSnapshotMetadata(snpDir, consId);
+
+ if (!F.eqNotOrdered(blt.consistentIds(),
meta.baselineNodes())) {
+ throw new IgniteCheckedException("Topologies of
snapshot and current cluster are different [snp=" +
+ meta.baselineNodes() + ", current=" +
blt.consistentIds() + ']');
+ }
+
+ Map<Integer, StoredCacheData> txCaches =
readTxCachesData(snpDir);
+
+ AtomicLong procSegCnt = new AtomicLong();
+
+ LongAdder procEntriesCnt = new LongAdder();
+
+ IncrementalSnapshotProcessor proc = new
IncrementalSnapshotProcessor(
+ kctx.cache().context(), snpName, snpPath, incIdx,
txCaches.keySet()
+ ) {
+ @Override void totalWalSegments(int segCnt) {
+ // No-op.
+ }
+
+ @Override void processedWalSegments(int segCnt) {
+ procSegCnt.set(segCnt);
+ }
+
+ @Override void initWalEntries(LongAdder entriesCnt) {
+ procEntriesCnt.add(entriesCnt.sum());
+ }
+ };
+
+ short locNodeId = blt.consistentIdMapping().get(consId);
+
+ Set<GridCacheVersion> activeDhtTxs = new HashSet<>();
+ Map<GridCacheVersion, Set<Short>> txPrimParticipatingNodes
= new HashMap<>();
+ Map<Short, HashHolder> nodesTxHash = new HashMap<>();
+
+ Set<GridCacheVersion> partiallyCommittedTxs = new
HashSet<>();
+ // Hashes in this map calculated based on WAL records
only, not part-X.bin data.
+ Map<PartitionKeyV2, HashHolder> partMap = new HashMap<>();
+ List<Exception> exceptions = new ArrayList<>();
+
+ Function<Short, HashHolder> hashHolderBuilder = (k) -> new
HashHolder();
+
+ BiConsumer<GridCacheVersion, Set<Short>> calcTxHash =
(xid, participatingNodes) -> {
+ for (short nodeId : participatingNodes) {
+ if (nodeId != locNodeId) {
+ HashHolder hash =
nodesTxHash.computeIfAbsent(nodeId, hashHolderBuilder);
+
+ hash.increment(xid.hashCode(), 0);
+ }
+ }
+ };
+
+ // CacheId -> CacheGrpId.
+ Map<Integer, Integer> cacheGrpId =
txCaches.values().stream()
+ .collect(Collectors.toMap(
+ StoredCacheData::cacheId,
+ cacheData ->
CU.cacheGroupId(cacheData.config().getName(), cacheData.config().getGroupName())
+ ));
+
+ LongAdder procTxCnt = new LongAdder();
+
+ proc.process(dataEntry -> {
+ if (dataEntry.op() == GridCacheOperation.READ ||
!exceptions.isEmpty())
+ return;
+
+ if (log.isTraceEnabled())
+ log.trace("Checking data entry [entry=" +
dataEntry + ']');
+
+ if (!activeDhtTxs.contains(dataEntry.writeVersion()))
+
partiallyCommittedTxs.add(dataEntry.nearXidVersion());
+
+ StoredCacheData cacheData =
txCaches.get(dataEntry.cacheId());
+
+ PartitionKeyV2 partKey = new PartitionKeyV2(
+ cacheGrpId.get(dataEntry.cacheId()),
+ dataEntry.partitionId(),
+ CU.cacheOrGroupName(cacheData.config()));
+
+ HashHolder hash = partMap.computeIfAbsent(partKey, (k)
-> new HashHolder());
+
+ try {
+ int valHash = dataEntry.key().hashCode();
+
+ if (dataEntry.value() != null)
+ valHash +=
Arrays.hashCode(dataEntry.value().valueBytes(null));
+
+ int verHash = dataEntry.writeVersion().hashCode();
+
+ hash.increment(valHash, verHash);
+ }
+ catch (IgniteCheckedException ex) {
+ exceptions.add(ex);
+ }
+ }, txRec -> {
+ if (!exceptions.isEmpty())
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Checking tx record [txRec=" + txRec +
']');
+
+ if (txRec.state() == TransactionState.PREPARED) {
+                            // Collect only primary nodes. In some cases
backup nodes are included into TxRecord#participatingNodes()
+                            // but actually don't even start the transaction,
for example, if the node participates only as a backup
+                            // of read-only keys.
+ Set<Short> primParticipatingNodes =
txRec.participatingNodes().keySet();
+
+ if (primParticipatingNodes.contains(locNodeId)) {
+
txPrimParticipatingNodes.put(txRec.nearXidVersion(), primParticipatingNodes);
+ activeDhtTxs.add(txRec.writeVersion());
+ }
+ else {
+ for (Collection<Short> backups :
txRec.participatingNodes().values()) {
+ if (backups.contains(ALL_NODES) ||
backups.contains(locNodeId))
+ activeDhtTxs.add(txRec.writeVersion());
+ }
+ }
+ }
+ else if (txRec.state() == TransactionState.COMMITTED) {
+ activeDhtTxs.remove(txRec.writeVersion());
+
+ Set<Short> participatingNodes =
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
+
+ // Legal cases:
+ // 1. This node is a transaction near node, but
not primary or backup node.
+ // 2. This node participated in the transaction
multiple times (e.g., primary for one key and
+ // backup for other key).
+ // 3. A transaction is included into previous
incremental snapshot.
+ if (participatingNodes == null)
+ return;
+
+ procTxCnt.increment();
+
+ calcTxHash.accept(txRec.nearXidVersion(),
participatingNodes);
+ }
+ else if (txRec.state() ==
TransactionState.ROLLED_BACK) {
+ activeDhtTxs.remove(txRec.writeVersion());
+
txPrimParticipatingNodes.remove(txRec.nearXidVersion());
+ }
+ });
+
+ // All active transactions that didn't log COMMITTED or
ROLL_BACK records are considered committed.
+ // It is possible as incremental snapshot started after
transaction left IgniteTxManager#activeTransactions()
+ // collection, but completed before the final TxRecord was
written.
+ for (Map.Entry<GridCacheVersion, Set<Short>> tx :
txPrimParticipatingNodes.entrySet())
+ calcTxHash.accept(tx.getKey(), tx.getValue());
+
+ Map<Object, TransactionsHashRecord> txHashRes =
nodesTxHash.entrySet().stream()
+ .map(e -> new TransactionsHashRecord(
+ consId,
+ blt.compactIdMapping().get(e.getKey()),
+ e.getValue().hash
+ ))
+ .collect(Collectors.toMap(
+ TransactionsHashRecord::remoteConsistentId,
+ Function.identity()
+ ));
+
+ Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes =
partMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new PartitionHashRecordV2(
+ e.getKey(),
+ false,
+ consId,
+ null,
+ 0,
+ null,
+ new
IdleVerifyUtility.VerifyPartitionContext(e.getValue().hash,
e.getValue().verHash)
+ )
+ ));
+
+ if (log.isInfoEnabled()) {
+ log.info("Verify incremental snapshot procedure
finished " +
+ "[snpName=" + snpName + ", incrementIndex=" +
incIdx + ", consId=" + consId +
+ ", txCnt=" + procTxCnt.sum() + ", dataEntries=" +
procEntriesCnt.sum() +
+ ", walSegments=" + procSegCnt.get() + ']');
+ }
+
+ return new IncrementalSnapshotCheckResult(
+ txHashRes,
+ partHashRes,
+ partiallyCommittedTxs,
+ exceptions
+ );
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
+ }
+ },
+ executor
+ );
+ }
+
+ /** @return Collection of snapshotted transactional caches, key is a cache
ID. */
+ private Map<Integer, StoredCacheData> readTxCachesData(File snpDir) throws
IgniteCheckedException, IOException {
+ String folderName =
kctx.pdsFolderResolver().resolveFolders().folderName();
+
+ return GridLocalConfigManager.readCachesData(
+ new File(snpDir, databaseRelativePath(folderName)),
+ MarshallerUtils.jdkMarshaller(kctx.igniteInstanceName()),
+ kctx.config())
+ .values().stream()
+ .filter(data -> data.config().getAtomicityMode() ==
CacheAtomicityMode.TRANSACTIONAL)
+ .collect(Collectors.toMap(StoredCacheData::cacheId,
Function.identity()));
+ }
+
+ /** */
+ public IdleVerifyResultV2 reduceIncrementalResults(
+ Map<ClusterNode, IncrementalSnapshotCheckResult> results,
+ Map<ClusterNode, Exception> operationErrors
+ ) {
+ if (!operationErrors.isEmpty())
+ return new IdleVerifyResultV2(operationErrors);
+
+ Map<Object, Map<Object, TransactionsHashRecord>> nodeTxHashMap = new
HashMap<>();
+ List<List<TransactionsHashRecord>> txHashConflicts = new ArrayList<>();
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> partHashes = new
HashMap<>();
+ Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs =
new HashMap<>();
+ Map<ClusterNode, Exception> errors = new HashMap<>();
+
+ results.forEach((node, res) -> {
+ if (res.exceptions().isEmpty() && errors.isEmpty()) {
+ if (!F.isEmpty(res.partiallyCommittedTxs()))
+ partiallyCommittedTxs.put(node,
res.partiallyCommittedTxs());
+
+ for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> entry :
res.partHashRes().entrySet())
+ partHashes.computeIfAbsent(entry.getKey(), v -> new
ArrayList<>()).add(entry.getValue());
+
+ if (log.isDebugEnabled())
+ log.debug("Handle VerifyIncrementalSnapshotJob result
[node=" + node + ", taskRes=" + res + ']');
+
+ nodeTxHashMap.put(node.consistentId(), res.txHashRes());
+
+ Iterator<Map.Entry<Object, TransactionsHashRecord>> resIt =
res.txHashRes().entrySet().iterator();
+
+ while (resIt.hasNext()) {
+ Map.Entry<Object, TransactionsHashRecord> nodeTxHash =
resIt.next();
+
+ Map<Object, TransactionsHashRecord> prevNodeTxHash =
nodeTxHashMap.get(nodeTxHash.getKey());
+
+ if (prevNodeTxHash != null) {
+ TransactionsHashRecord hash = nodeTxHash.getValue();
+ TransactionsHashRecord prevHash =
prevNodeTxHash.remove(hash.localConsistentId());
+
+ if (prevHash == null || prevHash.transactionHash() !=
hash.transactionHash())
+ txHashConflicts.add(F.asList(hash, prevHash));
+
+ resIt.remove();
+ }
+ }
+ }
+ else if (!res.exceptions().isEmpty())
+ errors.put(node, F.first(res.exceptions()));
+ });
+
+ // Add all missed pairs to conflicts.
+ nodeTxHashMap.values().stream()
+ .flatMap(e -> e.values().stream())
+ .forEach(e -> txHashConflicts.add(F.asList(e, null)));
+
+ return errors.isEmpty()
+ ? new IdleVerifyResultV2(partHashes, txHashConflicts,
partiallyCommittedTxs)
+ : new IdleVerifyResultV2(errors);
+ }
+
/** */
public static IdleVerifyResultV2 reduceHashesResults(
Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results,
@@ -298,7 +660,7 @@ public class SnapshotChecker {
@Nullable String snpPath,
Map<ClusterNode, List<SnapshotMetadata>> allMetas,
@Nullable Map<ClusterNode, Exception> exceptions,
- Object curNodeCstId
+ Object consId
) {
Map<ClusterNode, Exception> mappedExceptions = F.isEmpty(exceptions) ?
Collections.emptyMap() : new HashMap<>(exceptions);
@@ -333,7 +695,7 @@ public class SnapshotChecker {
if (firstMeta == null && mappedExceptions.isEmpty()) {
throw new IllegalArgumentException("Snapshot does not exists
[snapshot=" + snpName
- + (snpPath != null ? ", baseDir=" + snpPath : "") + ",
consistentId=" + curNodeCstId + ']');
+ + (snpPath != null ? ", baseDir=" + snpPath : "") + ",
consistentId=" + consId + ']');
}
if (!F.isEmpty(baselineNodes) && F.isEmpty(exceptions)) {
@@ -390,7 +752,7 @@ public class SnapshotChecker {
}
/**
- * Calls all the registered Absence validaton handlers. Reads snapshot
metadata.
+     * Calls all the registered custom validation handlers. Reads snapshot
metadata.
*
* @see IgniteSnapshotManager#handlers()
*/
@@ -411,7 +773,7 @@ public class SnapshotChecker {
}
/**
- * Reads snapshot metadata. Requires snapshot meta to work.
+     * Calls all the registered custom validation handlers.
*
* @see IgniteSnapshotManager#handlers()
*/
@@ -718,7 +1080,7 @@ public class SnapshotChecker {
File snpDir,
SnapshotMetadata meta,
Collection<Integer> grpIds,
- Object nodeCstId,
+ Object consId,
boolean procPartitionsData,
boolean skipHash
) {
@@ -732,14 +1094,14 @@ public class SnapshotChecker {
EncryptionSpi encSpi = meta.encryptionKey() != null ? encryptionSpi :
null;
- try (Dump dump = new Dump(snpDir, nodeCstId.toString(), true, true,
encSpi, log)) {
+ try (Dump dump = new Dump(snpDir, consId.toString(), true, true,
encSpi, log)) {
String nodeFolderName =
kctx.pdsFolderResolver().resolveFolders().folderName();
Collection<PartitionHashRecordV2> partitionHashRecordV2s =
U.doInParallel(
executor,
grpAndPartFiles.get2(),
part -> calculateDumpedPartitionHash(dump,
cacheGroupName(part.getParentFile()), partId(part.getName()),
- skipHash, nodeCstId, nodeFolderName)
+ skipHash, consId, nodeFolderName)
);
return
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
r -> r));
@@ -781,7 +1143,7 @@ public class SnapshotChecker {
Dump.DumpedPartitionIterator iter,
String grpName,
int part,
- Object consistentId
+ Object consId
) throws IgniteCheckedException {
long size = 0;
@@ -798,7 +1160,7 @@ public class SnapshotChecker {
return new PartitionHashRecordV2(
new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
false,
- consistentId,
+ consId,
null,
size,
PartitionHashRecordV2.PartitionState.OWNING,
@@ -872,4 +1234,19 @@ public class SnapshotChecker {
return key != null && key.id() == keyId ? key : null;
}
}
+
+ /** Holder for calculated hashes. */
+ private static class HashHolder {
+ /** */
+ private int hash;
+
+ /** */
+ private int verHash;
+
+ /** */
+ private void increment(int hash, int verHash) {
+ this.hash += hash;
+ this.verHash += verHash;
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
index f3250b63b76..bad025849b3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
@@ -17,18 +17,12 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
@@ -36,22 +30,13 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
-import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
-import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static java.lang.String.valueOf;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
-
/** Snapshot task to verify snapshot metadata on the baseline nodes for given
snapshot name. */
@GridInternal
public class SnapshotMetadataVerificationTask
@@ -90,10 +75,6 @@ public class SnapshotMetadataVerificationTask
@IgniteInstanceResource
private transient IgniteEx ignite;
- /** */
- @LoggerResource
- private transient IgniteLogger log;
-
/** */
private final SnapshotMetadataVerificationTaskArg arg;
@@ -106,112 +87,8 @@ public class SnapshotMetadataVerificationTask
@Override public List<SnapshotMetadata> execute() {
IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- List<SnapshotMetadata> snpMeta;
-
- try {
- snpMeta =
snpMgr.checker().checkLocalMetas(snpMgr.snapshotLocalDir(arg.snapshotName(),
arg.snapshotPath()),
- arg.grpIds(), ignite.localNode().consistentId()).get();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteException("Failed to launch snapshot metadatas
check of snapshot '" + arg.snapshotName() + "'.", e);
- }
-
- if (arg.incrementIndex() > 0) {
- List<SnapshotMetadata> metas = snpMeta.stream()
- .filter(m ->
m.consistentId().equals(valueOf(ignite.localNode().consistentId())))
- .collect(Collectors.toList());
-
- if (metas.size() != 1) {
- throw new IgniteException("Failed to find single snapshot
metafile on local node [locNodeId="
- + ignite.localNode().consistentId() + ", metas=" +
snpMeta + ", snpName=" + arg.snapshotName()
- + ", snpPath=" + arg.snapshotPath() + "]. Incremental
snapshots requires exactly one meta file " +
- "per node because they don't support restoring on a
different topology.");
- }
-
- checkIncrementalSnapshots(metas.get(0), arg);
- }
-
- return snpMeta;
- }
-
- /** Checks that all incremental snapshots are present, contain correct
metafile and WAL segments. */
- public void checkIncrementalSnapshots(SnapshotMetadata fullMeta,
SnapshotMetadataVerificationTaskArg arg) {
- try {
- GridCacheSharedContext<Object, Object> ctx =
ignite.context().cache().context();
-
- IgniteSnapshotManager snpMgr = ctx.snapshotMgr();
-
- // Incremental snapshot must contain ClusterSnapshotRecord.
- long startSeg = fullMeta.snapshotRecordPointer().index();
-
- for (int inc = 1; inc <= arg.incrementIndex(); inc++) {
- File incSnpDir =
snpMgr.incrementalSnapshotLocalDir(arg.snapshotName(), arg.snapshotPath(), inc);
-
- if (!incSnpDir.exists()) {
- throw new IllegalArgumentException("No incremental
snapshot found " +
- "[snpName=" + arg.snapshotName() + ", snpPath=" +
arg.snapshotPath() + ", incrementIndex=" + inc + ']');
- }
-
- String folderName =
ctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
-
- String metaFileName = snapshotMetaFileName(folderName);
-
- File metafile =
incSnpDir.toPath().resolve(metaFileName).toFile();
-
- IncrementalSnapshotMetadata incMeta =
snpMgr.readFromFile(metafile);
-
- if (!incMeta.matchBaseSnapshot(fullMeta)) {
- throw new IllegalArgumentException("Incremental
snapshot doesn't match full snapshot " +
- "[incMeta=" + incMeta + ", fullMeta=" + fullMeta +
']');
- }
-
- if (incMeta.incrementIndex() != inc) {
- throw new IgniteException(
- "Incremental snapshot meta has wrong index
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
- }
-
- checkWalSegments(incMeta, startSeg,
incrementalSnapshotWalsDir(incSnpDir, incMeta.folderName()));
-
- // Incremental snapshots must not cross each other.
- startSeg = incMeta.incrementalSnapshotPointer().index() +
1;
- }
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
- }
- }
-
- /** Check that incremental snapshot contains all required WAL
segments. Throws {@link IgniteException} in case of any errors. */
- private void checkWalSegments(IncrementalSnapshotMetadata meta, long
startWalSeg, File incSnpWalDir) {
- IgniteWalIteratorFactory factory = new
IgniteWalIteratorFactory(log);
-
- List<FileDescriptor> walSeg = factory.resolveWalFiles(
- new IgniteWalIteratorFactory.IteratorParametersBuilder()
- .filesOrDirs(incSnpWalDir.listFiles(file ->
-
FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches())));
-
- if (walSeg.isEmpty())
- throw new IgniteException("No WAL segments found for
incremental snapshot [dir=" + incSnpWalDir + ']');
-
- long actFirstSeg = walSeg.get(0).idx();
-
- if (actFirstSeg != startWalSeg) {
- throw new IgniteException("Missed WAL segment
[expectFirstSegment=" + startWalSeg
- + ", actualFirstSegment=" + actFirstSeg + ", meta=" + meta
+ ']');
- }
-
- long expLastSeg = meta.incrementalSnapshotPointer().index();
- long actLastSeg = walSeg.get(walSeg.size() - 1).idx();
-
- if (actLastSeg != expLastSeg) {
- throw new IgniteException("Missed WAL segment
[expectLastSegment=" + startWalSeg
- + ", actualLastSegment=" + actFirstSeg + ", meta=" + meta
+ ']');
- }
-
- List<?> walSegGaps = factory.hasGaps(walSeg);
-
- if (!walSegGaps.isEmpty())
- throw new IgniteException("Missed WAL segments [misses=" +
walSegGaps + ", meta=" + meta + ']');
+ return
snpMgr.checker().checkLocalMetas(snpMgr.snapshotLocalDir(arg.snapshotName(),
arg.snapshotPath()),
+ arg.incrementIndex(), arg.grpIds(),
ignite.localNode().consistentId()).join();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index bc54704746a..4c1d1e917de 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -46,7 +46,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerificationTask.HashHolder;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
import org.apache.ignite.internal.util.typedef.F;
@@ -381,11 +380,12 @@ public class IdleVerifyUtility {
}
/**
- * @param hash Incremental snapshot hash holder.
+ * @param partHash Partition hash.
+ * @param partVerHash Version hash.
*/
- public VerifyPartitionContext(HashHolder hash) {
- this.partHash = hash.hash;
- this.partVerHash = hash.verHash;
+ public VerifyPartitionContext(int partHash, int partVerHash) {
+ this.partHash = partHash;
+ this.partVerHash = partVerHash;
}
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 3e3ab9ee6e5..ac041c439e5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -95,7 +95,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -116,6 +115,7 @@ import static
org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeFalse;
/**
* Cluster-wide snapshot check procedure tests.
@@ -249,7 +249,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheckPartitionCounters() throws Exception {
- Assume.assumeFalse("One copy of partiton created in only primary
mode", onlyPrimary);
+        assumeFalse("One copy of partition created in only primary mode",
onlyPrimary);
IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg.
setAffinity(new RendezvousAffinityFunction(false, 1)),
@@ -404,7 +404,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheckFailsOnPartitionDataDiffers() throws
Exception {
- Assume.assumeFalse("One copy of partiton created in only primary
mode", onlyPrimary);
+        assumeFalse("One copy of partition created in only primary mode",
onlyPrimary);
CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
.setAffinity(new RendezvousAffinityFunction(false, 1));
@@ -641,6 +641,128 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
}
}
+    /** Tests that concurrent full checks of the same snapshot, both normal
and incremental, are declined. */
+ @Test
+ public void testConcurrentTheSameSnpFullAndIncrementalChecksDeclined()
throws Exception {
+ assumeFalse(encryption);
+
+ // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+ prepareGridsAndSnapshot(3, 2, 2, false);
+
+ snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+ for (int i = 0; i < G.allGrids().size(); ++i) {
+ for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+ int i0 = i;
+ int j0 = j;
+
+ doTestConcurrentSnpCheckOperations(
+ () -> new
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
+ () -> new
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+ CHECK_SNAPSHOT_METAS,
+ CHECK_SNAPSHOT_PARTS,
+ true,
+ false,
+ null,
+ null
+ );
+ }
+ }
+ }
+
+ /** Tests that concurrent full checks of the same incremental snapshot are
declined. */
+ @Test
+ public void testConcurrentTheSameIncrementalFullChecksDeclined() throws
Exception {
+ assumeFalse(encryption);
+
+ // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+ prepareGridsAndSnapshot(3, 2, 2, false);
+
+ snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+ for (int i = 0; i < G.allGrids().size(); ++i) {
+ for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+ int i0 = i;
+ int j0 = j;
+
+ doTestConcurrentSnpCheckOperations(
+ () -> new
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+ () -> new
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+ CHECK_SNAPSHOT_METAS,
+ CHECK_SNAPSHOT_PARTS,
+ true,
+ false,
+ null,
+ null
+ );
+ }
+ }
+ }
+
+ /** Tests that concurrent checks of different incremental snapshots are
declined. */
+ @Test
+ public void testConcurrentDifferentIncrementalFullChecksDeclined() throws
Exception {
+ assumeFalse(encryption);
+
+ // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+ prepareGridsAndSnapshot(3, 2, 2, false);
+
+ snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+ snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+ for (int i = 0; i < G.allGrids().size(); ++i) {
+ for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+ int i0 = i;
+ int j0 = j;
+
+ doTestConcurrentSnpCheckOperations(
+ () -> new
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+ () -> new
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 2)),
+ CHECK_SNAPSHOT_METAS,
+ CHECK_SNAPSHOT_PARTS,
+ true,
+ false,
+ null,
+ null
+ );
+ }
+ }
+ }
+
+    /** Tests that concurrent full restoration of a normal snapshot and check
of an incremental snapshot are declined. */
+ @Test
+ public void
testConcurrentTheSameSnpIncrementalCheckAndFullRestoreDeclined() throws
Exception {
+ assumeFalse(encryption);
+
+ // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+ prepareGridsAndSnapshot(3, 2, 2, false);
+
+ snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+ grid(0).destroyCache(DEFAULT_CACHE_NAME);
+
+ awaitPartitionMapExchange();
+
+ // Snapshot restoration is disallowed from client nodes.
+ for (int i = 0; i < 3; ++i) {
+ for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+ int i0 = i;
+ int j0 = j;
+
+ doTestConcurrentSnpCheckOperations(
+ () -> snp(grid(i0)).restoreSnapshot(SNAPSHOT_NAME, null,
null, 0, true),
+ () -> new
IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+ CHECK_SNAPSHOT_METAS,
+ CHECK_SNAPSHOT_PARTS,
+ true,
+ false,
+ null,
+ () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+ );
+ }
+ }
+ }
+
/** Tests that concurrent snapshot full checks are allowed for different
snapshots. */
@Test
public void testConcurrentDifferentSnpFullChecksAllowed() throws Exception
{
@@ -807,7 +929,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** Tests that snapshot full check doesn't affect a snapshot creation. */
@Test
- public void testConcurrentSnpCheckAndCreateAllowed() throws Exception {
+ public void testConcurrentDifferentSnpCheckAndCreateAllowed() throws
Exception {
prepareGridsAndSnapshot(3, 2, 2, false);
for (int i = 0; i < G.allGrids().size(); ++i) {