This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c1c0b285fb9 IGNITE-18199 Implement punching holes in snapshot restoration process. (#10468)
c1c0b285fb9 is described below

commit c1c0b285fb9562d3e221f8faca755c8f2f1c6a31
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Mon Jan 9 16:56:38 2023 +0300

    IGNITE-18199 Implement punching holes in snapshot restoration process. (#10468)
---
 .../snapshot/SnapshotCompressionBasicTest.java     | 166 +++++++++++++++++----
 .../snapshot/IgniteSnapshotManager.java            |   2 +-
 .../snapshot/SnapshotRestoreProcess.java           | 144 +++++++++++++++---
 3 files changed, 260 insertions(+), 52 deletions(-)

diff --git 
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
 
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
index acfb53deb1a..cfe25bbeb29 100644
--- 
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
+++ 
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -41,6 +42,7 @@ import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContextImpl;
@@ -63,6 +65,7 @@ import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_
 import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
 import static 
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
 import static 
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 
 /** */
 public class SnapshotCompressionBasicTest extends AbstractSnapshotSelfTest {
@@ -82,7 +85,10 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
     protected static final long TIMEOUT = 30_000;
 
     /** */
-    private static final Map<String, String> CACHES = new HashMap<>();
+    protected static final Map<String, String> CACHES = new HashMap<>();
+
+    /** */
+    public static final int DFLT_GRIDS_CNT = 3;
 
     static {
         CACHES.put("cache1", "group1");
@@ -92,7 +98,7 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
     }
 
     /** */
-    private static final Set<String> COMPRESSED_CACHES = new HashSet<>();
+    protected static final Set<String> COMPRESSED_CACHES = new HashSet<>();
 
     static {
         COMPRESSED_CACHES.add("cache1");
@@ -111,6 +117,7 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
         IgniteConfiguration config = 
super.getConfiguration(igniteInstanceName);
 
         config.getDataStorageConfiguration().setPageSize(PAGE_SIZE);
+        config.setWorkDirectory(workingDirectory(config).toString());
 
         return config;
     }
@@ -125,8 +132,12 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
 
     /** {@inheritDoc} */
     @Before
-    @Override public void beforeTestSnapshot() {
+    @Override public void beforeTestSnapshot() throws Exception {
+        assertTrue(G.allGrids().isEmpty());
+
         locEvts.clear();
+
+        cleanPersistenceDir(true);
     }
 
     /** {@inheritDoc} */
@@ -135,29 +146,47 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
         if (G.allGrids().isEmpty())
             return;
 
-        IgniteEx ig = grid(0);
-        for (String cacheName : ig.cacheNames()) {
-            IgniteCache cache = ig.cache(cacheName);
-
-            cache.destroy();
-        }
+        stopAllGrids();
+    }
 
+    /** {@inheritDoc} */
+    @Override public void afterTestsStopped() throws Exception {
         stopAllGrids();
+
+        cleanPersistenceDir();
     }
 
     /** */
     @Test
     public void testRestoreFullSnapshot() throws Exception {
-        IgniteEx ignite = startGrids(3);
+        testRestoreFullSnapshot(DFLT_GRIDS_CNT);
+    }
+
+    /** */
+    @Test
+    public void testRestoreFullSnapshot_OnLargerTopology() throws Exception {
+        testRestoreFullSnapshot(2 * DFLT_GRIDS_CNT);
+    }
+
+    /** */
+    private void testRestoreFullSnapshot(int gridCnt) throws Exception {
+        IgniteEx ignite = startGrids(gridCnt);
         ignite.events().localListen(e -> locEvts.add(e.type()), 
EVTS_CLUSTER_SNAPSHOT);
         ignite.cluster().state(ClusterState.ACTIVE);
 
+        long withoutHolesSize = snapshotSize(G.allGrids(), 
SNAPSHOT_WITHOUT_HOLES);
+
         for (String snpName: Arrays.asList(SNAPSHOT_WITH_HOLES, 
SNAPSHOT_WITHOUT_HOLES)) {
             try {
                 ignite.snapshot().restoreSnapshot(snpName, null).get(TIMEOUT);
 
                 waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, 
EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
 
+                long persistSz = persistenseSize(G.allGrids());
+
+                assertTrue("persistSz < withoutHolesSize " + persistSz + "< " 
+ withoutHolesSize,
+                    persistSz < 0.75 * withoutHolesSize);
+
                 for (String cacheName : CACHES.keySet()) {
                     IgniteCache cache = ignite.cache(cacheName);
 
@@ -175,7 +204,7 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
     /** */
     @Test
     public void testRestoreFail_OnGridWithoutCompression() throws Exception {
-        IgniteEx ignite = startGrids(3);
+        IgniteEx ignite = startGrids(DFLT_GRIDS_CNT);
         ignite.events().localListen(e -> locEvts.add(e.type()), 
EVTS_CLUSTER_SNAPSHOT);
         ignite.cluster().state(ClusterState.ACTIVE);
 
@@ -187,11 +216,10 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
         }
     }
 
-
     /** */
     @Test
     public void testRestoreNotCompressed_OnGridWithoutCompression() throws 
Exception {
-        IgniteEx ignite = startGrids(3);
+        IgniteEx ignite = startGrids(DFLT_GRIDS_CNT);
         ignite.events().localListen(e -> locEvts.add(e.type()), 
EVTS_CLUSTER_SNAPSHOT);
         ignite.cluster().state(ClusterState.ACTIVE);
 
@@ -227,6 +255,40 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
         return i -> new Value("name_" + i);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void cleanPersistenceDir() throws Exception {
+        super.cleanPersistenceDir();
+
+        cleanPersistenceDir(false);
+    }
+
+    /** */
+    protected void cleanPersistenceDir(boolean saveSnap) throws Exception {
+        assertTrue("Grids are not stopped", F.isEmpty(G.allGrids()));
+
+        String mask = U.maskForFileName(getTestIgniteInstanceName());
+
+        try (DirectoryStream<Path> ds = 
Files.newDirectoryStream(defaultWorkDirectory(),
+            path -> Files.isDirectory(path) && 
path.getFileName().toString().contains(mask))
+        ) {
+            for (Path dir : ds) {
+                if (!saveSnap) {
+                    U.delete(dir);
+
+                    continue;
+                }
+
+                U.delete(U.resolveWorkDirectory(dir.toString(), "cp", false));
+                U.delete(U.resolveWorkDirectory(dir.toString(), 
DFLT_STORE_DIR, false));
+                U.delete(U.resolveWorkDirectory(dir.toString(), 
DataStorageConfiguration.DFLT_MARSHALLER_PATH, false));
+                U.delete(U.resolveWorkDirectory(dir.toString(), 
DataStorageConfiguration.DFLT_BINARY_METADATA_PATH, false));
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     /** */
     protected void createTestSnapshot() throws Exception {
         CacheConfiguration[] caches = CACHES.entrySet().stream()
@@ -253,7 +315,7 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
                 return config;
             }).toArray(CacheConfiguration[]::new);
 
-        IgniteEx ignite = startGridsWithCache(3, 1000, valueBuilder(), caches);
+        IgniteEx ignite = startGridsWithCache(DFLT_GRIDS_CNT, 1000, 
valueBuilder(), caches);
 
         forceCheckpoint();
 
@@ -272,31 +334,23 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
             assertTrue(F.isEmpty(res.exceptions()));
         }
 
-        Path withHolesPath = 
Paths.get(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_SNAPSHOT_DIRECTORY, false)
-            .toString(), SNAPSHOT_WITH_HOLES);
-
-        Path withoutHolesPath = 
Paths.get(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_SNAPSHOT_DIRECTORY, false)
-            .toString(), SNAPSHOT_WITHOUT_HOLES);
-
-        long withHolesSize = directorySize(withHolesPath);
-        long withoutHolesSize = directorySize(withoutHolesPath);
+        long withHolesSize = snapshotSize(G.allGrids(), SNAPSHOT_WITH_HOLES);
+        long withoutHolesSize = snapshotSize(G.allGrids(), 
SNAPSHOT_WITHOUT_HOLES);
 
         assertTrue("withHolesSize < withoutHolesSize: " + withHolesSize + " < 
" + withoutHolesSize,
             withHolesSize < withoutHolesSize);
 
-        long idxWithHolesSize = directorySize(withHolesPath, "index\\.bin");
-        long idxWithoutHolesSize = directorySize(withoutHolesPath, 
"index\\.bin");
+        long idxWithHolesSize = snapshotSize(G.allGrids(), 
SNAPSHOT_WITH_HOLES, "index\\.bin");
+        long idxWithoutHolesSize = snapshotSize(G.allGrids(), 
SNAPSHOT_WITHOUT_HOLES, "index\\.bin");
 
         assertTrue("idxWithHolesSize < idxWithoutHolesSize: " + 
idxWithHolesSize + " < " + idxWithoutHolesSize,
             idxWithHolesSize < idxWithoutHolesSize);
 
-        ignite.cacheNames().forEach(c -> ignite.getOrCreateCache(c).destroy());
-
         G.stopAll(true);
     }
 
     /** */
-    private void failCompressionProcessor(Ignite ignite, String... snpNames) {
+    protected void failCompressionProcessor(Ignite ignite, String... snpNames) 
{
         CompressionProcessor compressProc = 
((IgniteEx)ignite).context().compress();
 
         CompressionProcessor spyCompressProc = Mockito.spy(compressProc);
@@ -334,12 +388,34 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
     }
 
     /** */
-    private static long directorySize(Path path) throws IOException {
-        return directorySize(path, null);
+    protected long persistenseSize(Collection<Ignite> grids) {
+        return grids.stream()
+            .map(ig -> workingDirectory(ig).resolve(DFLT_STORE_DIR))
+            .reduce(0L, (acc, p) -> acc + directorySize(p), Long::sum);
+    }
+
+    /** */
+    protected long snapshotSize(Collection<Ignite> grids, String snpName) {
+        return snapshotSize(grids, snpName, "(part-\\d+|index)\\.bin");
+    }
+
+    /** */
+    protected long snapshotSize(Collection<Ignite> grids, String snpName, 
String pattern) {
+        return grids.stream()
+            .map(ig -> 
workingDirectory(ig).resolve(DFLT_SNAPSHOT_DIRECTORY).resolve(snpName))
+            .reduce(0L, (acc, p) -> acc + directorySize(p, pattern), 
Long::sum);
+    }
+
+    /** */
+    protected long directorySize(Path path) {
+        return directorySize(path, "(part-\\d+|index)\\.bin");
     }
 
     /** */
-    private static long directorySize(Path path, String pattern) throws 
IOException {
+    protected long directorySize(Path path, String pattern) {
+        if (!Files.exists(path))
+            return 0;
+
         try (Stream<Path> walk = Files.walk(path)) {
             return walk.filter(Files::isRegularFile)
                 .filter(f -> F.isEmpty(pattern) || 
f.getFileName().toString().matches(pattern))
@@ -352,10 +428,38 @@ public class SnapshotCompressionBasicTest extends 
AbstractSnapshotSelfTest {
                     }
                 }).sum();
         }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    protected Path workingDirectory(Ignite ig) {
+        return workingDirectory(ig.configuration());
+    }
+
+    /** */
+    protected Path workingDirectory(IgniteConfiguration cfg) {
+        try {
+            return Paths.get(U.defaultWorkDirectory(), 
U.maskForFileName(cfg.getIgniteInstanceName()));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    protected Path defaultWorkDirectory() {
+        try {
+            return Paths.get(U.defaultWorkDirectory());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /** */
-    private static class Value {
+    static class Value {
         /** */
         String name;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 3fa01e50df5..5d655f5b760 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -433,7 +433,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
 
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
 
-        restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+        restoreCacheGrpProc = new SnapshotRestoreProcess(ctx, locBuff);
 
         // Manage remote snapshots.
         snpRmtMgr = new SequentialRemoteSnapshotManager();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index b414632bdd4..a29291df066 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.persistence.snapshot;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -59,13 +60,18 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileVersionCheckingFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -88,6 +94,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
 import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
 import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
@@ -131,6 +138,9 @@ public class SnapshotRestoreProcess {
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
     /** Future to be completed when the cache restore process is complete 
(this future will be returned to the user). */
     private volatile ClusterSnapshotFuture fut;
 
@@ -142,12 +152,15 @@ public class SnapshotRestoreProcess {
 
     /**
      * @param ctx Kernal context.
+     * @param locBuff Thread local page buffer.
      */
-    public SnapshotRestoreProcess(GridKernalContext ctx) {
+    public SnapshotRestoreProcess(GridKernalContext ctx, 
ThreadLocal<ByteBuffer> locBuff) {
         this.ctx = ctx;
 
         log = ctx.log(getClass());
 
+        this.locBuff = locBuff;
+
         prepareRestoreProc = new DistributedProcess<>(
             ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, 
this::finishPrepare);
 
@@ -790,6 +803,12 @@ public class SnapshotRestoreProcess {
                 }
             }
 
+            if (!F.isEmpty(e.getValue().metas)) {
+                
e.getValue().metas.stream().filter(SnapshotMetadata::hasCompressedGroups)
+                    .forEach(meta -> 
meta.cacheGroupIds().stream().filter(meta::isGroupWithCompresion)
+                        .forEach(opCtx0::addCompressedGroup));
+            }
+
             opCtx0.metasPerNode.put(e.getKey(), new 
ArrayList<>(e.getValue().metas));
         }
 
@@ -964,11 +983,7 @@ public class SnapshotRestoreProcess {
                             .contains(partFut.partId);
 
                         if (doCopy) {
-                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
-                                opCtx0,
-                                snpCacheDir,
-                                tmpCacheDir,
-                                partFut);
+                            copyLocalAsync(opCtx0, snpCacheDir, tmpCacheDir, 
partFut);
                         }
 
                         return doCopy;
@@ -991,11 +1006,7 @@ public class SnapshotRestoreProcess {
                             allParts.computeIfAbsent(grpId, g -> new 
HashSet<>())
                                 .add(idxFut = new 
PartitionRestoreFuture(INDEX_PARTITION, opCtx0.processedParts));
 
-                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
-                                opCtx0,
-                                snpCacheDir,
-                                tmpCacheDir,
-                                idxFut);
+                            copyLocalAsync(opCtx0, snpCacheDir, tmpCacheDir, 
idxFut);
                         }
                     }
                 }
@@ -1051,7 +1062,27 @@ public class SnapshotRestoreProcess {
 
                                 assert partFut != null : 
snpFile.getAbsolutePath();
 
-                                partFut.complete(snpFile.toPath());
+                                if (!opCtx0.isGroupCompressed(grpId)) {
+                                    partFut.complete(snpFile.toPath());
+
+                                    return;
+                                }
+
+                                CompletableFuture.runAsync(
+                                    () -> {
+                                        try {
+                                            punchHole(grpId, partId, snpFile);
+
+                                            partFut.complete(snpFile.toPath());
+                                        }
+                                        catch (Throwable t0) {
+                                            opCtx0.errHnd.accept(t0);
+
+                                            
completeListExceptionally(rmtAwaitParts, t0);
+                                        }
+                                    },
+                                    snpMgr.snapshotExecutorService()
+                                );
                             });
                 }
             }
@@ -1334,13 +1365,11 @@ public class SnapshotRestoreProcess {
     }
 
     /**
-     * @param mgr Ignite snapshot manager.
      * @param opCtx Snapshot operation context.
      * @param srcDir Snapshot directory to copy from.
      * @param targetDir Destination directory to copy to.
      */
-    private static void copyLocalAsync(
-        IgniteSnapshotManager mgr,
+    private void copyLocalAsync(
         SnapshotRestoreContext opCtx,
         File srcDir,
         File targetDir,
@@ -1348,8 +1377,11 @@ public class SnapshotRestoreProcess {
     ) {
         File snpFile = new File(srcDir, 
FilePageStoreManager.getPartitionFileName(partFut.partId));
         Path partFile = Paths.get(targetDir.getAbsolutePath(), 
FilePageStoreManager.getPartitionFileName(partFut.partId));
+        int grpId = groupIdFromTmpDir(targetDir);
 
-        CompletableFuture.supplyAsync(() -> {
+        IgniteSnapshotManager snapMgr = ctx.cache().context().snapshotMgr();
+
+        CompletableFuture<Path> copyPartFut = CompletableFuture.supplyAsync(() 
-> {
             if (opCtx.stopChecker.getAsBoolean())
                 throw new IgniteInterruptedException("The operation has been 
stopped on copy file: " + snpFile.getAbsolutePath());
 
@@ -1361,11 +1393,30 @@ public class SnapshotRestoreProcess {
                     ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + 
snpFile.getName() + ']');
             }
 
-            IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, 
partFile.toFile(), snpFile.length());
+            IgniteSnapshotManager.copy(snapMgr.ioFactory(), snpFile, 
partFile.toFile(), snpFile.length());
 
             return partFile;
-        }, mgr.snapshotExecutorService())
-            .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+        }, snapMgr.snapshotExecutorService());
+
+        if (opCtx.isGroupCompressed(grpId)) {
+            copyPartFut = copyPartFut.thenComposeAsync(
+                p -> {
+                    CompletableFuture<Path> result = new CompletableFuture<>();
+                    try {
+                        punchHole(grpId, partFut.partId, partFile.toFile());
+
+                        result.complete(partFile);
+                    }
+                    catch (Throwable t) {
+                        result.completeExceptionally(t);
+                    }
+                    return result;
+                },
+                snapMgr.snapshotExecutorService()
+            );
+        }
+
+        copyPartFut.whenComplete((r, t) -> opCtx.errHnd.accept(t))
             .whenComplete((r, t) -> {
                 if (t == null)
                     partFut.complete(partFile);
@@ -1374,6 +1425,42 @@ public class SnapshotRestoreProcess {
             });
     }
 
+    /** */
+    private void punchHole(int grpId, int partId, File partFile) throws 
Exception {
+        FilePageStoreManager storeMgr = 
(FilePageStoreManager)ctx.cache().context().pageStore();
+        FileVersionCheckingFactory factory = 
storeMgr.getPageStoreFactory(grpId, null);
+
+        try (FilePageStore pageStore = 
(FilePageStore)factory.createPageStore(getTypeByPartId(partId), partFile, val 
-> {})) {
+            pageStore.init();
+
+            ByteBuffer buf = locBuff.get();
+
+            long pageId = PageIdUtils.pageId(partId, (byte)0, 0);
+
+            for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, 
pageNo++) {
+                if (opCtx.stopChecker.getAsBoolean()) {
+                    throw new IgniteInterruptedException("The operation has 
been stopped while punching holes in file: "
+                        + partFile.getAbsolutePath());
+                }
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedException("Thread has been 
interrupted: " + Thread.currentThread().getName());
+
+                buf.clear();
+
+                pageStore.read(pageId, buf, true);
+
+                if (PageIO.getCompressionType(buf) == 
CompressionProcessor.UNCOMPRESSED_PAGE)
+                    continue;
+
+                int comprPageSz = PageIO.getCompressedSize(buf);
+
+                if (comprPageSz < pageStore.getPageSize())
+                    pageStore.punchHole(pageId, comprPageSz);
+            }
+        }
+    }
+
     /**
      * @param col Collection of sets to complete.
      * @param ex Exception to set.
@@ -1431,6 +1518,9 @@ public class SnapshotRestoreProcess {
         /** Stop condition checker. */
         private final BooleanSupplier stopChecker = () -> err.get() != null;
 
+        /** Compressed groups. */
+        private final Set<Integer> comprGrps = new HashSet<>();
+
         /** Cache ID to configuration mapping. */
         private volatile Map<Integer, StoredCacheData> cfgs = 
Collections.emptyMap();
 
@@ -1464,7 +1554,11 @@ public class SnapshotRestoreProcess {
          * @param discoCache Baseline discovery cache for node IDs that must 
be alive to complete the operation.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, 
DiscoCache discoCache, Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
             snpPath = req.snapshotPath();
@@ -1481,6 +1575,16 @@ public class SnapshotRestoreProcess {
         public Collection<UUID> nodes() {
             return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
         }
+
+        /** */
+        public boolean isGroupCompressed(int grpId) {
+            return comprGrps.contains(grpId);
+        }
+
+        /** */
+        void addCompressedGroup(int grpId) {
+            comprGrps.add(grpId);
+        }
     }
 
     /** Snapshot operation prepare response. */

Reply via email to