This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c1c0b285fb9 IGNITE-18199 Implement punching holes in snapshot
restoration process. (#10468)
c1c0b285fb9 is described below
commit c1c0b285fb9562d3e221f8faca755c8f2f1c6a31
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Mon Jan 9 16:56:38 2023 +0300
IGNITE-18199 Implement punching holes in snapshot restoration process.
(#10468)
---
.../snapshot/SnapshotCompressionBasicTest.java | 166 +++++++++++++++++----
.../snapshot/IgniteSnapshotManager.java | 2 +-
.../snapshot/SnapshotRestoreProcess.java | 144 +++++++++++++++---
3 files changed, 260 insertions(+), 52 deletions(-)
diff --git
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
index acfb53deb1a..cfe25bbeb29 100644
---
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
+++
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.IOException;
+import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -41,6 +42,7 @@ import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContextImpl;
@@ -63,6 +65,7 @@ import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_
import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
/** */
public class SnapshotCompressionBasicTest extends AbstractSnapshotSelfTest {
@@ -82,7 +85,10 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
protected static final long TIMEOUT = 30_000;
/** */
- private static final Map<String, String> CACHES = new HashMap<>();
+ protected static final Map<String, String> CACHES = new HashMap<>();
+
+ /** */
+ public static final int DFLT_GRIDS_CNT = 3;
static {
CACHES.put("cache1", "group1");
@@ -92,7 +98,7 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
}
/** */
- private static final Set<String> COMPRESSED_CACHES = new HashSet<>();
+ protected static final Set<String> COMPRESSED_CACHES = new HashSet<>();
static {
COMPRESSED_CACHES.add("cache1");
@@ -111,6 +117,7 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
IgniteConfiguration config =
super.getConfiguration(igniteInstanceName);
config.getDataStorageConfiguration().setPageSize(PAGE_SIZE);
+ config.setWorkDirectory(workingDirectory(config).toString());
return config;
}
@@ -125,8 +132,12 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
/** {@inheritDoc} */
@Before
- @Override public void beforeTestSnapshot() {
+ @Override public void beforeTestSnapshot() throws Exception {
+ assertTrue(G.allGrids().isEmpty());
+
locEvts.clear();
+
+ cleanPersistenceDir(true);
}
/** {@inheritDoc} */
@@ -135,29 +146,47 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
if (G.allGrids().isEmpty())
return;
- IgniteEx ig = grid(0);
- for (String cacheName : ig.cacheNames()) {
- IgniteCache cache = ig.cache(cacheName);
-
- cache.destroy();
- }
+ stopAllGrids();
+ }
+ /** {@inheritDoc} */
+ @Override public void afterTestsStopped() throws Exception {
stopAllGrids();
+
+ cleanPersistenceDir();
}
/** */
@Test
public void testRestoreFullSnapshot() throws Exception {
- IgniteEx ignite = startGrids(3);
+ testRestoreFullSnapshot(DFLT_GRIDS_CNT);
+ }
+
+ /** */
+ @Test
+ public void testRestoreFullSnapshot_OnLargerTopology() throws Exception {
+ testRestoreFullSnapshot(2 * DFLT_GRIDS_CNT);
+ }
+
+ /** */
+ private void testRestoreFullSnapshot(int gridCnt) throws Exception {
+ IgniteEx ignite = startGrids(gridCnt);
ignite.events().localListen(e -> locEvts.add(e.type()),
EVTS_CLUSTER_SNAPSHOT);
ignite.cluster().state(ClusterState.ACTIVE);
+ long withoutHolesSize = snapshotSize(G.allGrids(),
SNAPSHOT_WITHOUT_HOLES);
+
for (String snpName: Arrays.asList(SNAPSHOT_WITH_HOLES,
SNAPSHOT_WITHOUT_HOLES)) {
try {
ignite.snapshot().restoreSnapshot(snpName, null).get(TIMEOUT);
waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED,
EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+ long persistSz = persistenseSize(G.allGrids());
+
+ assertTrue("persistSz < withoutHolesSize " + persistSz + "< "
+ withoutHolesSize,
+ persistSz < 0.75 * withoutHolesSize);
+
for (String cacheName : CACHES.keySet()) {
IgniteCache cache = ignite.cache(cacheName);
@@ -175,7 +204,7 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
/** */
@Test
public void testRestoreFail_OnGridWithoutCompression() throws Exception {
- IgniteEx ignite = startGrids(3);
+ IgniteEx ignite = startGrids(DFLT_GRIDS_CNT);
ignite.events().localListen(e -> locEvts.add(e.type()),
EVTS_CLUSTER_SNAPSHOT);
ignite.cluster().state(ClusterState.ACTIVE);
@@ -187,11 +216,10 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
}
}
-
/** */
@Test
public void testRestoreNotCompressed_OnGridWithoutCompression() throws
Exception {
- IgniteEx ignite = startGrids(3);
+ IgniteEx ignite = startGrids(DFLT_GRIDS_CNT);
ignite.events().localListen(e -> locEvts.add(e.type()),
EVTS_CLUSTER_SNAPSHOT);
ignite.cluster().state(ClusterState.ACTIVE);
@@ -227,6 +255,40 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
return i -> new Value("name_" + i);
}
+ /** {@inheritDoc} */
+ @Override protected void cleanPersistenceDir() throws Exception {
+ super.cleanPersistenceDir();
+
+ cleanPersistenceDir(false);
+ }
+
+ /** */
+ protected void cleanPersistenceDir(boolean saveSnap) throws Exception {
+ assertTrue("Grids are not stopped", F.isEmpty(G.allGrids()));
+
+ String mask = U.maskForFileName(getTestIgniteInstanceName());
+
+ try (DirectoryStream<Path> ds =
Files.newDirectoryStream(defaultWorkDirectory(),
+ path -> Files.isDirectory(path) &&
path.getFileName().toString().contains(mask))
+ ) {
+ for (Path dir : ds) {
+ if (!saveSnap) {
+ U.delete(dir);
+
+ continue;
+ }
+
+ U.delete(U.resolveWorkDirectory(dir.toString(), "cp", false));
+ U.delete(U.resolveWorkDirectory(dir.toString(),
DFLT_STORE_DIR, false));
+ U.delete(U.resolveWorkDirectory(dir.toString(),
DataStorageConfiguration.DFLT_MARSHALLER_PATH, false));
+ U.delete(U.resolveWorkDirectory(dir.toString(),
DataStorageConfiguration.DFLT_BINARY_METADATA_PATH, false));
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/** */
protected void createTestSnapshot() throws Exception {
CacheConfiguration[] caches = CACHES.entrySet().stream()
@@ -253,7 +315,7 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
return config;
}).toArray(CacheConfiguration[]::new);
- IgniteEx ignite = startGridsWithCache(3, 1000, valueBuilder(), caches);
+ IgniteEx ignite = startGridsWithCache(DFLT_GRIDS_CNT, 1000,
valueBuilder(), caches);
forceCheckpoint();
@@ -272,31 +334,23 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
assertTrue(F.isEmpty(res.exceptions()));
}
- Path withHolesPath =
Paths.get(U.resolveWorkDirectory(U.defaultWorkDirectory(),
DFLT_SNAPSHOT_DIRECTORY, false)
- .toString(), SNAPSHOT_WITH_HOLES);
-
- Path withoutHolesPath =
Paths.get(U.resolveWorkDirectory(U.defaultWorkDirectory(),
DFLT_SNAPSHOT_DIRECTORY, false)
- .toString(), SNAPSHOT_WITHOUT_HOLES);
-
- long withHolesSize = directorySize(withHolesPath);
- long withoutHolesSize = directorySize(withoutHolesPath);
+ long withHolesSize = snapshotSize(G.allGrids(), SNAPSHOT_WITH_HOLES);
+ long withoutHolesSize = snapshotSize(G.allGrids(),
SNAPSHOT_WITHOUT_HOLES);
assertTrue("withHolesSize < withoutHolesSize: " + withHolesSize + " <
" + withoutHolesSize,
withHolesSize < withoutHolesSize);
- long idxWithHolesSize = directorySize(withHolesPath, "index\\.bin");
- long idxWithoutHolesSize = directorySize(withoutHolesPath,
"index\\.bin");
+ long idxWithHolesSize = snapshotSize(G.allGrids(),
SNAPSHOT_WITH_HOLES, "index\\.bin");
+ long idxWithoutHolesSize = snapshotSize(G.allGrids(),
SNAPSHOT_WITHOUT_HOLES, "index\\.bin");
assertTrue("idxWithHolesSize < idxWithoutHolesSize: " +
idxWithHolesSize + " < " + idxWithoutHolesSize,
idxWithHolesSize < idxWithoutHolesSize);
- ignite.cacheNames().forEach(c -> ignite.getOrCreateCache(c).destroy());
-
G.stopAll(true);
}
/** */
- private void failCompressionProcessor(Ignite ignite, String... snpNames) {
+ protected void failCompressionProcessor(Ignite ignite, String... snpNames)
{
CompressionProcessor compressProc =
((IgniteEx)ignite).context().compress();
CompressionProcessor spyCompressProc = Mockito.spy(compressProc);
@@ -334,12 +388,34 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
}
/** */
- private static long directorySize(Path path) throws IOException {
- return directorySize(path, null);
+ protected long persistenseSize(Collection<Ignite> grids) {
+ return grids.stream()
+ .map(ig -> workingDirectory(ig).resolve(DFLT_STORE_DIR))
+ .reduce(0L, (acc, p) -> acc + directorySize(p), Long::sum);
+ }
+
+ /** */
+ protected long snapshotSize(Collection<Ignite> grids, String snpName) {
+ return snapshotSize(grids, snpName, "(part-\\d+|index)\\.bin");
+ }
+
+ /** */
+ protected long snapshotSize(Collection<Ignite> grids, String snpName,
String pattern) {
+ return grids.stream()
+ .map(ig ->
workingDirectory(ig).resolve(DFLT_SNAPSHOT_DIRECTORY).resolve(snpName))
+ .reduce(0L, (acc, p) -> acc + directorySize(p, pattern),
Long::sum);
+ }
+
+ /** */
+ protected long directorySize(Path path) {
+ return directorySize(path, "(part-\\d+|index)\\.bin");
}
/** */
- private static long directorySize(Path path, String pattern) throws
IOException {
+ protected long directorySize(Path path, String pattern) {
+ if (!Files.exists(path))
+ return 0;
+
try (Stream<Path> walk = Files.walk(path)) {
return walk.filter(Files::isRegularFile)
.filter(f -> F.isEmpty(pattern) ||
f.getFileName().toString().matches(pattern))
@@ -352,10 +428,38 @@ public class SnapshotCompressionBasicTest extends
AbstractSnapshotSelfTest {
}
}).sum();
}
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ protected Path workingDirectory(Ignite ig) {
+ return workingDirectory(ig.configuration());
+ }
+
+ /** */
+ protected Path workingDirectory(IgniteConfiguration cfg) {
+ try {
+ return Paths.get(U.defaultWorkDirectory(),
U.maskForFileName(cfg.getIgniteInstanceName()));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ protected Path defaultWorkDirectory() {
+ try {
+ return Paths.get(U.defaultWorkDirectory());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/** */
- private static class Value {
+ static class Value {
/** */
String name;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 3fa01e50df5..5d655f5b760 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -433,7 +433,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
- restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+ restoreCacheGrpProc = new SnapshotRestoreProcess(ctx, locBuff);
// Manage remote snapshots.
snpRmtMgr = new SequentialRemoteSnapshotManager();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index b414632bdd4..a29291df066 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -59,13 +60,18 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileVersionCheckingFactory;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -88,6 +94,7 @@ import static
org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
+import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
@@ -131,6 +138,9 @@ public class SnapshotRestoreProcess {
/** Logger. */
private final IgniteLogger log;
+ /** */
+ private final ThreadLocal<ByteBuffer> locBuff;
+
/** Future to be completed when the cache restore process is complete
(this future will be returned to the user). */
private volatile ClusterSnapshotFuture fut;
@@ -142,12 +152,15 @@ public class SnapshotRestoreProcess {
/**
* @param ctx Kernal context.
+ * @param locBuff Thread local page buffer.
*/
- public SnapshotRestoreProcess(GridKernalContext ctx) {
+ public SnapshotRestoreProcess(GridKernalContext ctx,
ThreadLocal<ByteBuffer> locBuff) {
this.ctx = ctx;
log = ctx.log(getClass());
+ this.locBuff = locBuff;
+
prepareRestoreProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare,
this::finishPrepare);
@@ -790,6 +803,12 @@ public class SnapshotRestoreProcess {
}
}
+ if (!F.isEmpty(e.getValue().metas)) {
+
e.getValue().metas.stream().filter(SnapshotMetadata::hasCompressedGroups)
+ .forEach(meta ->
meta.cacheGroupIds().stream().filter(meta::isGroupWithCompresion)
+ .forEach(opCtx0::addCompressedGroup));
+ }
+
opCtx0.metasPerNode.put(e.getKey(), new
ArrayList<>(e.getValue().metas));
}
@@ -964,11 +983,7 @@ public class SnapshotRestoreProcess {
.contains(partFut.partId);
if (doCopy) {
- copyLocalAsync(ctx.cache().context().snapshotMgr(),
- opCtx0,
- snpCacheDir,
- tmpCacheDir,
- partFut);
+ copyLocalAsync(opCtx0, snpCacheDir, tmpCacheDir,
partFut);
}
return doCopy;
@@ -991,11 +1006,7 @@ public class SnapshotRestoreProcess {
allParts.computeIfAbsent(grpId, g -> new
HashSet<>())
.add(idxFut = new
PartitionRestoreFuture(INDEX_PARTITION, opCtx0.processedParts));
- copyLocalAsync(ctx.cache().context().snapshotMgr(),
- opCtx0,
- snpCacheDir,
- tmpCacheDir,
- idxFut);
+ copyLocalAsync(opCtx0, snpCacheDir, tmpCacheDir,
idxFut);
}
}
}
@@ -1051,7 +1062,27 @@ public class SnapshotRestoreProcess {
assert partFut != null :
snpFile.getAbsolutePath();
- partFut.complete(snpFile.toPath());
+ if (!opCtx0.isGroupCompressed(grpId)) {
+ partFut.complete(snpFile.toPath());
+
+ return;
+ }
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ punchHole(grpId, partId, snpFile);
+
+ partFut.complete(snpFile.toPath());
+ }
+ catch (Throwable t0) {
+ opCtx0.errHnd.accept(t0);
+
+
completeListExceptionally(rmtAwaitParts, t0);
+ }
+ },
+ snpMgr.snapshotExecutorService()
+ );
});
}
}
@@ -1334,13 +1365,11 @@ public class SnapshotRestoreProcess {
}
/**
- * @param mgr Ignite snapshot manager.
* @param opCtx Snapshot operation context.
* @param srcDir Snapshot directory to copy from.
* @param targetDir Destination directory to copy to.
*/
- private static void copyLocalAsync(
- IgniteSnapshotManager mgr,
+ private void copyLocalAsync(
SnapshotRestoreContext opCtx,
File srcDir,
File targetDir,
@@ -1348,8 +1377,11 @@ public class SnapshotRestoreProcess {
) {
File snpFile = new File(srcDir,
FilePageStoreManager.getPartitionFileName(partFut.partId));
Path partFile = Paths.get(targetDir.getAbsolutePath(),
FilePageStoreManager.getPartitionFileName(partFut.partId));
+ int grpId = groupIdFromTmpDir(targetDir);
- CompletableFuture.supplyAsync(() -> {
+ IgniteSnapshotManager snapMgr = ctx.cache().context().snapshotMgr();
+
+ CompletableFuture<Path> copyPartFut = CompletableFuture.supplyAsync(()
-> {
if (opCtx.stopChecker.getAsBoolean())
throw new IgniteInterruptedException("The operation has been
stopped on copy file: " + snpFile.getAbsolutePath());
@@ -1361,11 +1393,30 @@ public class SnapshotRestoreProcess {
", snpDir=" + snpFile.getAbsolutePath() + ", name=" +
snpFile.getName() + ']');
}
- IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile,
partFile.toFile(), snpFile.length());
+ IgniteSnapshotManager.copy(snapMgr.ioFactory(), snpFile,
partFile.toFile(), snpFile.length());
return partFile;
- }, mgr.snapshotExecutorService())
- .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+ }, snapMgr.snapshotExecutorService());
+
+ if (opCtx.isGroupCompressed(grpId)) {
+ copyPartFut = copyPartFut.thenComposeAsync(
+ p -> {
+ CompletableFuture<Path> result = new CompletableFuture<>();
+ try {
+ punchHole(grpId, partFut.partId, partFile.toFile());
+
+ result.complete(partFile);
+ }
+ catch (Throwable t) {
+ result.completeExceptionally(t);
+ }
+ return result;
+ },
+ snapMgr.snapshotExecutorService()
+ );
+ }
+
+ copyPartFut.whenComplete((r, t) -> opCtx.errHnd.accept(t))
.whenComplete((r, t) -> {
if (t == null)
partFut.complete(partFile);
@@ -1374,6 +1425,42 @@ public class SnapshotRestoreProcess {
});
}
+ /** */
+ private void punchHole(int grpId, int partId, File partFile) throws
Exception {
+ FilePageStoreManager storeMgr =
(FilePageStoreManager)ctx.cache().context().pageStore();
+ FileVersionCheckingFactory factory =
storeMgr.getPageStoreFactory(grpId, null);
+
+ try (FilePageStore pageStore =
(FilePageStore)factory.createPageStore(getTypeByPartId(partId), partFile, val
-> {})) {
+ pageStore.init();
+
+ ByteBuffer buf = locBuff.get();
+
+ long pageId = PageIdUtils.pageId(partId, (byte)0, 0);
+
+ for (int pageNo = 0; pageNo < pageStore.pages(); pageId++,
pageNo++) {
+ if (opCtx.stopChecker.getAsBoolean()) {
+ throw new IgniteInterruptedException("The operation has
been stopped while punching holes in file: "
+ + partFile.getAbsolutePath());
+ }
+
+ if (Thread.interrupted())
+ throw new IgniteInterruptedException("Thread has been
interrupted: " + Thread.currentThread().getName());
+
+ buf.clear();
+
+ pageStore.read(pageId, buf, true);
+
+ if (PageIO.getCompressionType(buf) ==
CompressionProcessor.UNCOMPRESSED_PAGE)
+ continue;
+
+ int comprPageSz = PageIO.getCompressedSize(buf);
+
+ if (comprPageSz < pageStore.getPageSize())
+ pageStore.punchHole(pageId, comprPageSz);
+ }
+ }
+ }
+
/**
* @param col Collection of sets to complete.
* @param ex Exception to set.
@@ -1431,6 +1518,9 @@ public class SnapshotRestoreProcess {
/** Stop condition checker. */
private final BooleanSupplier stopChecker = () -> err.get() != null;
+ /** Compressed groups. */
+ private final Set<Integer> comprGrps = new HashSet<>();
+
/** Cache ID to configuration mapping. */
private volatile Map<Integer, StoredCacheData> cfgs =
Collections.emptyMap();
@@ -1464,7 +1554,11 @@ public class SnapshotRestoreProcess {
* @param discoCache Baseline discovery cache for node IDs that must
be alive to complete the operation.
* @param cfgs Cache ID to configuration mapping.
*/
- protected SnapshotRestoreContext(SnapshotOperationRequest req,
DiscoCache discoCache, Map<Integer, StoredCacheData> cfgs) {
+ protected SnapshotRestoreContext(
+ SnapshotOperationRequest req,
+ DiscoCache discoCache,
+ Map<Integer, StoredCacheData> cfgs
+ ) {
reqId = req.requestId();
snpName = req.snapshotName();
snpPath = req.snapshotPath();
@@ -1481,6 +1575,16 @@ public class SnapshotRestoreProcess {
public Collection<UUID> nodes() {
return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
}
+
+ /** */
+ public boolean isGroupCompressed(int grpId) {
+ return comprGrps.contains(grpId);
+ }
+
+ /** */
+ void addCompressedGroup(int grpId) {
+ comprGrps.add(grpId);
+ }
}
/** Snapshot operation prepare response. */