This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6d7ab21ff45 IGNITE-24481 Incremental snapshot migration to
SnapshotFileTree (#11875)
6d7ab21ff45 is described below
commit 6d7ab21ff45f1c61f17b9103b47e67048fd7e03f
Author: Nikolay <[email protected]>
AuthorDate: Tue Feb 18 12:19:43 2025 +0300
IGNITE-24481 Incremental snapshot migration to SnapshotFileTree (#11875)
---
.../persistence/file/FilePageStoreManager.java | 10 --
.../cache/persistence/filename/NodeFileTree.java | 55 ++++++--
.../persistence/filename/SnapshotFileTree.java | 38 ++++--
.../snapshot/AbstractSnapshotVerificationTask.java | 32 ++++-
.../snapshot/IgniteSnapshotManager.java | 142 +++++----------------
.../snapshot/IncrementalSnapshotFutureTask.java | 48 +++----
.../snapshot/IncrementalSnapshotProcessor.java | 58 ++++-----
.../IncrementalSnapshotVerificationTask.java | 17 ++-
.../snapshot/SnapshotHandlerRestoreTask.java | 21 +--
.../snapshot/SnapshotMetadataVerificationTask.java | 38 +++---
.../snapshot/SnapshotOperationRequest.java | 19 +++
.../snapshot/SnapshotPartitionsVerifyTask.java | 23 ++--
.../snapshot/SnapshotRestoreProcess.java | 77 ++++-------
.../snapshot/AbstractSnapshotSelfTest.java | 2 +-
.../IgniteClusterSnapshotRestoreSelfTest.java | 6 +-
.../snapshot/IgniteSnapshotRemoteRequestTest.java | 6 +-
.../IncrementalSnapshotCheckBeforeRestoreTest.java | 4 +-
.../IncrementalSnapshotRestoreTest.java | 8 +-
.../testframework/junits/GridAbstractTest.java | 2 +-
19 files changed, 293 insertions(+), 313 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index d58aa066e7e..cd40750d266 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -78,7 +78,6 @@ import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.Files.delete;
@@ -91,7 +90,6 @@ import static
org.apache.ignite.internal.processors.cache.persistence.filename.N
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.FILE_SUFFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.INDEX_FILE_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.TMP_SUFFIX;
-import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.partitionFileName;
import static
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_DIR_NAME;
/**
@@ -671,14 +669,6 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
}
}
- /**
- * @param cacheWorkDir Cache work directory.
- * @param partId Partition id.
- */
- @NotNull private Path getPartitionFilePath(File cacheWorkDir, int partId) {
- return new File(cacheWorkDir, partitionFileName(partId)).toPath();
- }
-
/**
* @param cacheWorkDir Cache work directory.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java
index 865b9f99643..87f37123bc6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java
@@ -188,12 +188,6 @@ public class NodeFileTree extends SharedFileTree {
/** Filter out all cache directories. */
public static final Predicate<File> CACHE_DIR_FILTER = dir ->
cacheDir(dir) || cacheGroupDir(dir);
- /** Prefix for {@link #cacheStorage(String)} directory in case of single
cache. */
- private static final String CACHE_DIR_PREFIX = "cache-";
-
- /** Prefix for {@link #cacheStorage(String)} directory in case of cache
group. */
- private static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
-
/** Filter out all cache directories including {@link MetaStorage}. */
public static final Predicate<File> CACHE_DIR_WITH_META_FILTER = dir ->
CACHE_DIR_FILTER.test(dir) ||
@@ -217,6 +211,15 @@ public class NodeFileTree extends SharedFileTree {
/** */
public static final String CACHE_DATA_TMP_FILENAME = CACHE_DATA_FILENAME +
TMP_SUFFIX;
+ /** Temporary cache directory prefix. */
+ private static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";
+
+ /** Prefix for {@link #cacheStorage(String)} directory in case of single
cache. */
+ private static final String CACHE_DIR_PREFIX = "cache-";
+
+ /** Prefix for {@link #cacheStorage(String)} directory in case of cache
group. */
+ private static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
+
/** Folder name for consistent id. */
private final String folderName;
@@ -520,6 +523,14 @@ public class NodeFileTree extends SharedFileTree {
return new File(nodeStorage, cacheDirName);
}
+ /**
+ * @param cacheDirName Cache directory name.
+ * @return Store directory for given cache.
+ */
+ public File tmpCacheStorage(String cacheDirName) {
+ return new File(nodeStorage, TMP_CACHE_DIR_PREFIX + cacheDirName);
+ }
+
/**
* @param workDir Cache work directory.
* @param cacheDirName Cache directory name.
@@ -589,6 +600,24 @@ public class NodeFileTree extends SharedFileTree {
return f.getName().equals(CACHE_DATA_FILENAME);
}
+ /**
+ * @param f File.
+ * @return {@code True} if file conforms temp cache storage name pattern.
+ */
+ public static boolean tmpCacheStorage(File f) {
+ return f.isDirectory() && f.getName().startsWith(TMP_CACHE_DIR_PREFIX);
+ }
+
+ /**
+ * @param f Temporary cache directory.
+ * @return Cache or group id.
+ */
+ public static String tmpDirCacheName(File f) {
+ assert tmpCacheStorage(f) : f;
+
+ return cacheName(f.getName().substring(TMP_CACHE_DIR_PREFIX.length()));
+ }
+
/**
* @param isSharedGroup {@code True} if cache is sharing the same
`underlying` cache.
* @param cacheOrGroupName Cache name.
@@ -604,12 +633,18 @@ public class NodeFileTree extends SharedFileTree {
}
/**
- * @param dir Directory
+ * @param f Directory
* @return Cache name for directory, if it conforms cache storage pattern.
*/
- public static String cacheName(File dir) {
- String name = dir.getName();
+ public static String cacheName(File f) {
+ return cacheName(f.getName());
+ }
+ /**
+ * @param name File name.
+ * @return Cache name.
+ */
+ private static String cacheName(String name) {
if (name.startsWith(CACHE_GRP_DIR_PREFIX))
return name.substring(CACHE_GRP_DIR_PREFIX.length());
else if (name.startsWith(CACHE_DIR_PREFIX))
@@ -617,7 +652,7 @@ public class NodeFileTree extends SharedFileTree {
else if (name.equals(MetaStorage.METASTORAGE_DIR_NAME))
return METASTORAGE_CACHE_NAME;
else
- throw new IgniteException("Directory doesn't match the cache or
cache group prefix: " + dir);
+ throw new IgniteException("Directory doesn't match the cache or
cache group prefix: " + name);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SnapshotFileTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SnapshotFileTree.java
index 827d6bad177..3e6a6e011e2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SnapshotFileTree.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SnapshotFileTree.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.filename;
import java.io.File;
-import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -66,22 +66,30 @@ public class SnapshotFileTree extends NodeFileTree {
private final NodeFileTree tmpFt;
/**
- * @param loc Local node.
+ * @param ctx Kernal context.
* @param name Snapshot name.
* @param path Optional snapshot path.
*/
- public SnapshotFileTree(IgniteEx loc, String name, @Nullable String path) {
- super(root(loc.context().pdsFolderResolver().fileTree(), name, path),
loc.context().pdsFolderResolver().fileTree().folderName());
+ public SnapshotFileTree(GridKernalContext ctx, String name, @Nullable
String path) {
+ this(ctx, name, path, ctx.pdsFolderResolver().fileTree().folderName(),
ctx.discovery().localNode().consistentId().toString());
+ }
+
+ /**
+ * @param ctx Kernal context.
+ * @param consId Consistent id.
+ * @param name Snapshot name.
+ * @param path Optional snapshot path.
+ */
+ public SnapshotFileTree(GridKernalContext ctx, String name, @Nullable
String path, String folderName, String consId) {
+ super(root(ctx.pdsFolderResolver().fileTree(), name, path),
folderName);
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy
the following name pattern: a-zA-Z0-9_");
- NodeFileTree ft = loc.context().pdsFolderResolver().fileTree();
-
this.name = name;
this.path = path;
- this.consId = loc.localNode().consistentId().toString();
- this.tmpFt = new NodeFileTree(new File(ft.snapshotTempRoot(), name),
folderName());
+ this.consId = consId;
+ this.tmpFt = new NodeFileTree(new File(snapshotTempRoot(), name),
folderName());
}
/** @return Snapshot name. */
@@ -100,8 +108,11 @@ public class SnapshotFileTree extends NodeFileTree {
}
/**
+ * Returns file tree for specific incremental snapshot.
+ * Root will be something like {@code
"work/snapshots/mybackup/increments/0000000000000001"}.
+ *
* @param incIdx Increment index.
- * @return Root directory for incremental snapshot.
+ * @return Incremental snapshot file tree.
*/
public IncrementalSnapshotFileTree incrementalSnapshotFileTree(int incIdx)
{
return new IncrementalSnapshotFileTree(
@@ -151,6 +162,15 @@ public class SnapshotFileTree extends NodeFileTree {
return new File(root, snapshotMetaFileName(consId) + TMP_SUFFIX);
}
+ /**
+ * Note, this consistent id can differ from the local consistent id.
+ * In case snapshot was moved from other node.
+ * @return Consistent id of the snapshot.
+ */
+ public String consistentId() {
+ return consId;
+ }
+
/**
* @param partId Partition id.
* @return File name of delta partition pages.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
index e8d348fc6da..dcbec54a6ad 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
@@ -32,6 +32,7 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -79,7 +80,7 @@ public abstract class AbstractSnapshotVerificationTask extends
if (meta == null)
continue;
- jobs.put(createJob(meta.snapshotName(), meta.consistentId(),
arg), e.getKey());
+ jobs.put(createJob(meta.snapshotName(), meta.folderName(),
meta.consistentId(), arg), e.getKey());
if (allMetas.isEmpty())
break;
@@ -97,12 +98,18 @@ public abstract class AbstractSnapshotVerificationTask
extends
/**
* @param name Snapshot name.
+ * @param folderName Folder name for snapshot.
* @param consId Consistent id of the related node.
* @param args Check snapshot parameters.
*
* @return Compute job.
*/
- protected abstract AbstractSnapshotVerificationJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args);
+ protected abstract AbstractSnapshotVerificationJob createJob(
+ String name,
+ String folderName,
+ String consId,
+ SnapshotPartitionsVerifyTaskArg args
+ );
/** */
protected abstract static class AbstractSnapshotVerificationJob extends
ComputeJobAdapter {
@@ -123,7 +130,10 @@ public abstract class AbstractSnapshotVerificationTask
extends
/** Snapshot directory path. */
@Nullable protected final String snpPath;
- /** Consistent id of the related node. */
+ /** Folder name for snapshot. */
+ protected final String folderName;
+
+ /** Consistent id of the snapshot data. */
protected final String consId;
/** Set of cache groups to be checked in the snapshot. {@code Null} or
empty to check everything. */
@@ -132,9 +142,13 @@ public abstract class AbstractSnapshotVerificationTask
extends
/** If {@code true}, calculates and compares partition hashes.
Otherwise, only basic snapshot validation is launched. */
protected final boolean check;
+ /** Snapshot file tree. */
+ protected transient SnapshotFileTree sft;
+
/**
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
+ * @param folderName Folder name for snapshot.
* @param consId Consistent id of the related node.
* @param rqGrps Set of cache groups to be checked in the snapshot.
{@code Null} or empty to check everything.
* @param check If {@code true}, calculates and compares partition
hashes. Otherwise, only basic snapshot validation is launched.
@@ -142,15 +156,27 @@ public abstract class AbstractSnapshotVerificationTask
extends
protected AbstractSnapshotVerificationJob(
String snpName,
@Nullable String snpPath,
+ String folderName,
String consId,
@Nullable Collection<String> rqGrps,
boolean check
) {
this.snpName = snpName;
this.snpPath = snpPath;
+ this.folderName = folderName;
this.consId = consId;
this.rqGrps = rqGrps;
this.check = check;
}
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ sft = new SnapshotFileTree(ignite.context(), snpName, snpPath,
folderName, consId);
+
+ return execute0();
+ }
+
+ /** Exectues actual job. */
+ protected abstract Object execute0();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index b24ee54cbb9..5de49f1c6e7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -135,6 +135,8 @@ import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree.IncrementalSnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
@@ -225,7 +227,6 @@ import static
org.apache.ignite.internal.processors.cache.persistence.filename.S
import static
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.formatTmpDirName;
import static
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
import static
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getPageIO;
import static
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
@@ -797,54 +798,6 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return snpPath == null ? new File(ft.snapshotsRoot(), snpName) : new
File(snpPath, snpName);
}
- /**
- * Returns path to specific incremental snapshot.
- * For example, {@code
"work/snapshots/mybackup/increments/0000000000000001"}.
- *
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @param incIdx Increment index.
- * @return Local snapshot directory where snapshot files are located.
- */
- public File incrementalSnapshotLocalDir(String snpName, @Nullable String
snpPath, int incIdx) {
- return new File(incrementalSnapshotsLocalRootDir(snpName, snpPath),
U.fixedLengthNumberName(incIdx, null));
- }
-
- /**
- * Returns root folder for incremental snapshot.
- * For example, {@code "work/snapshots/mybackup/increments/"}.
- *
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @return Local snapshot directory where snapshot files are located.
- */
- public File incrementalSnapshotsLocalRootDir(String snpName, @Nullable
String snpPath) {
- return new File(snapshotLocalDir(snpName, snpPath), INC_SNP_DIR);
- }
-
- /**
- * @param incSnpDir Incremental snapshot directory.
- * @param consId Consistent ID.
- * @return WALs directory for specified incremental snapshot.
- */
- public static File incrementalSnapshotWalsDir(File incSnpDir, String
consId) {
- return new NodeFileTree(incSnpDir, U.maskForFileName(consId)).wal();
- }
-
- /**
- * @param snpName Snapshot name.
- * @return Local snapshot directory for snapshot with given name.
- * @throws IgniteCheckedException If directory doesn't exist.
- */
- private File resolveSnapshotDir(String snpName, @Nullable String snpPath)
throws IgniteCheckedException {
- File snpDir = snapshotLocalDir(snpName, snpPath);
-
- if (!snpDir.exists())
- throw new IgniteCheckedException("Snapshot directory doesn't
exists: " + snpDir.getAbsolutePath());
-
- return snpDir;
- }
-
/**
* @param req Request on snapshot creation.
* @return Future which will be completed when a snapshot has been started.
@@ -857,6 +810,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
"Another snapshot operation in progress [req=" + req + ",
curr=" + clusterSnpReq + ']'));
}
+ req.snapshotFileTree(new SnapshotFileTree(cctx.kernalContext(),
req.snapshotName(), req.snapshotPath()));
+
clusterSnpReq = req;
if (req.incremental())
@@ -903,12 +858,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
SnapshotMetadata meta;
try {
- meta = readSnapshotMetadata(new File(
- snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
-
snapshotMetaFileName(cctx.localNode().consistentId().toString())
- ));
+ meta = readSnapshotMetadata(req.snapshotFileTree().meta());
- checkIncrementalCanBeCreated(req.snapshotName(),
req.snapshotPath(), meta);
+ checkIncrementalCanBeCreated(req.snapshotFileTree(), meta);
}
catch (IgniteCheckedException | IOException e) {
return new GridFinishedFuture<>(e);
@@ -963,7 +915,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
SnapshotOperationRequest req,
SnapshotMetadata meta
) {
- File incSnpDir = incrementalSnapshotLocalDir(req.snapshotName(),
req.snapshotPath(), req.incrementIndex());
+ SnapshotFileTree sft = req.snapshotFileTree();
+ IncrementalSnapshotFileTree ift =
sft.incrementalSnapshotFileTree(req.incrementIndex());
WALPointer lowPtr;
if (req.incrementIndex() == 1)
@@ -974,7 +927,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
IncrementalSnapshotMetadata prevIncSnpMeta;
try {
- prevIncSnpMeta =
readIncrementalSnapshotMetadata(req.snapshotName(), req.snapshotPath(),
prevIdx);
+ prevIncSnpMeta =
readIncrementalSnapshotMetadata(sft.incrementalSnapshotFileTree(prevIdx).meta());
}
catch (IgniteCheckedException | IOException e) {
return new GridFinishedFuture<>(e);
@@ -988,15 +941,14 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
req.operationalNodeId(),
req.requestId(),
meta,
- req.snapshotPath(),
- req.incrementIndex(),
+ ift,
lowPtr,
markWalFut
)).chain(fut -> {
if (fut.error() != null)
throw F.wrap(fut.error());
- assert incSnpDir.exists() : "Incremental snapshot directory must
exists";
+ assert ift.root().exists() : "Incremental snapshot directory must
exists";
IncrementalSnapshotMetadata incMeta = new
IncrementalSnapshotMetadata(
req.requestId(),
@@ -1008,10 +960,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
markWalFut.result()
);
- storeSnapshotMeta(
- incMeta,
- new File(incSnpDir,
snapshotMetaFileName(pdsSettings.folderName()))
- );
+ storeSnapshotMeta(incMeta, ift.meta());
return new SnapshotOperationResponse();
});
@@ -1037,7 +986,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
"[snpName=" + req.snapshotName() + ", incIdx=" +
req.incrementIndex());
}
- writeSnapshotDirectoryToMetastorage(incSnpDir);
+ writeSnapshotDirectoryToMetastorage(ift.root());
task.start();
});
@@ -1046,20 +995,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
/**
- * @param snpName Full snapshot name.
- * @param snpPath Optional path to snapshot, if differs from default.
- * @param incIdx Index of incremental snapshot.
+ * @param meta Meta file.
* @return Read incremental snapshot metadata.
*/
- public IncrementalSnapshotMetadata readIncrementalSnapshotMetadata(
- String snpName,
- @Nullable String snpPath,
- int incIdx
- ) throws IgniteCheckedException, IOException {
- return readFromFile(new File(
- incrementalSnapshotLocalDir(snpName, snpPath, incIdx),
- snapshotMetaFileName(pdsSettings.folderName())
- ));
+ public IncrementalSnapshotMetadata readIncrementalSnapshotMetadata(File
meta) throws IgniteCheckedException, IOException {
+ return readFromFile(meta);
}
/**
@@ -1354,9 +1294,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
snpReq.error(req.error());
if (req.incremental())
-
U.delete(incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(),
req.incrementIndex()));
+
U.delete(snpReq.snapshotFileTree().incrementalSnapshotFileTree(req.incrementIndex()).root());
else
- deleteSnapshot(snapshotLocalDir(req.snapshotName(),
req.snapshotPath()), pdsSettings);
+ deleteSnapshot(snpReq.snapshotFileTree().root(),
pdsSettings);
}
else if (!F.isEmpty(req.warnings())) {
// Pass the warnings further to the next stage for the
case when snapshot started from not coordinator.
@@ -1606,7 +1546,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
throw new UnsupportedOperationException("Client and daemon nodes
can not perform this operation.");
synchronized (snpOpMux) {
- File[] incDirs = incrementalSnapshotsLocalRootDir(snpName,
snpPath).listFiles(File::isDirectory);
+ File[] incDirs = new SnapshotFileTree(cctx.kernalContext(),
snpName, snpPath).incrementsRoot().listFiles(File::isDirectory);
if (incDirs == null)
return 0;
@@ -2010,7 +1950,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @return Collection of incremental snapshots metafiles.
*/
public Collection<IncrementalSnapshotMetadata>
readIncrementalSnapshotMetadatas(String snpName) {
- File[] incDirs = incrementalSnapshotsLocalRootDir(snpName, null)
+ File[] incDirs = new SnapshotFileTree(cctx.kernalContext(), snpName,
null).incrementsRoot()
.listFiles((dir, name) ->
INC_SNP_NAME_PATTERN.matcher(name).matches());
if (incDirs == null)
@@ -2538,7 +2478,10 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
int partId,
@Nullable EncryptionCacheKeyProvider encrKeyProvider
) throws IgniteCheckedException {
- File snpDir = resolveSnapshotDir(snpName, null);
+ File snpDir = snapshotLocalDir(snpName, null);
+
+ if (!snpDir.exists())
+ throw new IgniteCheckedException("Snapshot directory doesn't
exists: " + snpDir.getAbsolutePath());
File nodePath = new File(snpDir,
databaseRelativePath(ft.folderName()));
@@ -2978,28 +2921,22 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/**
* Checks that incremental snapshot can be created for given full snapshot
and current cluster state.
*
- * @param name Full snapshot name.
- * @param snpPath Snapshot path.
+ * @param sft Snapshot file tree.
* @param meta Full snapshot metadata.
*/
private void checkIncrementalCanBeCreated(
- String name,
- @Nullable String snpPath,
+ SnapshotFileTree sft,
SnapshotMetadata meta
) throws IgniteCheckedException, IOException {
- File snpDir = snapshotLocalDir(name, snpPath);
-
IgniteWriteAheadLogManager wal = cctx.wal();
if (wal == null)
throw new IgniteCheckedException("Create incremental snapshot
request has been rejected. WAL must be enabled.");
- NodeFileTree ft = cctx.kernalContext().pdsFolderResolver().fileTree();
-
if (!ft.walArchiveEnabled())
throw new IgniteCheckedException("Create incremental snapshot
request has been rejected. WAL archive must be enabled.");
- ensureHardLinkAvailable(ft.walArchive().toPath(), snpDir.toPath());
+ ensureHardLinkAvailable(ft.walArchive().toPath(), sft.root().toPath());
Set<String> aliveNodesConsIds = cctx.discovery().aliveServerNodes()
.stream()
@@ -3013,7 +2950,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
- File rootSnpCachesDir = new File(snpDir,
databaseRelativePath(meta.folderName()));
+ assert Objects.equals(sft.consistentId(), meta.consistentId()) :
sft.consistentId() + " != " + meta.consistentId();
+ assert Objects.equals(sft.folderName(), meta.folderName()) :
sft.folderName() + " != " + meta.folderName();
for (int grpId : meta.cacheGroupIds()) {
if (grpId == METASTORAGE_CACHE_ID)
@@ -3036,17 +2974,14 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
"Encrypted cache groups not supported [groupId=" + grpId +
']');
}
- List<File> snpCacheDir =
- cacheDirectories(rootSnpCachesDir, grpName ->
gctx.cacheOrGroupName().equals(grpName));
+ File snpCacheDir = sft.cacheStorage(gctx.config());
- if (snpCacheDir.isEmpty()) {
+ if (!snpCacheDir.exists()) {
throw new IgniteCheckedException("Create incremental snapshot
request has been rejected. " +
"Cache group directory not found [groupId=" + grpId + ']');
}
- assert snpCacheDir.size() == 1 : "Single snapshot cache directory
must be found";
-
- for (File snpDataFile :
FilePageStoreManager.cacheDataFiles(snpCacheDir.get(0))) {
+ for (File snpDataFile :
FilePageStoreManager.cacheDataFiles(snpCacheDir)) {
StoredCacheData snpCacheData =
GridLocalConfigManager.readCacheData(
snpDataFile,
cctx.kernalContext().marshallerContext().jdkMarshaller(),
@@ -3055,10 +2990,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
byte[] snpCacheDataBytes =
Files.readAllBytes(snpDataFile.toPath());
- File nodeDataFile = new
File(snpDataFile.getAbsolutePath().replace(
- rootSnpCachesDir.getAbsolutePath(),
- pdsSettings.persistentStoreNodePath().getAbsolutePath()
- ));
+ File nodeDataFile =
ft.cacheConfigurationFile(snpCacheData.config());
if (!nodeDataFile.exists()) {
throw new IgniteCheckedException("Create incremental
snapshot request has been rejected. " +
@@ -3866,16 +3798,12 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
try {
task.partsLeft.compareAndSet(-1, partsCnt);
- File cacheDir = ft.cacheStorage(cacheDirName);
+ File tmpCacheDir = ft.tmpCacheStorage(cacheDirName);
- File tmpCacheDir =
U.resolveWorkDirectory(ft.nodeStorage().getAbsolutePath(),
- formatTmpDirName(cacheDir).getName(), false);
+ U.mkdirs(tmpCacheDir);
return Paths.get(tmpCacheDir.getAbsolutePath(),
partitionFileName(partId)).toString();
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
finally {
busyLock.leaveBusy();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
index d368957432b..b72632d9bd1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -22,7 +22,6 @@ import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -35,21 +34,17 @@ import
org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRe
import
org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree.IncrementalSnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
/** */
class IncrementalSnapshotFutureTask extends AbstractSnapshotFutureTask<Void>
implements BiConsumer<String, File> {
- /** Index of incremental snapshot. */
- private final int incIdx;
-
- /** Snapshot path. */
- private final @Nullable String snpPath;
+ /** Incremental file tree. */
+ private final IncrementalSnapshotFileTree ift;
/** Metadata of the full snapshot. */
private final Set<Integer> affectedCacheGrps;
@@ -70,8 +65,7 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
UUID srcNodeId,
UUID reqNodeId,
SnapshotMetadata meta,
- @Nullable String snpPath,
- int incIdx,
+ IncrementalSnapshotFileTree ift,
WALPointer lowPtr,
IgniteInternalFuture<WALPointer> highPtrFut
) {
@@ -99,8 +93,7 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
null
);
- this.incIdx = incIdx;
- this.snpPath = snpPath;
+ this.ift = ift;
this.affectedCacheGrps = new HashSet<>(meta.cacheGroupIds());
this.lowPtr = lowPtr;
this.highPtrFut = highPtrFut;
@@ -116,7 +109,7 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
/** {@inheritDoc} */
@Override public boolean start() {
try {
- File incSnpDir =
cctx.snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, incIdx);
+ File incSnpDir = ift.root();
if (!incSnpDir.mkdirs() && !incSnpDir.exists()) {
onDone(new IgniteException("Can't create snapshot directory
[dir=" + incSnpDir.getAbsolutePath() + ']'));
@@ -132,22 +125,19 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
}
try {
- String folderName =
cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
-
- copyWal(incrementalSnapshotWalsDir(incSnpDir, folderName),
highPtrFut.result());
-
NodeFileTree ft =
cctx.kernalContext().pdsFolderResolver().fileTree();
- NodeFileTree snpFt = new NodeFileTree(incSnpDir,
folderName);
+
+ copyWal(ft, highPtrFut.result());
copyFiles(
ft.marshaller(),
- snpFt.marshaller(),
+ ift.marshaller(),
BinaryUtils::notTmpFile
);
copyFiles(
ft.binaryMeta(),
- snpFt.binaryMeta(),
+ ift.binaryMeta(),
file -> file.getName().endsWith(METADATA_FILE_SUFFIX)
);
@@ -170,15 +160,15 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
/**
* Copies WAL segments to the incremental snapshot directory.
*
- * @param incSnpWalDir Incremental snapshot directory.
+ * @param ft Node file tree.
* @param highPtr High WAL pointer to copy.
* @throws IgniteInterruptedCheckedException If failed.
* @throws IOException If failed.
*/
- private void copyWal(File incSnpWalDir, WALPointer highPtr) throws
IgniteInterruptedCheckedException, IOException {
+ private void copyWal(NodeFileTree ft, WALPointer highPtr) throws
IgniteInterruptedCheckedException, IOException {
// First increment must include low segment, because full snapshot
knows nothing about WAL.
// All other begins from the next segment because lowPtr already saved
inside previous increment.
- long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+ long lowIdx = lowPtr.index() + (ift.index() == 1 ? 0 : 1);
long highIdx = highPtr.index();
assert
cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled() : "WAL
Compaction must be enabled";
@@ -192,21 +182,21 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
if (log.isInfoEnabled())
log.info("Linking WAL segments into incremental snapshot [lowIdx="
+ lowIdx + ", " + "highIdx=" + highIdx + ']');
- if (!incSnpWalDir.mkdirs() && !incSnpWalDir.exists())
- throw new IgniteException("Failed to create snapshot WAL directory
[idx=" + incSnpWalDir + ']');
+ if (!ift.wal().mkdirs() && !ift.wal().exists())
+ throw new IgniteException("Failed to create snapshot WAL directory
[idx=" + ift.wal() + ']');
for (; lowIdx <= highIdx; lowIdx++) {
- File seg =
cctx.kernalContext().pdsFolderResolver().fileTree().zipWalArchiveSegment(lowIdx);
+ File seg = ft.zipWalArchiveSegment(lowIdx);
if (!seg.exists())
throw new IgniteException("WAL segment not found in archive
[idx=" + lowIdx + ']');
- Path segLink = incSnpWalDir.toPath().resolve(seg.getName());
+ File segLink = ift.walSegment(lowIdx);
if (log.isDebugEnabled())
- log.debug("Creaing segment link [path=" +
segLink.toAbsolutePath() + ']');
+ log.debug("Creaing segment link [path=" +
segLink.getAbsolutePath() + ']');
- Files.createLink(segLink, seg.toPath());
+ Files.createLink(segLink.toPath(), seg.toPath());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
index 0cb0f3da1d4..bd30dfe284b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
@@ -38,7 +38,8 @@ import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import
org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree.IncrementalSnapshotFileTree;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -52,7 +53,6 @@ import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_FINISH_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_START_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
/** Processes incremental snapshot: parse WAL segments and handles records. */
@@ -63,11 +63,8 @@ abstract class IncrementalSnapshotProcessor {
/** Ignite logger. */
private final IgniteLogger log;
- /** Snapshot name. */
- private final String snpName;
-
- /** Snapshot path. */
- private final String snpPath;
+ /** Snapshot file tree. */
+ private final SnapshotFileTree sft;
/** Incremental snapshot index. */
private final int incIdx;
@@ -76,10 +73,9 @@ abstract class IncrementalSnapshotProcessor {
private final Set<Integer> cacheIds;
/** */
- IncrementalSnapshotProcessor(GridCacheSharedContext<?, ?> cctx, String
snpName, String snpPath, int incIdx, Set<Integer> cacheIds) {
+ IncrementalSnapshotProcessor(GridCacheSharedContext<?, ?> cctx,
SnapshotFileTree sft, int incIdx, Set<Integer> cacheIds) {
this.cctx = cctx;
- this.snpName = snpName;
- this.snpPath = snpPath;
+ this.sft = sft;
this.incIdx = incIdx;
this.cacheIds = cacheIds;
@@ -97,20 +93,18 @@ abstract class IncrementalSnapshotProcessor {
@Nullable Consumer<TxRecord> txHnd
) throws IgniteCheckedException, IOException {
IncrementalSnapshotMetadata meta = cctx.snapshotMgr()
- .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx);
+
.readIncrementalSnapshotMetadata(sft.incrementalSnapshotFileTree(incIdx).meta());
- File[] segments = walSegments(meta.folderName());
+ File[] segments = walSegments();
totalWalSegments(segments.length);
UUID incSnpId = meta.requestId();
- NodeFileTree ft = cctx.kernalContext().pdsFolderResolver().fileTree();
-
File lastSeg = Arrays.stream(segments)
.map(File::toPath)
- .max(Comparator.comparingLong(ft::walSegmentIndex))
- .orElseThrow(() -> new IgniteCheckedException("Last WAL segment
wasn't found [snpName=" + snpName + ']'))
+ .max(Comparator.comparingLong(sft::walSegmentIndex))
+ .orElseThrow(() -> new IgniteCheckedException("Last WAL segment
wasn't found [snpName=" + sft.name() + ']'))
.toFile();
IncrementalSnapshotFinishRecord incSnpFinRec =
readFinishRecord(lastSeg, incSnpId);
@@ -145,22 +139,20 @@ abstract class IncrementalSnapshotProcessor {
WALRecord rec = walRec.getValue();
- if (rec.type() == CLUSTER_SNAPSHOT) {
- if
(((ClusterSnapshotRecord)rec).clusterSnapshotName().equals(snpName)) {
- startIdx = walRec.getKey().index();
+ if (rec.type() == CLUSTER_SNAPSHOT &&
((ClusterSnapshotRecord)rec).clusterSnapshotName().equals(sft.name())) {
+ startIdx = walRec.getKey().index();
- break;
- }
+ break;
}
}
if (startIdx < 0) {
throw new IgniteCheckedException("System WAL record for full
snapshot wasn't found " +
- "[snpName=" + snpName + ", walSegFile=" + segments[0] +
']');
+ "[snpName=" + sft.name() + ", walSegFile=" + segments[0] +
']');
}
UUID prevIncSnpId = incIdx > 1
- ? cctx.snapshotMgr().readIncrementalSnapshotMetadata(snpName,
snpPath, incIdx - 1).requestId()
+ ?
cctx.snapshotMgr().readIncrementalSnapshotMetadata(sft.incrementalSnapshotFileTree(incIdx
- 1).meta()).requestId()
: null;
IgnitePredicate<GridCacheVersion> txVerFilter = prevIncSnpId !=
null
@@ -223,24 +215,22 @@ abstract class IncrementalSnapshotProcessor {
}
/** @return WAL segments to restore for specified incremental index since
the base snapshot. */
- private File[] walSegments(String folderName) throws
IgniteCheckedException {
+ private File[] walSegments() throws IgniteCheckedException {
File[] segments = null;
for (int i = 1; i <= incIdx; i++) {
- File incSnpDir =
cctx.snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, i);
-
- if (!incSnpDir.exists())
- throw new IgniteCheckedException("Incremental snapshot doesn't
exists [dir=" + incSnpDir + ']');
+ IncrementalSnapshotFileTree ift =
sft.incrementalSnapshotFileTree(i);
- File incSnpWalDir = incrementalSnapshotWalsDir(incSnpDir,
folderName);
+ if (!ift.root().exists())
+ throw new IgniteCheckedException("Incremental snapshot doesn't
exists [dir=" + ift.root() + ']');
- if (!incSnpWalDir.exists())
- throw new IgniteCheckedException("Incremental snapshot WAL
directory doesn't exists [dir=" + incSnpWalDir + ']');
+ if (!ift.wal().exists())
+ throw new IgniteCheckedException("Incremental snapshot WAL
directory doesn't exists [dir=" + ift.wal() + ']');
- File[] incSegs =
incSnpWalDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
+ File[] incSegs =
ift.wal().listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
if (incSegs == null)
- throw new IgniteCheckedException("Failed to list WAL segments
from snapshot directory [dir=" + incSnpDir + ']');
+ throw new IgniteCheckedException("Failed to list WAL segments
from snapshot directory [dir=" + ift.root() + ']');
if (segments == null)
segments = incSegs;
@@ -255,7 +245,7 @@ abstract class IncrementalSnapshotProcessor {
if (F.isEmpty(segments)) {
throw new IgniteCheckedException("No WAL segments found for
incremental snapshot " +
- "[snpName=" + snpName + ", snpPath=" + snpPath + ",
incrementIndex=" + incIdx + ']');
+ "[snpName=" + sft.name() + ", snpPath=" + sft.path() + ",
incrementIndex=" + incIdx + ']');
}
return segments;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
index 366a13a3056..2cb7d81f302 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
@@ -132,8 +132,13 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
}
/** {@inheritDoc} */
- @Override protected VerifyIncrementalSnapshotJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
- return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(),
args.incrementIndex(), consId);
+ @Override protected VerifyIncrementalSnapshotJob createJob(
+ String name,
+ String folderName,
+ String consId,
+ SnapshotPartitionsVerifyTaskArg args
+ ) {
+ return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(),
args.incrementIndex(), folderName, consId);
}
/** */
@@ -151,15 +156,17 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param incIdx Incremental snapshot index.
+ * @param folderName Folder name for snapshot.
* @param consId Consistent id of the related node.
*/
public VerifyIncrementalSnapshotJob(
String snpName,
@Nullable String snpPath,
int incIdx,
+ String folderName,
String consId
) {
- super(snpName, snpPath, consId, null, true);
+ super(snpName, snpPath, folderName, consId, null, true);
this.incIdx = incIdx;
}
@@ -167,7 +174,7 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
/**
* @return Map containing calculated transactions hash for every
remote node in the cluster.
*/
- @Override public IncrementalSnapshotVerificationTaskResult execute()
throws IgniteException {
+ @Override public IncrementalSnapshotVerificationTaskResult execute0()
throws IgniteException {
try {
if (log.isInfoEnabled()) {
log.info("Verify incremental snapshot procedure has been
initiated " +
@@ -186,7 +193,7 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
AtomicLong procSegCnt = new AtomicLong();
IncrementalSnapshotProcessor proc = new
IncrementalSnapshotProcessor(
- ignite.context().cache().context(), snpName, snpPath,
incIdx, txCaches.keySet()
+ ignite.context().cache().context(), sft, incIdx,
txCaches.keySet()
) {
@Override void totalWalSegments(int segCnt) {
// No-op.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index c2db78f2093..462840f0a67 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -39,8 +38,13 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected SnapshotHandlerRestoreJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
- return new SnapshotHandlerRestoreJob(name, args.snapshotPath(),
consId, args.cacheGroupNames(), args.check());
+ @Override protected SnapshotHandlerRestoreJob createJob(
+ String name,
+ String folderName,
+ String consId,
+ SnapshotPartitionsVerifyTaskArg args
+ ) {
+ return new SnapshotHandlerRestoreJob(name, args.snapshotPath(),
folderName, consId, args.cacheGroupNames(), args.check());
}
/** {@inheritDoc} */
@@ -90,6 +94,7 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
/**
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
+ * @param folderName Folder name for snapshot.
* @param consId Consistent id of the related node.
* @param grps Cache group names.
* @param check If {@code true} check snapshot before restore.
@@ -97,22 +102,22 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
public SnapshotHandlerRestoreJob(
String snpName,
@Nullable String snpPath,
+ String folderName,
String consId,
Collection<String> grps,
boolean check
) {
- super(snpName, snpPath, consId, grps, check);
+ super(snpName, snpPath, folderName, consId, grps, check);
}
/** {@inheritDoc} */
- @Override public Map<String, SnapshotHandlerResult<Object>> execute() {
+ @Override public Map<String, SnapshotHandlerResult<Object>> execute0()
{
try {
IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- File snpDir = snpMgr.snapshotLocalDir(snpName, snpPath);
- SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir,
consId);
+ SnapshotMetadata meta =
snpMgr.readSnapshotMetadata(sft.meta());
return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
- new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), snpDir, false, check));
+ new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), sft.root(), false, check));
}
catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
index 370e922c7a4..a021b66588b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -40,8 +39,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree.IncrementalSnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
-import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
@@ -51,8 +51,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import static java.lang.String.valueOf;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER;
/** Snapshot task to verify snapshot metadata on the baseline nodes for given
snapshot name. */
@GridInternal
@@ -108,7 +107,9 @@ public class SnapshotMetadataVerificationTask
@Override public List<SnapshotMetadata> execute() {
IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- List<SnapshotMetadata> snpMeta =
snpMgr.readSnapshotMetadatas(arg.snapshotName(), arg.snapshotPath());
+ SnapshotFileTree sft = new SnapshotFileTree(ignite.context(),
arg.snapshotName(), arg.snapshotPath());
+
+ List<SnapshotMetadata> snpMeta =
snpMgr.readSnapshotMetadatas(sft.name(), sft.path());
for (SnapshotMetadata meta : snpMeta)
checkMeta(meta);
@@ -125,7 +126,7 @@ public class SnapshotMetadataVerificationTask
"per node because they don't support restoring on a
different topology.");
}
- checkIncrementalSnapshots(metas.get(0), arg);
+ checkIncrementalSnapshots(metas.get(0), sft,
arg.incrementIndex());
}
return snpMeta;
@@ -175,7 +176,7 @@ public class SnapshotMetadataVerificationTask
}
/** Checks that all incremental snapshots are present, contain correct
metafile and WAL segments. */
- public void checkIncrementalSnapshots(SnapshotMetadata fullMeta,
SnapshotMetadataVerificationTaskArg arg) {
+ public void checkIncrementalSnapshots(SnapshotMetadata fullMeta,
SnapshotFileTree sft, int incIdx) {
try {
GridCacheSharedContext<Object, Object> ctx =
ignite.context().cache().context();
@@ -184,21 +185,15 @@ public class SnapshotMetadataVerificationTask
// Incremental snapshot must contain ClusterSnapshotRecord.
long startSeg = fullMeta.snapshotRecordPointer().index();
- for (int inc = 1; inc <= arg.incrementIndex(); inc++) {
- File incSnpDir =
snpMgr.incrementalSnapshotLocalDir(arg.snapshotName(), arg.snapshotPath(), inc);
+ for (int inc = 1; inc <= incIdx; inc++) {
+ IncrementalSnapshotFileTree ift =
sft.incrementalSnapshotFileTree(inc);
- if (!incSnpDir.exists()) {
+ if (!ift.root().exists()) {
throw new IllegalArgumentException("No incremental
snapshot found " +
"[snpName=" + arg.snapshotName() + ", snpPath=" +
arg.snapshotPath() + ", incrementIndex=" + inc + ']');
}
- String folderName =
ctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
-
- String metaFileName = snapshotMetaFileName(folderName);
-
- File metafile =
incSnpDir.toPath().resolve(metaFileName).toFile();
-
- IncrementalSnapshotMetadata incMeta =
snpMgr.readFromFile(metafile);
+ IncrementalSnapshotMetadata incMeta =
snpMgr.readIncrementalSnapshotMetadata(ift.meta());
if (!incMeta.matchBaseSnapshot(fullMeta)) {
throw new IllegalArgumentException("Incremental
snapshot doesn't match full snapshot " +
@@ -210,7 +205,7 @@ public class SnapshotMetadataVerificationTask
"Incremental snapshot meta has wrong index
[expectedIdx=" + inc + ", meta=" + incMeta + ']');
}
- checkWalSegments(incMeta, startSeg,
incrementalSnapshotWalsDir(incSnpDir, incMeta.folderName()));
+ checkWalSegments(incMeta, startSeg, ift);
// Incremental snapshots must not cross each other.
startSeg = incMeta.incrementalSnapshotPointer().index() +
1;
@@ -222,16 +217,15 @@ public class SnapshotMetadataVerificationTask
}
/** Check that incremental snapshot contains all required WAL
segments. Throws {@link IgniteException} in case of any errors. */
- private void checkWalSegments(IncrementalSnapshotMetadata meta, long
startWalSeg, File incSnpWalDir) {
+ private void checkWalSegments(IncrementalSnapshotMetadata meta, long
startWalSeg, IncrementalSnapshotFileTree ift) {
IgniteWalIteratorFactory factory = new
IgniteWalIteratorFactory(log);
List<FileDescriptor> walSeg = factory.resolveWalFiles(
new IgniteWalIteratorFactory.IteratorParametersBuilder()
- .filesOrDirs(incSnpWalDir.listFiles(file ->
-
FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches())));
+
.filesOrDirs(ift.wal().listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)));
if (walSeg.isEmpty())
- throw new IgniteException("No WAL segments found for
incremental snapshot [dir=" + incSnpWalDir + ']');
+ throw new IgniteException("No WAL segments found for
incremental snapshot [dir=" + ift.wal() + ']');
long actFirstSeg = walSeg.get(0).idx();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index dd5be36a44f..f21ffd4b00b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -70,6 +71,10 @@ public class SnapshotOperationRequest implements
Serializable {
@GridToStringExclude
private transient SnapshotMetadata meta;
+ /** Snapshot file tree. */
+ @GridToStringExclude
+ private transient SnapshotFileTree sft;
+
/**
* Warning flag of concurrent inconsistent-by-nature streamer updates.
*/
@@ -292,6 +297,20 @@ public class SnapshotOperationRequest implements
Serializable {
this.meta = meta;
}
+ /**
+ * Stores snapshot file tree.
+ */
+ public void snapshotFileTree(SnapshotFileTree sft) {
+ this.sft = sft;
+ }
+
+ /**
+ * @return Snapshot file tree.
+ */
+ public SnapshotFileTree snapshotFileTree() {
+ return sft;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotOperationRequest.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 661074abe50..e2f7f86762e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -46,8 +45,13 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected VerifySnapshotPartitionsJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
- return new VerifySnapshotPartitionsJob(name, args.snapshotPath(),
consId, args.cacheGroupNames(), args.check());
+ @Override protected VerifySnapshotPartitionsJob createJob(
+ String name,
+ String folderName,
+ String consId,
+ SnapshotPartitionsVerifyTaskArg args
+ ) {
+ return new VerifySnapshotPartitionsJob(name, args.snapshotPath(),
folderName, consId, args.cacheGroupNames(), args.check());
}
/** {@inheritDoc} */
@@ -62,23 +66,25 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
/**
* @param snpName Snapshot name to validate.
+ * @param snpPath Snapshot directory path.
+ * @param folderName Folder name for snapshot.
* @param consId Consistent id of the related node.
* @param rqGrps Set of cache groups to be checked in the snapshot or
{@code empty} to check everything.
- * @param snpPath Snapshot directory path.
* @param check If {@code true} check snapshot before restore.
*/
public VerifySnapshotPartitionsJob(
String snpName,
@Nullable String snpPath,
+ String folderName,
String consId,
Collection<String> rqGrps,
boolean check
) {
- super(snpName, snpPath, consId, rqGrps, check);
+ super(snpName, snpPath, folderName, consId, rqGrps, check);
}
/** {@inheritDoc} */
- @Override public Map<PartitionKey, PartitionHashRecord> execute()
throws IgniteException {
+ @Override public Map<PartitionKey, PartitionHashRecord> execute0()
throws IgniteException {
GridCacheSharedContext<?, ?> cctx =
ignite.context().cache().context();
if (log.isInfoEnabled()) {
@@ -87,11 +93,10 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
}
try {
- File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName,
snpPath);
- SnapshotMetadata meta =
cctx.snapshotMgr().readSnapshotMetadata(snpDir, consId);
+ SnapshotMetadata meta =
cctx.snapshotMgr().readSnapshotMetadata(sft.meta());
return new SnapshotPartitionsVerifyHandler(cctx)
- .invoke(new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), snpDir, false, check));
+ .invoke(new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), sft.root(), false, check));
}
catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index bdd4990aaa3..e4867d30a5d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -80,6 +81,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileVersionCheckingFactory;
import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -104,7 +106,6 @@ import static
org.apache.ignite.internal.processors.cache.persistence.filename.N
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.partId;
import static
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
@@ -116,9 +117,6 @@ import static
org.apache.ignite.internal.util.distributed.DistributedProcess.Dis
* Distributed process to restore cache group from the snapshot.
*/
public class SnapshotRestoreProcess {
- /** Temporary cache directory prefix. */
- public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";
-
/** Snapshot restore metrics prefix. */
public static final String SNAPSHOT_RESTORE_METRICS = "snapshot-restore";
@@ -134,6 +132,9 @@ public class SnapshotRestoreProcess {
/** Kernal context. */
private final GridKernalContext ctx;
+ /** Node file tree. */
+ private final NodeFileTree ft;
+
/** Cache group restore prepare phase. */
private final DistributedProcess<SnapshotOperationRequest,
SnapshotRestoreOperationResponse> prepareRestoreProc;
@@ -173,6 +174,7 @@ public class SnapshotRestoreProcess {
*/
public SnapshotRestoreProcess(GridKernalContext ctx,
ThreadLocal<ByteBuffer> locBuff) {
this.ctx = ctx;
+ ft = ctx.pdsFolderResolver().fileTree();
log = ctx.log(getClass());
@@ -203,9 +205,7 @@ public class SnapshotRestoreProcess {
* @throws IgniteCheckedException If it was not possible to delete some
temporary directory.
*/
protected void cleanup() throws IgniteCheckedException {
- File nodeStorage = ctx.pdsFolderResolver().fileTree().nodeStorage();
-
- for (File dir : nodeStorage.listFiles(dir -> dir.isDirectory() &&
dir.getName().startsWith(TMP_CACHE_DIR_PREFIX))) {
+ for (File dir :
ft.nodeStorage().listFiles((FileFilter)NodeFileTree::tmpCacheStorage)) {
if (!U.delete(dir)) {
throw new IgniteCheckedException("Unable to remove temporary
directory, " +
"try deleting it manually [dir=" + dir + ']');
@@ -713,26 +713,6 @@ public class SnapshotRestoreProcess {
}
}
- /**
- * @param cacheDir Cache directory.
- * @return Temporary directory.
- */
- static File formatTmpDirName(File cacheDir) {
- return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX +
cacheDir.getName());
- }
-
- /**
- * @param tmpCacheDir Temporary cache directory.
- * @return Cache or group id.
- */
- static int groupIdFromTmpDir(File tmpCacheDir) {
- assert tmpCacheDir.getName().startsWith(TMP_CACHE_DIR_PREFIX) :
tmpCacheDir;
-
- String cacheGrpName =
tmpCacheDir.getName().substring(TMP_CACHE_DIR_PREFIX.length());
-
- return CU.cacheId(cacheName(new File(tmpCacheDir.getParentFile(),
cacheGrpName)));
- }
-
/**
* @param curOpCtx Restore operation context to enrich.
* @param req Request to prepare cache group restore from the snapshot.
@@ -760,7 +740,6 @@ public class SnapshotRestoreProcess {
}
Map<String, StoredCacheData> cfgsByName = new HashMap<>();
- NodeFileTree ft = ctx.pdsFolderResolver().fileTree();
GridLocalConfigManager locCfgMgr = cctx.cache().configManager();
// Collect the cache configurations and prepare a temporary directory
for copying files.
@@ -793,7 +772,7 @@ public class SnapshotRestoreProcess {
}
}
- File tmpCacheDir = formatTmpDirName(cacheDir);
+ File tmpCacheDir = ft.tmpCacheStorage(cacheDir.getName());
if (tmpCacheDir.exists()) {
throw new IgniteCheckedException("Unable to restore cache
group, temp directory already exists " +
@@ -843,8 +822,6 @@ public class SnapshotRestoreProcess {
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- NodeFileTree ft = ctx.pdsFolderResolver().fileTree();
-
for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e :
res.entrySet()) {
if (e.getValue().ccfgs != null) {
for (StoredCacheData cacheData : e.getValue().ccfgs) {
@@ -950,24 +927,22 @@ public class SnapshotRestoreProcess {
", caches=" + F.transform(opCtx0.dirs,
NodeFileTree::cacheName) + ']');
}
- File snpDir = snpMgr.snapshotLocalDir(opCtx0.snpName,
opCtx0.snpPath);
-
CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
CompletableFuture.runAsync(
() -> {
try {
SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
- File dir = opCtx0.incIdx > 0 ?
- ctx.cache().context().snapshotMgr()
-
.incrementalSnapshotLocalDir(opCtx0.snpName, opCtx0.snpPath, opCtx0.incIdx)
- : snpDir;
+ SnapshotFileTree sft
+ = new SnapshotFileTree(ctx, opCtx0.snpName,
opCtx0.snpPath, meta.folderName(), meta.consistentId());
- NodeFileTree ft = new NodeFileTree(dir,
meta.folderName());
+ NodeFileTree metaFt = opCtx0.incIdx > 0
+ ?
sft.incrementalSnapshotFileTree(opCtx0.incIdx)
+ : sft;
- ctx.cacheObjects().updateMetadata(ft.binaryMeta(),
opCtx0.stopChecker);
+
ctx.cacheObjects().updateMetadata(metaFt.binaryMeta(), opCtx0.stopChecker);
- restoreMappings(ft.marshaller(),
opCtx0.stopChecker);
+ restoreMappings(metaFt.marshaller(),
opCtx0.stopChecker);
}
catch (Throwable t) {
log.error("Unable to perform metadata update
operation for the cache groups restore process", t);
@@ -997,7 +972,7 @@ public class SnapshotRestoreProcess {
if (log.isInfoEnabled())
cacheGrpNames.put(grpId, cacheOrGrpName);
- File tmpCacheDir = formatTmpDirName(dir);
+ File tmpCacheDir = ft.tmpCacheStorage(dir.getName());
tmpCacheDir.mkdir();
Set<PartitionRestoreFuture> leftParts;
@@ -1034,8 +1009,10 @@ public class SnapshotRestoreProcess {
if (leftParts.isEmpty())
break;
- File snpCacheDir = new File(snpDir,
- Paths.get(databaseRelativePath(meta.folderName()),
dir.getName()).toString());
+ SnapshotFileTree sft
+ = new SnapshotFileTree(ctx, opCtx0.snpName,
opCtx0.snpPath, meta.folderName(), meta.consistentId());
+
+ File snpCacheDir = sft.cacheStorage(dir.getName());
leftParts.removeIf(partFut -> {
boolean doCopy =
ofNullable(meta.partitions().get(grpId))
@@ -1057,9 +1034,7 @@ public class SnapshotRestoreProcess {
", dir=" + dir.getName() + ']');
}
- File idxFile = new File(snpCacheDir,
NodeFileTree.partitionFileName(INDEX_PARTITION));
-
- if (idxFile.exists()) {
+ if (sft.partitionFile(dir.getName(),
INDEX_PARTITION).exists()) {
PartitionRestoreFuture idxFut;
allParts.computeIfAbsent(grpId, g -> new
HashSet<>())
@@ -1119,7 +1094,7 @@ public class SnapshotRestoreProcess {
return;
}
- int grpId =
groupIdFromTmpDir(snpFile.getParentFile());
+ int grpId =
CU.cacheId(NodeFileTree.tmpDirCacheName(snpFile.getParentFile()));
int partId = partId(snpFile);
PartitionRestoreFuture partFut =
F.find(allParts.get(grpId), null,
@@ -1170,7 +1145,7 @@ public class SnapshotRestoreProcess {
throw new IgniteInterruptedException("The
operation has been stopped on temporary directory switch.");
for (File src : opCtx0.dirs)
- Files.move(formatTmpDirName(src).toPath(),
src.toPath(), StandardCopyOption.ATOMIC_MOVE);
+
Files.move(ft.tmpCacheStorage(src.getName()).toPath(), src.toPath(),
StandardCopyOption.ATOMIC_MOVE);
}
catch (IOException e) {
throw new IgniteException(e);
@@ -1421,7 +1396,7 @@ public class SnapshotRestoreProcess {
SnapshotRestoreContext opCtx0 = opCtx;
IncrementalSnapshotProcessor incSnpProc = new
IncrementalSnapshotProcessor(
- ctx.cache().context(), opCtx0.snpName, opCtx0.snpPath,
opCtx0.incIdx, cacheIds
+ ctx.cache().context(), new SnapshotFileTree(ctx, opCtx0.snpName,
opCtx0.snpPath), opCtx0.incIdx, cacheIds
) {
@Override void totalWalSegments(int segCnt) {
opCtx0.totalWalSegments = segCnt;
@@ -1651,7 +1626,7 @@ public class SnapshotRestoreProcess {
IgniteCheckedException ex = null;
for (File cacheDir : opCtx0.dirs) {
- File tmpCacheDir = formatTmpDirName(cacheDir);
+ File tmpCacheDir = ft.tmpCacheStorage(cacheDir.getName());
if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
log.error("Unable to perform rollback routine
completely, cannot remove temp directory " +
@@ -1725,7 +1700,7 @@ public class SnapshotRestoreProcess {
) {
File snpFile = new File(srcDir,
NodeFileTree.partitionFileName(partFut.partId));
Path partFile = Paths.get(targetDir.getAbsolutePath(),
NodeFileTree.partitionFileName(partFut.partId));
- int grpId = groupIdFromTmpDir(targetDir);
+ int grpId = CU.cacheId(NodeFileTree.tmpDirCacheName(targetDir));
IgniteSnapshotManager snapMgr = ctx.cache().context().snapshotMgr();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 900c29268af..34adc602c9c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -870,7 +870,7 @@ public abstract class AbstractSnapshotSelfTest extends
GridCommonAbstractTest {
/** */
private void checkIncrementalSnapshotWalRecords(IgniteEx node,
IncrementalSnapshotFileTree incSnpFt) {
try {
- IncrementalSnapshotMetadata incSnpMeta =
snp(node).readFromFile(incSnpFt.meta());
+ IncrementalSnapshotMetadata incSnpMeta =
snp(node).readIncrementalSnapshotMetadata(incSnpFt.meta());
WALIterator it = new IgniteWalIteratorFactory(log).iterator(
new
IgniteWalIteratorFactory.IteratorParametersBuilder().filesOrDirs(incSnpFt.wal()));
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index c3b418bc891..be1993fbde7 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
@@ -75,7 +76,6 @@ import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FI
import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.FILE_SUFFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.PART_FILE_PREFIX;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
@@ -703,7 +703,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends
IgniteClusterSnapshotR
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT),
ClusterTopologyCheckedException.class, null);
- File[] files = node2dbDir.listFiles(file ->
file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+ File[] files =
node2dbDir.listFiles((FileFilter)NodeFileTree::tmpCacheStorage);
assertEquals("A temp directory with potentially corrupted files must
exist.", 1, files.length);
ensureCacheAbsent(dfltCacheCfg);
@@ -712,7 +712,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends
IgniteClusterSnapshotR
startGrid(2);
- files = node2dbDir.listFiles(file ->
file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+ files =
node2dbDir.listFiles((FileFilter)NodeFileTree::tmpCacheStorage);
assertEquals("A temp directory should be removed at node startup", 0,
files.length);
waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED,
EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index 6f66cfadcca..889ac2399cd 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -44,6 +44,7 @@ import
org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import
org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -56,7 +57,6 @@ import org.junit.Test;
import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_THREAD_POOL_SIZE;
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.partId;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.groupIdFromTmpDir;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -243,7 +243,7 @@ public class IgniteSnapshotRemoteRequestTest extends
IgniteClusterSnapshotRestor
() -> false,
(part, t) -> {
if (t == null) {
- int grpId = groupIdFromTmpDir(part.getParentFile());
+ int grpId =
CU.cacheId(NodeFileTree.tmpDirCacheName(part.getParentFile()));
assertTrue("Received cache group has not been requested",
parts.containsKey(grpId));
assertTrue("Received partition has not been requested",
@@ -378,7 +378,7 @@ public class IgniteSnapshotRemoteRequestTest extends
IgniteClusterSnapshotRestor
return (part, t) -> {
assertNull(t);
- int grpId = groupIdFromTmpDir(part.getParentFile());
+ int grpId =
CU.cacheId(NodeFileTree.tmpDirCacheName(part.getParentFile()));
assertTrue("Received cache group has not been requested",
parts.containsKey(grpId));
assertTrue("Received partition has not been requested",
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
index 8d623446908..702b771bddb 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
@@ -263,7 +263,7 @@ public class IncrementalSnapshotCheckBeforeRestoreTest
extends AbstractSnapshotS
File incMetaFile = snapshotFileTree(srv,
SNP).incrementalSnapshotFileTree(1).meta();
- IncrementalSnapshotMetadata meta = snp(srv).readFromFile(incMetaFile);
+ IncrementalSnapshotMetadata meta =
snp(srv).readIncrementalSnapshotMetadata(incMetaFile);
U.delete(incMetaFile);
@@ -294,7 +294,7 @@ public class IncrementalSnapshotCheckBeforeRestoreTest
extends AbstractSnapshotS
File incMetaFile = snapshotFileTree(srv,
SNP).incrementalSnapshotFileTree(1).meta();
- IncrementalSnapshotMetadata meta = snp(srv).readFromFile(incMetaFile);
+ IncrementalSnapshotMetadata meta =
snp(srv).readIncrementalSnapshotMetadata(incMetaFile);
U.delete(incMetaFile);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java
index d54e16a0baa..03cd300e8c4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java
@@ -1038,11 +1038,7 @@ public class IncrementalSnapshotRestoreTest extends
AbstractIncrementalSnapshotT
}
/** {@inheritDoc} */
- @Override public IncrementalSnapshotMetadata
readIncrementalSnapshotMetadata(
- String snpName,
- @Nullable String snpPath,
- int incIdx
- ) throws IgniteCheckedException, IOException {
+ @Override public IncrementalSnapshotMetadata
readIncrementalSnapshotMetadata(File meta) throws IgniteCheckedException,
IOException {
if (fail != null) {
Runnable f = fail;
@@ -1051,7 +1047,7 @@ public class IncrementalSnapshotRestoreTest extends
AbstractIncrementalSnapshotT
f.run();
}
- return super.readIncrementalSnapshotMetadata(snpName, snpPath,
incIdx);
+ return super.readIncrementalSnapshotMetadata(meta);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 425c4e32beb..3f063fe5ea6 100755
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -3229,6 +3229,6 @@ public abstract class GridAbstractTest extends
JUnitAssertAware {
* @return Snapshot directories for specific snapshot.
*/
protected static SnapshotFileTree snapshotFileTree(IgniteEx srv, String
name, String path) {
- return new SnapshotFileTree(srv, name, path);
+ return new SnapshotFileTree(srv.context(), name, path);
}
}