This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch IGNITE-22662__snapshot_refactoring
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to
refs/heads/IGNITE-22662__snapshot_refactoring by this push:
new 840d89bf439 Dedicated Snapshot Checker (#11423)
840d89bf439 is described below
commit 840d89bf439733a349487091fa10e209e094d5fc
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Jul 10 15:30:08 2024 +0300
Dedicated Snapshot Checker (#11423)
---
.../snapshot/IgniteSnapshotManager.java | 103 +----
...ionsVerifyHandler.java => SnapshotChecker.java} | 500 +++++++++++++--------
.../snapshot/SnapshotMetadataVerificationTask.java | 104 +----
.../snapshot/SnapshotPartitionsVerifyHandler.java | 414 +----------------
4 files changed, 343 insertions(+), 778 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index be0d9afd6c9..66438cbdb36 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -17,18 +17,15 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
-import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -147,7 +144,6 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageP
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
-import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -208,7 +204,6 @@ import static
org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static
org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static
org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
-import static
org.apache.ignite.internal.MarshallerContextImpl.resolveMappingFileStoreWorkDir;
import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
import static
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
@@ -222,7 +217,6 @@ import static
org.apache.ignite.internal.pagemem.PageIdUtils.toDetailString;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.baselineNode;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
import static
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
-import static
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
@@ -473,6 +467,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
private final boolean sequentialWrite =
IgniteSystemProperties.getBoolean(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE,
DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE);
+ /** Snapshot validator. */
+ private final SnapshotChecker snpChecker;
+
/**
* @param ctx Kernal context.
*/
@@ -493,6 +490,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
// Manage remote snapshots.
snpRmtMgr = new SequentialRemoteSnapshotManager();
+
+ snpChecker = new SnapshotChecker(ctx, marsh,
ctx.pools().getSnapshotExecutorService(), U.resolveClassLoader(ctx.config()));
}
/**
@@ -1986,17 +1985,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @return Snapshot metadata instance.
*/
private SnapshotMetadata readSnapshotMetadata(File smf) throws
IgniteCheckedException, IOException {
- SnapshotMetadata meta = readFromFile(smf);
-
- String smfName = smf.getName().substring(0, smf.getName().length() -
SNAPSHOT_METAFILE_EXT.length());
-
- if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
- throw new IgniteException(
- "Error reading snapshot metadata [smfName=" + smfName + ",
consId=" + U.maskForFileName(meta.consistentId())
- );
- }
-
- return meta;
+ return snpChecker.readSnapshotMetadata(smf);
}
/**
@@ -2005,12 +1994,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @param <T> Type of metadata.
*/
public <T> T readFromFile(File smf) throws IgniteCheckedException,
IOException {
- if (!smf.exists())
- throw new IgniteCheckedException("Snapshot metafile cannot be read
due to it doesn't exist: " + smf);
-
- try (InputStream in = new
BufferedInputStream(Files.newInputStream(smf.toPath()))) {
- return marsh.unmarshal(in,
U.resolveClassLoader(cctx.gridConfig()));
- }
+ return snpChecker.readFromFile(smf);
}
/**
@@ -2026,58 +2010,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
File snpDir = snapshotLocalDir(snpName, snpPath);
- if (!(snpDir.exists() && snpDir.isDirectory()))
- return Collections.emptyList();
-
- List<File> smfs = new ArrayList<>();
-
- try (DirectoryStream<Path> ds =
Files.newDirectoryStream(snpDir.toPath())) {
- for (Path d : ds) {
- if (Files.isRegularFile(d) &&
d.getFileName().toString().toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT))
- smfs.add(d.toFile());
- }
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
-
- if (smfs.isEmpty())
- return Collections.emptyList();
-
- Map<String, SnapshotMetadata> metasMap = new HashMap<>();
- SnapshotMetadata prev = null;
-
- try {
- for (File smf : smfs) {
- SnapshotMetadata curr = readSnapshotMetadata(smf);
-
- if (prev != null && !prev.sameSnapshot(curr)) {
- throw new IgniteException("Snapshot metadata files are
from different snapshots " +
- "[prev=" + prev + ", curr=" + curr + ']');
- }
-
- metasMap.put(curr.consistentId(), curr);
-
- prev = curr;
- }
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
- }
-
- SnapshotMetadata currNodeSmf =
metasMap.remove(cctx.localNode().consistentId().toString());
-
- // Snapshot metadata for the local node must be first in the result
map.
- if (currNodeSmf == null)
- return new ArrayList<>(metasMap.values());
- else {
- List<SnapshotMetadata> result = new ArrayList<>();
-
- result.add(currNodeSmf);
- result.addAll(metasMap.values());
-
- return result;
- }
+ return snpChecker.readSnapshotMetadatas(snpDir,
cctx.localNode().consistentId());
}
/**
@@ -2570,21 +2503,6 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}
- /**
- * @param snpDir The full path to the snapshot files.
- * @param folderName The node folder name, usually it's the same as the
U.maskForFileName(consistentId).
- * @return Standalone kernal context related to the snapshot.
- * @throws IgniteCheckedException If fails.
- */
- public StandaloneGridKernalContext createStandaloneKernalContext(
- CompressionProcessor cmpProc,
- File snpDir,
- String folderName
- ) throws IgniteCheckedException {
- return new StandaloneGridKernalContext(log, cmpProc,
resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName),
- resolveMappingFileStoreWorkDir(snpDir.getAbsolutePath()));
- }
-
/**
* @param grpName Cache group name.
* @param partId Partition id.
@@ -2948,6 +2866,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return ioFactory;
}
+ /** */
+ public SnapshotChecker checker() {
+ return snpChecker;
+ }
+
/**
* @param nodeId Remote node id on which requests has been registered.
* @return Snapshot future related to given node id.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
similarity index 54%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 8f569b66f09..0a12e085322 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,17 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -33,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -40,14 +43,12 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import
org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider;
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -59,15 +60,17 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
-import
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
-import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
@@ -83,77 +86,271 @@ import static
org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
-import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
-import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
-/**
- * Default snapshot restore handler for checking snapshot partitions
consistency.
- */
-public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<PartitionKeyV2, PartitionHashRecordV2>> {
- /** Shared context. */
- protected final GridCacheSharedContext<?, ?> cctx;
+/** */
+public class SnapshotChecker {
+ /** */
+ protected final IgniteLogger log;
+
+ /** */
+ @Nullable protected final GridKernalContext kctx;
+
+ /** */
+ protected final Marshaller marshaller;
+
+ /** */
+ @Nullable protected final ClassLoader marshallerClsLdr;
+
+ /** */
+ protected final EncryptionSpi encryptionSpi;
+
+ /** */
+ @Nullable protected final CompressionProcessor compression;
+
+ /** */
+ protected final ExecutorService executor;
+
+ /** */
+ public SnapshotChecker(
+ GridKernalContext kctx,
+ Marshaller marshaller,
+ ExecutorService executorSrvc,
+ @Nullable ClassLoader marshallerClsLdr
+ ) {
+ this.kctx = kctx;
+
+ this.marshaller = marshaller;
+ this.marshallerClsLdr = marshallerClsLdr;
+
+ this.encryptionSpi = kctx.config().getEncryptionSpi() == null ? new
NoopEncryptionSpi() : kctx.config().getEncryptionSpi();
+
+ this.compression = kctx.compress();
+
+ this.executor = executorSrvc;
+
+ this.log = kctx.log(getClass());
+ }
+
+ /** */
+ protected List<SnapshotMetadata> readSnapshotMetadatas(File snpFullPath,
@Nullable Object nodeConstId) {
+ if (!(snpFullPath.exists() && snpFullPath.isDirectory()))
+ return Collections.emptyList();
+
+ List<File> smfs = new ArrayList<>();
+
+ try (DirectoryStream<Path> ds =
Files.newDirectoryStream(snpFullPath.toPath())) {
+ for (Path d : ds) {
+ if (Files.isRegularFile(d) &&
d.getFileName().toString().toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT))
+ smfs.add(d.toFile());
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ if (smfs.isEmpty())
+ return Collections.emptyList();
+
+ Map<String, SnapshotMetadata> metasMap = new HashMap<>();
+ SnapshotMetadata prev = null;
+
+ try {
+ for (File smf : smfs) {
+ SnapshotMetadata curr = readSnapshotMetadata(smf);
+
+ if (prev != null && !prev.sameSnapshot(curr)) {
+ throw new IgniteException("Snapshot metadata files are
from different snapshots " +
+ "[prev=" + prev + ", curr=" + curr + ']');
+ }
+
+ metasMap.put(curr.consistentId(), curr);
+
+ prev = curr;
+ }
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
+ }
+
+ SnapshotMetadata currNodeSmf = nodeConstId == null ? null :
metasMap.remove(nodeConstId.toString());
- /** Logger. */
- private final IgniteLogger log;
+ // Snapshot metadata for the local node must be first in the result
map.
+ if (currNodeSmf == null)
+ return new ArrayList<>(metasMap.values());
+ else {
+ List<SnapshotMetadata> result = new ArrayList<>();
- /** @param cctx Shared context. */
- public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
- this.cctx = cctx;
+ result.add(currNodeSmf);
+ result.addAll(metasMap.values());
- log = cctx.logger(getClass());
+ return result;
+ }
}
- /** {@inheritDoc} */
- @Override public SnapshotHandlerType type() {
- return SnapshotHandlerType.RESTORE;
+ /** */
+ public SnapshotMetadata readSnapshotMetadata(File smf)
+ throws IgniteCheckedException, IOException {
+ SnapshotMetadata meta = readFromFile(smf);
+
+ String smfName = smf.getName().substring(0, smf.getName().length() -
SNAPSHOT_METAFILE_EXT.length());
+
+ if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
+ throw new IgniteException("Error reading snapshot metadata
[smfName=" + smfName + ", consId="
+ + U.maskForFileName(meta.consistentId()));
+ }
+
+ return meta;
}
- /** {@inheritDoc} */
- @Override public Map<PartitionKeyV2, PartitionHashRecordV2>
invoke(SnapshotHandlerContext opCtx) throws IgniteCheckedException {
- if (!opCtx.snapshotDirectory().exists())
- throw new IgniteCheckedException("Snapshot directory doesn't
exists: " + opCtx.snapshotDirectory());
+ /** */
+ public <T> T readFromFile(File smf)
+ throws IOException, IgniteCheckedException {
+ if (!smf.exists())
+ throw new IgniteCheckedException("Snapshot metafile cannot be read
due to it doesn't exist: " + smf);
- SnapshotMetadata meta = opCtx.metadata();
+ try (InputStream in = new
BufferedInputStream(Files.newInputStream(smf.toPath()))) {
+ return marshaller.unmarshal(in, marshallerClsLdr);
+ }
+ }
+
+ /** */
+ public List<SnapshotMetadata> checkLocalMetas(File snpFullPath, @Nullable
Collection<Integer> cacheGrpIds,
+ @Nullable Object locNodeConsistId) {
+ List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpFullPath,
locNodeConsistId);
- Set<Integer> grps = F.isEmpty(opCtx.groups())
- ? new HashSet<>(meta.partitions().keySet())
- :
opCtx.groups().stream().map(CU::cacheId).collect(Collectors.toSet());
+ for (SnapshotMetadata meta : snpMetas) {
+ byte[] snpMasterKeyDigest = meta.masterKeyDigest();
- if (type() == SnapshotHandlerType.CREATE) {
- grps = grps.stream().filter(grp -> grp ==
MetaStorage.METASTORAGE_CACHE_ID ||
- CU.affinityNode(
- cctx.localNode(),
-
cctx.kernalContext().cache().cacheGroupDescriptor(grp).config().getNodeFilter()
- )
- ).collect(Collectors.toSet());
+ if (encryptionSpi.masterKeyDigest() == null && snpMasterKeyDigest
!= null) {
+ throw new IllegalStateException("Snapshot '" +
meta.snapshotName() + "' has encrypted caches " +
+ "while encryption is disabled. To restore this snapshot,
start Ignite with configured " +
+ "encryption and the same master key.");
+ }
+
+ if (snpMasterKeyDigest != null &&
!Arrays.equals(snpMasterKeyDigest, encryptionSpi.masterKeyDigest())) {
+ throw new IllegalStateException("Snapshot '" +
meta.snapshotName() + "' has different master " +
+ "key digest. To restore this snapshot, start Ignite with
the same master key.");
+ }
+
+ Collection<Integer> grpIdsToFind = new
HashSet<>(F.isEmpty(cacheGrpIds) ? meta.cacheGroupIds() : cacheGrpIds);
+
+ if (meta.hasCompressedGroups() &&
grpIdsToFind.stream().anyMatch(meta::isGroupWithCompression)) {
+ try {
+ compression.checkPageCompressionSupported();
+ }
+ catch (NullPointerException | IgniteCheckedException e) {
+ String grpWithCompr =
grpIdsToFind.stream().filter(meta::isGroupWithCompression)
+ .map(String::valueOf).collect(Collectors.joining(",
"));
+
+ String msg = "Requested cache groups [" + grpWithCompr +
"] for check " +
+ "from snapshot '" + meta.snapshotName() + "' are
compressed while " +
+ "disk page compression is disabled. To check these
groups please " +
+ "start Ignite with ignite-compress module in
classpath";
+
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ grpIdsToFind.removeAll(meta.partitions().keySet());
+
+ if (!grpIdsToFind.isEmpty() && !new
HashSet<>(meta.cacheGroupIds()).containsAll(grpIdsToFind)) {
+ throw new IllegalArgumentException("Cache group(s) was not
found in the snapshot [groups=" + grpIdsToFind +
+ ", snapshot=" + meta.snapshotName() + ']');
+ }
}
- Set<File> partFiles = new HashSet<>();
+ return snpMetas;
+ }
+
+ /** */
+ public static Map<ClusterNode, Exception> checkClusterMetas(
+ String snpName,
+ @Nullable String snpPath,
+ Map<ClusterNode, List<SnapshotMetadata>> allMetas,
+ Map<ClusterNode, Exception> knownExceptions
+ ) {
+ Map<ClusterNode, Exception> resultExceptions = new
HashMap<>(knownExceptions);
+ SnapshotMetadata firstMeta = null;
+ Set<String> baselineNodes = Collections.emptySet();
+
+ for (Map.Entry<ClusterNode, List<SnapshotMetadata>> nme :
allMetas.entrySet()) {
+ ClusterNode node = nme.getKey();
+ Exception e = knownExceptions.get(node);
+
+ if (e != null) {
+ resultExceptions.put(node, e);
+
+ continue;
+ }
+
+ for (SnapshotMetadata meta : nme.getValue()) {
+ if (firstMeta == null) {
+ firstMeta = meta;
+
+ baselineNodes = new HashSet<>(firstMeta.baselineNodes());
+ }
+
+ baselineNodes.remove(meta.consistentId());
+
+ if (!firstMeta.sameSnapshot(meta)) {
+ resultExceptions.put(node, new IgniteException("An error
occurred during comparing snapshot metadata "
+ + "from cluster nodes [firstMeta=" + firstMeta + ",
meta=" + meta + ", nodeId=" + node.id() + ']'));
+ }
+ }
+ }
+
+ if (firstMeta == null && resultExceptions.isEmpty()) {
+ assert !allMetas.isEmpty();
+
+ for (ClusterNode node : allMetas.keySet()) {
+ Exception e = new IllegalArgumentException("Snapshot does not
exists [snapshot=" + snpName
+ + (snpPath != null ? ", baseDir=" + snpPath : "") + ",
consistentId=" + node.consistentId() + ']');
+
+ resultExceptions.put(node, e);
+ }
+ }
+
+ if (!F.isEmpty(baselineNodes) && F.isEmpty(knownExceptions)) {
+ throw new IgniteException("No snapshot metadatas found for the
baseline nodes " +
+ "with consistent ids: " + String.join(", ", baselineNodes));
+ }
+
+ return resultExceptions;
+ }
+
+ /** */
+ private IgniteBiTuple<Map<Integer, File>, Set<File>>
preparePartitions(SnapshotMetadata meta, Collection<Integer> grps, File snpDir)
{
Map<Integer, File> grpDirs = new HashMap<>();
+ Set<File> partFiles = new HashSet<>();
+
+ Set<Integer> grpsLeft = new HashSet<>(F.isEmpty(grps) ?
meta.partitions().keySet() : grps);
- for (File dir : cacheDirectories(new File(opCtx.snapshotDirectory(),
databaseRelativePath(meta.folderName())), name -> true)) {
+ for (File dir : cacheDirectories(new File(snpDir,
databaseRelativePath(meta.folderName())), name -> true)) {
int grpId = CU.cacheId(cacheGroupName(dir));
- if (!grps.remove(grpId))
+ if (!grpsLeft.remove(grpId))
continue;
- Set<Integer> parts = meta.partitions().get(grpId) == null ?
Collections.emptySet() :
- new HashSet<>(meta.partitions().get(grpId));
+ grpDirs.put(grpId, dir);
+
+ Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId) ==
null ? Collections.emptySet()
+ : meta.partitions().get(grpId));
- for (File part : cachePartitionFiles(dir,
+ for (File partFile : cachePartitionFiles(dir,
(meta.dump() ? DUMP_FILE_EXT : FILE_SUFFIX) +
(meta.compressPartitions() ? ZIP_SUFFIX : "")
)) {
- int partId = partId(part.getName());
+ int partId = partId(partFile.getName());
if (!parts.remove(partId))
continue;
- partFiles.add(part);
+ partFiles.add(partFile);
}
if (!parts.isEmpty()) {
@@ -161,54 +358,40 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
"[grpId=" + grpId + ", snpName=" + meta.snapshotName() +
", consId=" + meta.consistentId() +
", missed=" + parts + ", meta=" + meta + ']');
}
-
- grpDirs.put(grpId, dir);
}
- if (!grps.isEmpty()) {
+ if (!grpsLeft.isEmpty()) {
throw new IgniteException("Snapshot data doesn't contain required
cache groups " +
- "[grps=" + grps + ", snpName=" + meta.snapshotName() + ",
consId=" + meta.consistentId() +
+ "[grps=" + grpsLeft + ", snpName=" + meta.snapshotName() + ",
consId=" + meta.consistentId() +
", meta=" + meta + ']');
}
- if (!opCtx.check()) {
- log.info("Snapshot data integrity check skipped [snpName=" +
meta.snapshotName() + ']');
-
- return Collections.emptyMap();
- }
-
- return meta.dump()
- ? checkDumpFiles(opCtx, partFiles)
- : checkSnapshotFiles(opCtx, grpDirs, meta, partFiles,
isPunchHoleEnabled(opCtx, grpDirs.keySet()));
+ return new IgniteBiTuple<>(grpDirs, partFiles);
}
/** */
- private Map<PartitionKeyV2, PartitionHashRecordV2> checkSnapshotFiles(
- SnapshotHandlerContext opCtx,
- Map<Integer, File> grpDirs,
+ public Map<PartitionKeyV2, PartitionHashRecordV2> checkSnapshotFiles(
+ File snpDir,
+ Set<Integer> grpIds,
SnapshotMetadata meta,
- Set<File> partFiles,
+ boolean forCreation,
+ boolean skipHash,
boolean punchHoleEnabled
) throws IgniteCheckedException {
+ IgniteBiTuple<Map<Integer, File>, Set<File>> grpAndPartFiles =
preparePartitions(meta, grpIds, snpDir);
+
Map<PartitionKeyV2, PartitionHashRecordV2> res = new
ConcurrentHashMap<>();
ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() ->
ByteBuffer.allocateDirect(meta.pageSize())
.order(ByteOrder.nativeOrder()));
- IgniteSnapshotManager snpMgr = cctx.snapshotMgr();
-
- GridKernalContext snpCtx =
snpMgr.createStandaloneKernalContext(cctx.kernalContext().compress(),
- opCtx.snapshotDirectory(), meta.folderName());
-
- FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
-
- EncryptionCacheKeyProvider snpEncrKeyProvider = new
SnapshotEncryptionKeyProvider(cctx.kernalContext(), grpDirs);
+ FilePageStoreManager storeMgr =
(FilePageStoreManager)kctx.cache().context().pageStore();
- startAllComponents(snpCtx);
+ EncryptionCacheKeyProvider snpEncrKeyProvider = new
SnapshotEncryptionKeyProvider(kctx, grpAndPartFiles.get1());
try {
U.doInParallel(
- snpMgr.snapshotExecutorService(),
- partFiles,
+ executor,
+ grpAndPartFiles.get2(),
part -> {
String grpName = cacheGroupName(part.getParentFile());
int grpId = CU.cacheId(grpName);
@@ -220,7 +403,7 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
) {
pageStore.init();
- if (punchHoleEnabled &&
meta.isGroupWithCompression(grpId) && type() == SnapshotHandlerType.CREATE) {
+ if (punchHoleEnabled &&
meta.isGroupWithCompression(grpId) && forCreation) {
byte pageType = partId == INDEX_PARTITION ?
FLAG_IDX : FLAG_DATA;
checkPartitionsPageCrcSum(() -> pageStore, partId,
pageType, (id, buffer) -> {
@@ -241,14 +424,14 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
}
if (partId == INDEX_PARTITION) {
- if (!skipHash())
+ if (!skipHash)
checkPartitionsPageCrcSum(() -> pageStore,
INDEX_PARTITION, FLAG_IDX);
return null;
}
if (grpId == MetaStorage.METASTORAGE_CACHE_ID) {
- if (!skipHash())
+ if (!skipHash)
checkPartitionsPageCrcSum(() -> pageStore,
partId, FLAG_DATA);
return null;
@@ -261,7 +444,7 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
long pageAddr = GridUnsafe.bufferAddress(pageBuff);
if (PageIO.getCompressionType(pageBuff) !=
CompressionProcessor.UNCOMPRESSED_PAGE)
- snpCtx.compress().decompressPage(pageBuff,
pageStore.getPageSize());
+ kctx.compress().decompressPage(pageBuff,
pageStore.getPageSize());
PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
GridDhtPartitionState partState =
fromOrdinal(io.getPartitionState(pageAddr));
@@ -291,13 +474,13 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
GridDhtPartitionState.OWNING,
false,
size,
- skipHash() ? F.emptyIterator()
- : snpMgr.partitionRowIterator(snpCtx, grpName,
partId, pageStore));
+ skipHash ? F.emptyIterator()
+ :
kctx.cache().context().snapshotMgr().partitionRowIterator(kctx, grpName,
partId, pageStore));
assert hash != null : "OWNING must have hash: " + key;
// We should skip size comparison if there are entries
to expire exist.
- if (hasExpiringEntries(snpCtx, pageStore, pageBuff,
io.getPendingTreeRoot(pageAddr)))
+ if (hasExpiringEntries(kctx, pageStore, pageBuff,
io.getPendingTreeRoot(pageAddr)))
hash.hasExpiringEntries(true);
res.put(key, hash);
@@ -315,9 +498,6 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
throw t;
}
- finally {
- closeAllComponents(snpCtx);
- }
return res;
}
@@ -357,74 +537,52 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
}
/** */
- private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
- SnapshotHandlerContext opCtx,
- Set<File> partFiles
+ public Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
+ File snpDir,
+ SnapshotMetadata meta,
+ Collection<Integer> grpIds,
+ Object nodeCstId,
+ boolean skipHash
) {
- try {
- String consistentId =
cctx.kernalContext().pdsFolderResolver().resolveFolders().consistentId().toString();
+ EncryptionSpi encSpi = meta.encryptionKey() != null ? encryptionSpi :
null;
- EncryptionSpi encSpi = opCtx.metadata().encryptionKey() != null ?
cctx.gridConfig().getEncryptionSpi() : null;
+ try (Dump dump = new Dump(snpDir,
U.maskForFileName(nodeCstId.toString()), true, true, encSpi, log)) {
+ IgniteBiTuple<Map<Integer, File>, Set<File>> grpAndPartFiles =
preparePartitions(meta, grpIds, dump.dumpDirectory());
- try (Dump dump = new Dump(opCtx.snapshotDirectory(), consistentId,
true, true, encSpi, log)) {
- Collection<PartitionHashRecordV2> partitionHashRecordV2s =
U.doInParallel(
- cctx.snapshotMgr().snapshotExecutorService(),
- partFiles,
- part -> calculateDumpedPartitionHash(dump,
cacheGroupName(part.getParentFile()), partId(part.getName()))
- );
-
- return
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
r -> r));
- }
- catch (Throwable t) {
- log.error("Error executing handler: ", t);
+ Collection<PartitionHashRecordV2> partitionHashRecordV2s =
U.doInParallel(
+ executor,
+ grpAndPartFiles.get2(),
+ part -> calculateDumpedPartitionHash(dump,
cacheGroupName(part.getParentFile()), partId(part.getName()),
+ skipHash, nodeCstId,
U.maskForFileName(nodeCstId.toString()))
+ );
- throw new IgniteException(t);
- }
+ return
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
r -> r));
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ catch (Throwable t) {
+ log.error("An unexpected error occurred during dump partitions
verifying.", t);
+
+ throw new IgniteException(t);
}
}
/** */
- private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump,
String grpName, int part) {
- if (skipHash()) {
+ public static PartitionHashRecordV2 calculateDumpedPartitionHash(Dump
dump, String grpName, int part, boolean skipHash,
+ Object nodeConstId, String nodeFolderName) {
+ if (skipHash) {
return new PartitionHashRecordV2(
new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
false,
- cctx.localNode().consistentId(),
+ nodeConstId,
null,
0,
PartitionHashRecordV2.PartitionState.OWNING,
- new VerifyPartitionContext()
+ new IdleVerifyUtility.VerifyPartitionContext()
);
}
try {
- String node =
cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
-
- try (Dump.DumpedPartitionIterator iter = dump.iterator(node,
CU.cacheId(grpName), part)) {
- long size = 0;
-
- VerifyPartitionContext ctx = new VerifyPartitionContext();
-
- while (iter.hasNext()) {
- DumpEntry e = iter.next();
-
- ctx.update((KeyCacheObject)e.key(),
(CacheObject)e.value(), e.version());
-
- size++;
- }
-
- return new PartitionHashRecordV2(
- new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
- false,
- cctx.localNode().consistentId(),
- null,
- size,
- PartitionHashRecordV2.PartitionState.OWNING,
- ctx
- );
+ try (Dump.DumpedPartitionIterator iter =
dump.iterator(nodeFolderName, CU.cacheId(grpName), part)) {
+ return calculateDumpPartitionHash(iter, grpName, part,
nodeConstId);
}
}
catch (Exception e) {
@@ -432,62 +590,34 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
}
}
- /** {@inheritDoc} */
- @Override public void complete(String name,
- Collection<SnapshotHandlerResult<Map<PartitionKeyV2,
PartitionHashRecordV2>>> results) throws IgniteCheckedException {
- Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new
HashMap<>();
- Map<ClusterNode, Exception> errs = new HashMap<>();
-
- for (SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>
res : results) {
- if (res.error() != null) {
- errs.put(res.node(), res.error());
-
- continue;
- }
-
- for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> entry :
res.data().entrySet())
- clusterHashes.computeIfAbsent(entry.getKey(), v -> new
ArrayList<>()).add(entry.getValue());
- }
-
- IdleVerifyResultV2 verifyResult = new
IdleVerifyResultV2(clusterHashes, errs);
-
- if (errs.isEmpty() && !verifyResult.hasConflicts())
- return;
-
- GridStringBuilder buf = new GridStringBuilder();
-
- verifyResult.print(buf::a, true);
-
- throw new IgniteCheckedException(buf.toString());
- }
+ /** */
+ public static PartitionHashRecordV2 calculateDumpPartitionHash(
+ Dump.DumpedPartitionIterator iter,
+ String grpName,
+ int part,
+ Object consistentId
+ ) throws IgniteCheckedException {
+ long size = 0;
- /**
- * Provides flag of full hash calculation.
- *
- * @return {@code True} if full partition hash calculation is required.
{@code False} otherwise.
- */
- protected boolean skipHash() {
- return false;
- }
+ IdleVerifyUtility.VerifyPartitionContext ctx = new
IdleVerifyUtility.VerifyPartitionContext();
- /** */
- protected boolean isPunchHoleEnabled(SnapshotHandlerContext opCtx,
Set<Integer> grpIds) {
- SnapshotMetadata meta = opCtx.metadata();
- Path snapshotDir = opCtx.snapshotDirectory().toPath();
+ while (iter.hasNext()) {
+ DumpEntry e = iter.next();
- if (meta.hasCompressedGroups() &&
grpIds.stream().anyMatch(meta::isGroupWithCompression)) {
- try {
-
cctx.kernalContext().compress().checkPageCompressionSupported(snapshotDir,
meta.pageSize());
+ ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(),
e.version());
- return true;
- }
- catch (Exception e) {
- log.info("File system doesn't support page compression on
snapshot directory: " + snapshotDir
- + ", snapshot may have larger size than expected.");
- }
+ size++;
}
- return false;
+ return new PartitionHashRecordV2(
+ new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
+ false,
+ consistentId,
+ null,
+ size,
+ PartitionHashRecordV2.PartitionState.OWNING,
+ ctx
+ );
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
index fc6d4e431c3..90d55f7d646 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
@@ -20,14 +20,9 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -43,11 +38,11 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescripto
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
@@ -105,10 +100,11 @@ public class SnapshotMetadataVerificationTask
@Override public List<SnapshotMetadata> execute() {
IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- List<SnapshotMetadata> snpMeta =
snpMgr.readSnapshotMetadatas(arg.snapshotName(), arg.snapshotPath());
-
- for (SnapshotMetadata meta : snpMeta)
- checkMeta(meta);
+ List<SnapshotMetadata> snpMeta = snpMgr.checker().checkLocalMetas(
+ snpMgr.snapshotLocalDir(arg.snapshotName(),
arg.snapshotPath()),
+ arg.grpIds(),
+ ignite.localNode().consistentId()
+ );
if (arg.incrementIndex() > 0) {
List<SnapshotMetadata> metas = snpMeta.stream()
@@ -128,49 +124,6 @@ public class SnapshotMetadataVerificationTask
return snpMeta;
}
- /** */
- private void checkMeta(SnapshotMetadata meta) {
- byte[] snpMasterKeyDigest = meta.masterKeyDigest();
- byte[] masterKeyDigest =
ignite.context().config().getEncryptionSpi().masterKeyDigest();
-
- if (masterKeyDigest == null && snpMasterKeyDigest != null) {
- throw new IllegalStateException("Snapshot '" +
meta.snapshotName() + "' has encrypted caches " +
- "while encryption is disabled. To restore this snapshot,
start Ignite with configured " +
- "encryption and the same master key.");
- }
-
- if (snpMasterKeyDigest != null &&
!Arrays.equals(snpMasterKeyDigest, masterKeyDigest)) {
- throw new IllegalStateException("Snapshot '" +
meta.snapshotName() + "' has different master " +
- "key digest. To restore this snapshot, start Ignite with
the same master key.");
- }
-
- Collection<Integer> grpIds = new HashSet<>(F.isEmpty(arg.grpIds())
? meta.cacheGroupIds() : arg.grpIds());
-
- if (meta.hasCompressedGroups() &&
grpIds.stream().anyMatch(meta::isGroupWithCompression)) {
- try {
-
ignite.context().compress().checkPageCompressionSupported();
- }
- catch (NullPointerException | IgniteCheckedException e) {
- String grpWithCompr =
grpIds.stream().filter(meta::isGroupWithCompression)
- .map(String::valueOf).collect(Collectors.joining(",
"));
-
- String msg = "Requested cache groups [" + grpWithCompr +
"] for check " +
- "from snapshot '" + meta.snapshotName() + "' are
compressed while " +
- "disk page compression is disabled. To check these
groups please " +
- "start Ignite with ignite-compress module in
classpath";
-
- throw new IllegalStateException(msg);
- }
- }
-
- grpIds.removeAll(meta.partitions().keySet());
-
- if (!grpIds.isEmpty() && !new
HashSet<>(meta.cacheGroupIds()).containsAll(grpIds)) {
- throw new IllegalArgumentException("Cache group(s) was not
found in the snapshot [groups=" + grpIds +
- ", snapshot=" + arg.snapshotName() + ']');
- }
- }
-
/** Checks that all incremental snapshots are present, contain correct
metafile and WAL segments. */
public void checkIncrementalSnapshots(SnapshotMetadata fullMeta,
SnapshotMetadataVerificationTaskArg arg) {
try {
@@ -249,13 +202,11 @@ public class SnapshotMetadataVerificationTask
}
/** {@inheritDoc} */
- @Override public SnapshotMetadataVerificationTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
+ @Override public @Nullable SnapshotMetadataVerificationTaskResult reduce(
+ List<ComputeJobResult> results) throws IgniteException {
Map<ClusterNode, List<SnapshotMetadata>> reduceRes = new HashMap<>();
Map<ClusterNode, Exception> exs = new HashMap<>();
- SnapshotMetadata first = null;
- Set<String> baselineMetasLeft = Collections.emptySet();
-
for (ComputeJobResult res : results) {
if (res.getException() != null) {
exs.put(res.getNode(), res.getException());
@@ -263,45 +214,10 @@ public class SnapshotMetadataVerificationTask
continue;
}
- List<SnapshotMetadata> metas = res.getData();
-
- for (SnapshotMetadata meta : metas) {
- if (first == null) {
- first = meta;
-
- baselineMetasLeft = new HashSet<>(meta.baselineNodes());
- }
-
- baselineMetasLeft.remove(meta.consistentId());
-
- if (!first.sameSnapshot(meta)) {
- exs.put(res.getNode(),
- new IgniteException("An error occurred during
comparing snapshot metadata from cluster nodes " +
- "[first=" + first + ", meta=" + meta + ", nodeId="
+ res.getNode().id() + ']'));
-
- continue;
- }
-
- reduceRes.computeIfAbsent(res.getNode(), n -> new
ArrayList<>()).add(meta);
- }
- }
-
- if (first == null && exs.isEmpty()) {
- assert !results.isEmpty();
-
- for (ComputeJobResult res : results) {
- Exception e = new IllegalArgumentException("Snapshot does not
exists [snapshot=" + arg.snapshotName()
- + (arg.snapshotPath() != null ? ", baseDir=" +
arg.snapshotPath() : "") + ", consistentId="
- + res.getNode().consistentId() + ']');
-
- exs.put(res.getNode(), e);
- }
+ reduceRes.computeIfAbsent(res.getNode(), n -> new
ArrayList<>()).addAll(res.getData());
}
- if (!F.isEmpty(baselineMetasLeft) && F.isEmpty(exs)) {
- exs.put(ignite.localNode(), new IgniteException("No snapshot
metadatas found for the baseline nodes " +
- "with consistent ids: " + String.join(", ",
baselineMetasLeft)));
- }
+ exs = SnapshotChecker.checkClusterMetas(arg.snapshotName(),
arg.snapshotPath(), reduceRes, exs);
return new SnapshotMetadataVerificationTaskResult(reduceRes, exs);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 8f569b66f09..8a1f328378a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -17,12 +17,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,63 +26,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.dump.DumpEntry;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
-import
org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider;
-import org.apache.ignite.internal.managers.encryption.GroupKey;
-import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
-import org.apache.ignite.internal.pagemem.store.PageStore;
-import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
-import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
-import
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
-import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.util.GridStringBuilder;
-import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
-import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
-import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
-import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
-import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
-import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
-import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
-import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
-import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
/**
* Default snapshot restore handler for checking snapshot partitions
consistency.
@@ -132,304 +81,18 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
).collect(Collectors.toSet());
}
- Set<File> partFiles = new HashSet<>();
-
- Map<Integer, File> grpDirs = new HashMap<>();
-
- for (File dir : cacheDirectories(new File(opCtx.snapshotDirectory(),
databaseRelativePath(meta.folderName())), name -> true)) {
- int grpId = CU.cacheId(cacheGroupName(dir));
-
- if (!grps.remove(grpId))
- continue;
-
- Set<Integer> parts = meta.partitions().get(grpId) == null ?
Collections.emptySet() :
- new HashSet<>(meta.partitions().get(grpId));
-
- for (File part : cachePartitionFiles(dir,
- (meta.dump() ? DUMP_FILE_EXT : FILE_SUFFIX) +
(meta.compressPartitions() ? ZIP_SUFFIX : "")
- )) {
- int partId = partId(part.getName());
-
- if (!parts.remove(partId))
- continue;
-
- partFiles.add(part);
- }
-
- if (!parts.isEmpty()) {
- throw new IgniteException("Snapshot data doesn't contain
required cache group partition " +
- "[grpId=" + grpId + ", snpName=" + meta.snapshotName() +
", consId=" + meta.consistentId() +
- ", missed=" + parts + ", meta=" + meta + ']');
- }
-
- grpDirs.put(grpId, dir);
- }
-
- if (!grps.isEmpty()) {
- throw new IgniteException("Snapshot data doesn't contain required
cache groups " +
- "[grps=" + grps + ", snpName=" + meta.snapshotName() + ",
consId=" + meta.consistentId() +
- ", meta=" + meta + ']');
- }
-
if (!opCtx.check()) {
log.info("Snapshot data integrity check skipped [snpName=" +
meta.snapshotName() + ']');
return Collections.emptyMap();
}
- return meta.dump()
- ? checkDumpFiles(opCtx, partFiles)
- : checkSnapshotFiles(opCtx, grpDirs, meta, partFiles,
isPunchHoleEnabled(opCtx, grpDirs.keySet()));
- }
-
- /** */
- private Map<PartitionKeyV2, PartitionHashRecordV2> checkSnapshotFiles(
- SnapshotHandlerContext opCtx,
- Map<Integer, File> grpDirs,
- SnapshotMetadata meta,
- Set<File> partFiles,
- boolean punchHoleEnabled
- ) throws IgniteCheckedException {
- Map<PartitionKeyV2, PartitionHashRecordV2> res = new
ConcurrentHashMap<>();
- ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() ->
ByteBuffer.allocateDirect(meta.pageSize())
- .order(ByteOrder.nativeOrder()));
-
- IgniteSnapshotManager snpMgr = cctx.snapshotMgr();
-
- GridKernalContext snpCtx =
snpMgr.createStandaloneKernalContext(cctx.kernalContext().compress(),
- opCtx.snapshotDirectory(), meta.folderName());
-
- FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
-
- EncryptionCacheKeyProvider snpEncrKeyProvider = new
SnapshotEncryptionKeyProvider(cctx.kernalContext(), grpDirs);
-
- startAllComponents(snpCtx);
-
- try {
- U.doInParallel(
- snpMgr.snapshotExecutorService(),
- partFiles,
- part -> {
- String grpName = cacheGroupName(part.getParentFile());
- int grpId = CU.cacheId(grpName);
- int partId = partId(part.getName());
-
- try (FilePageStore pageStore =
-
(FilePageStore)storeMgr.getPageStoreFactory(grpId,
snpEncrKeyProvider.getActiveKey(grpId) != null ?
- snpEncrKeyProvider :
null).createPageStore(getTypeByPartId(partId), part::toPath, val -> {})
- ) {
- pageStore.init();
-
- if (punchHoleEnabled &&
meta.isGroupWithCompression(grpId) && type() == SnapshotHandlerType.CREATE) {
- byte pageType = partId == INDEX_PARTITION ?
FLAG_IDX : FLAG_DATA;
-
- checkPartitionsPageCrcSum(() -> pageStore, partId,
pageType, (id, buffer) -> {
- if (PageIO.getCompressionType(buffer) ==
CompressionProcessor.UNCOMPRESSED_PAGE)
- return;
-
- int comprPageSz =
PageIO.getCompressedSize(buffer);
-
- if (comprPageSz < pageStore.getPageSize()) {
- try {
- pageStore.punchHole(id, comprPageSz);
- }
- catch (Exception ignored) {
- // No-op.
- }
- }
- });
- }
-
- if (partId == INDEX_PARTITION) {
- if (!skipHash())
- checkPartitionsPageCrcSum(() -> pageStore,
INDEX_PARTITION, FLAG_IDX);
-
- return null;
- }
-
- if (grpId == MetaStorage.METASTORAGE_CACHE_ID) {
- if (!skipHash())
- checkPartitionsPageCrcSum(() -> pageStore,
partId, FLAG_DATA);
-
- return null;
- }
-
- ByteBuffer pageBuff = buff.get();
- pageBuff.clear();
- pageStore.read(0, pageBuff, true);
-
- long pageAddr = GridUnsafe.bufferAddress(pageBuff);
-
- if (PageIO.getCompressionType(pageBuff) !=
CompressionProcessor.UNCOMPRESSED_PAGE)
- snpCtx.compress().decompressPage(pageBuff,
pageStore.getPageSize());
-
- PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
- GridDhtPartitionState partState =
fromOrdinal(io.getPartitionState(pageAddr));
-
- if (partState != OWNING) {
- throw new IgniteCheckedException("Snapshot
partitions must be in the OWNING " +
- "state only: " + partState);
- }
-
- long updateCntr = io.getUpdateCounter(pageAddr);
- long size = io.getSize(pageAddr);
-
- if (log.isDebugEnabled()) {
- log.debug("Partition [grpId=" + grpId
- + ", id=" + partId
- + ", counter=" + updateCntr
- + ", size=" + size + "]");
- }
-
- // Snapshot partitions must always be in OWNING state.
- // There is no `primary` partitions for snapshot.
- PartitionKeyV2 key = new PartitionKeyV2(grpId, partId,
grpName);
-
- PartitionHashRecordV2 hash =
calculatePartitionHash(key,
- updateCntr,
- meta.consistentId(),
- GridDhtPartitionState.OWNING,
- false,
- size,
- skipHash() ? F.emptyIterator()
- : snpMgr.partitionRowIterator(snpCtx, grpName,
partId, pageStore));
-
- assert hash != null : "OWNING must have hash: " + key;
-
- // We should skip size comparison if there are entries
to expire exist.
- if (hasExpiringEntries(snpCtx, pageStore, pageBuff,
io.getPendingTreeRoot(pageAddr)))
- hash.hasExpiringEntries(true);
-
- res.put(key, hash);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- return null;
- }
- );
- }
- catch (Throwable t) {
- log.error("Error executing handler: ", t);
-
- throw t;
- }
- finally {
- closeAllComponents(snpCtx);
- }
+ if (meta.dump())
+ return
cctx.snapshotMgr().checker().checkDumpFiles(opCtx.snapshotDirectory(), meta,
grps, cctx.localNode().consistentId(),
+ skipHash());
- return res;
- }
-
- /** */
- private boolean hasExpiringEntries(
- GridKernalContext ctx,
- PageStore pageStore,
- ByteBuffer pageBuff,
- long pendingTreeMetaId
- ) throws IgniteCheckedException {
- if (pendingTreeMetaId == 0)
- return false;
-
- long pageAddr = GridUnsafe.bufferAddress(pageBuff);
-
- pageBuff.clear();
- pageStore.read(pendingTreeMetaId, pageBuff, true);
-
- if (PageIO.getCompressionType(pageBuff) !=
CompressionProcessor.UNCOMPRESSED_PAGE)
- ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());
-
- BPlusMetaIO treeIO = BPlusMetaIO.VERSIONS.forPage(pageAddr);
-
- int rootLvl = treeIO.getRootLevel(pageAddr);
- long rootId = treeIO.getFirstPageId(pageAddr, rootLvl);
-
- pageBuff.clear();
- pageStore.read(rootId, pageBuff, true);
-
- if (PageIO.getCompressionType(pageBuff) !=
CompressionProcessor.UNCOMPRESSED_PAGE)
- ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());
-
- BPlusIO<?> rootIO = PageIO.getPageIO(pageBuff);
-
- return rootIO.getCount(pageAddr) != 0;
- }
-
- /** */
- private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
- SnapshotHandlerContext opCtx,
- Set<File> partFiles
- ) {
- try {
- String consistentId =
cctx.kernalContext().pdsFolderResolver().resolveFolders().consistentId().toString();
-
- EncryptionSpi encSpi = opCtx.metadata().encryptionKey() != null ?
cctx.gridConfig().getEncryptionSpi() : null;
-
- try (Dump dump = new Dump(opCtx.snapshotDirectory(), consistentId,
true, true, encSpi, log)) {
- Collection<PartitionHashRecordV2> partitionHashRecordV2s =
U.doInParallel(
- cctx.snapshotMgr().snapshotExecutorService(),
- partFiles,
- part -> calculateDumpedPartitionHash(dump,
cacheGroupName(part.getParentFile()), partId(part.getName()))
- );
-
- return
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
r -> r));
- }
- catch (Throwable t) {
- log.error("Error executing handler: ", t);
-
- throw new IgniteException(t);
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** */
- private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump,
String grpName, int part) {
- if (skipHash()) {
- return new PartitionHashRecordV2(
- new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
- false,
- cctx.localNode().consistentId(),
- null,
- 0,
- PartitionHashRecordV2.PartitionState.OWNING,
- new VerifyPartitionContext()
- );
- }
-
- try {
- String node =
cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
-
- try (Dump.DumpedPartitionIterator iter = dump.iterator(node,
CU.cacheId(grpName), part)) {
- long size = 0;
-
- VerifyPartitionContext ctx = new VerifyPartitionContext();
-
- while (iter.hasNext()) {
- DumpEntry e = iter.next();
-
- ctx.update((KeyCacheObject)e.key(),
(CacheObject)e.value(), e.version());
-
- size++;
- }
-
- return new PartitionHashRecordV2(
- new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
- false,
- cctx.localNode().consistentId(),
- null,
- size,
- PartitionHashRecordV2.PartitionState.OWNING,
- ctx
- );
- }
- }
- catch (Exception e) {
- throw new IgniteException(e);
- }
+ return
cctx.snapshotMgr().checker().checkSnapshotFiles(opCtx.snapshotDirectory(),
grps, meta,
+ type() == SnapshotHandlerType.CREATE, skipHash(),
isPunchHoleEnabled(opCtx, grps));
}
/** {@inheritDoc} */
@@ -489,71 +152,4 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
return false;
}
-
- /**
- * Provides encryption keys stored within snapshot.
- * <p>
- * To restore an encrypted snapshot, we have to read the keys it was
encrypted with. The better place for the is
- * Metastore. But it is currently unreadable as simple structure. Once it
is done, we should move snapshot
- * encryption keys there.
- */
- private static class SnapshotEncryptionKeyProvider implements
EncryptionCacheKeyProvider {
- /** Kernal context */
- private final GridKernalContext ctx;
-
- /** Data dirs of snapshot's caches by group id. */
- private final Map<Integer, File> grpDirs;
-
- /** Encryption keys loaded from snapshot. */
- private final ConcurrentHashMap<Integer, GroupKey> decryptedKeys = new
ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param ctx Kernal context.
- * @param grpDirs Data dirictories of cache groups by id.
- */
- private SnapshotEncryptionKeyProvider(GridKernalContext ctx,
Map<Integer, File> grpDirs) {
- this.ctx = ctx;
- this.grpDirs = grpDirs;
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable GroupKey getActiveKey(int grpId) {
- return decryptedKeys.computeIfAbsent(grpId, id -> {
- GroupKey grpKey = null;
-
- try (DirectoryStream<Path> ds =
Files.newDirectoryStream(grpDirs.get(grpId).toPath(),
- p -> Files.isRegularFile(p) &&
p.toString().endsWith(CACHE_DATA_FILENAME))) {
- for (Path p : ds) {
- StoredCacheData cacheData =
ctx.cache().configManager().readCacheData(p.toFile());
-
- GroupKeyEncrypted grpKeyEncrypted =
cacheData.groupKeyEncrypted();
-
- if (grpKeyEncrypted == null)
- return null;
-
- if (grpKey == null)
- grpKey = new GroupKey(grpKeyEncrypted.id(),
ctx.config().getEncryptionSpi().decryptKey(grpKeyEncrypted.key()));
- else {
- assert grpKey.equals(new
GroupKey(grpKeyEncrypted.id(),
-
ctx.config().getEncryptionSpi().decryptKey(grpKeyEncrypted.key())));
- }
- }
-
- return grpKey;
- }
- catch (Exception e) {
- throw new IgniteException("Unable to extract ciphered
encryption key of cache group " + id + '.', e);
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable GroupKey groupKey(int grpId, int keyId) {
- GroupKey key = getActiveKey(grpId);
-
- return key != null && key.id() == keyId ? key : null;
- }
- }
}