This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f155cb349a IGNITE-17723 Added snapshot delta pages sort (#10360)
7f155cb349a is described below

commit 7f155cb349a41a817faf363915b3003c0ec4e1bc
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Nov 28 11:14:22 2022 +0300

    IGNITE-17723 Added snapshot delta pages sort (#10360)
---
 docs/_docs/snapshots/snapshots.adoc                |  21 ++
 .../org/apache/ignite/IgniteSystemProperties.java  |  12 ++
 .../snapshot/IgniteSnapshotManager.java            | 233 ++++++++++++++++++---
 .../persistence/snapshot/SnapshotFutureTask.java   | 105 +++++++++-
 .../snapshot/AbstractSnapshotSelfTest.java         |   2 +-
 .../snapshot/IgniteClusterSnapshotDeltaTest.java   | 220 +++++++++++++++++++
 .../ignite/testsuites/IgniteSnapshotTestSuite.java |   4 +-
 7 files changed, 560 insertions(+), 37 deletions(-)

diff --git a/docs/_docs/snapshots/snapshots.adoc 
b/docs/_docs/snapshots/snapshots.adoc
index b9d21c20c73..c1e47857e15 100644
--- a/docs/_docs/snapshots/snapshots.adoc
+++ b/docs/_docs/snapshots/snapshots.adoc
@@ -95,6 +95,27 @@ increases the total amount of time for taking a snapshot. 
However, this keeps th
 
 See the link:perf-and-troubleshooting/thread-pools-tuning[Ignite Snapshot 
Execution Pool,window=_blank] page for more details.
 
+=== Distributed properties
+
+The distributed properties listed in the table below allow you to configure 
snapshots at runtime:
+
+[cols="1,3,1",opts="header"]
+|===
+|Parameter | Description | Default Value
+|`snapshotTransferRate`| Snapshot transfer rate limit in bytes/sec. | 0
+|===
+
+=== System properties
+
+The system properties listed in the table below allow you to configure 
snapshots:
+
+[cols="1,1,3,1",opts="header"]
+|===
+|Property | Type | Description | Default Value
+|`IGNITE_SNAPSHOT_SEQUENTIAL_WRITE`| Boolean | Flag to indicate that disk 
writes during snapshot process should be in a
+sequential manner when possible. This generates extra disk space usage. | True
+|===
+
 == Creating Snapshot
 
 Ignite provides several APIs for the snapshot creation. Let's review all the 
options.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 3c61822fefa..e3766d2954b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import 
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
 import 
org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker;
@@ -102,6 +103,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.pagemem.Fu
 import static 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.DFLT_DELAYED_REPLACED_PAGE_WRITE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.DFLT_LOADED_PAGES_BACKWARD_SHIFT_MAP;
 import static 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottlePolicy.DFLT_THROTTLE_LOG_THRESHOLD;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.IGNITE_BPLUS_TREE_LOCK_RETRIES_DEFAULT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.DFLT_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.DFLT_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT;
@@ -2095,6 +2097,16 @@ public final class IgniteSystemProperties {
         defaults = "" + DFLT_IGNITE_USE_BINARY_ARRAYS)
     public static final String IGNITE_USE_BINARY_ARRAYS = 
"IGNITE_USE_BINARY_ARRAYS";
 
+    /**
+     * Flag to indicate that disk writes during snapshot process should be in 
a sequential manner when possible. This
+     * generates extra disk space usage.
+     * The default value is {@link 
IgniteSnapshotManager#DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}.
+     */
+    @SystemProperty(value = "Flag to indicate that disk writes during snapshot 
process should be in a sequential " +
+        "manner when possible. This generates extra disk space usage", 
defaults = "" + DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE)
+    @IgniteExperimental
+    public static final String IGNITE_SNAPSHOT_SEQUENTIAL_WRITE = 
"IGNITE_SNAPSHOT_SEQUENTIAL_WRITE";
+
     /**
      * Enforces singleton.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 04cddffc847..8cbd8e07880 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.BufferedInputStream;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -44,6 +45,7 @@ import java.util.Deque;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +54,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -72,6 +75,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeTask;
@@ -104,6 +108,7 @@ import 
org.apache.ignite.internal.managers.encryption.GroupKey;
 import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import 
org.apache.ignite.internal.managers.systemview.walker.SnapshotViewWalker;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
@@ -175,6 +180,7 @@ import org.apache.ignite.spi.systemview.view.SnapshotView;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SNAPSHOT_SEQUENTIAL_WRITE;
 import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_BINARY_METADATA_PATH;
 import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
 import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_FAILED;
@@ -238,6 +244,9 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     /** File with delta pages suffix. */
     public static final String DELTA_SUFFIX = ".delta";
 
+    /** File with delta pages index suffix. */
+    public static final String DELTA_IDX_SUFFIX = ".idx";
+
     /** File name template consists of delta pages. */
     public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + 
DELTA_SUFFIX;
 
@@ -280,6 +289,9 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     /** Metastorage key to save currently running snapshot directory path. */
     private static final String SNP_RUNNING_DIR_KEY = "snapshot-running-dir";
 
+    /** Default value of {@link 
IgniteSystemProperties#IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}. */
+    public static final boolean DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE = true;
+
     /** @deprecated Use #SNP_RUNNING_DIR_KEY instead. */
     @Deprecated
     private static final String SNP_RUNNING_KEY = "snapshot-running";
@@ -394,6 +406,10 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     /** Snapshot transfer rate limit in bytes/sec. */
     private final DistributedLongProperty snapshotTransferRate = 
detachedLongProperty(SNAPSHOT_TRANSFER_RATE_DMS_KEY);
 
+    /** Value of {@link 
IgniteSystemProperties#IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}. */
+    private final boolean sequentialWrite =
+        IgniteSystemProperties.getBoolean(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE, 
DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE);
+
     /**
      * @param ctx Kernal context.
      */
@@ -425,6 +441,16 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         return new File(snapshotCacheDir, partDeltaFileName(partId));
     }
 
+    /**
+     * Partition delta index file. Represents a sequence of page indexes that 
are written to a delta.
+     *
+     * @param delta File with delta pages.
+     * @return File with delta pages index.
+     */
+    public static File partDeltaIndexFile(File delta) {
+        return new File(delta.getParent(), delta.getName() + DELTA_IDX_SUFFIX);
+    }
+
     /**
      * @param partId Partition id.
      * @return File name of delta partition pages.
@@ -1111,6 +1137,11 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         return executeRestoreManagementTask(SnapshotRestoreStatusTask.class, 
snpName);
     }
 
+    /** @return {@code True} if disk writes during snapshot process should be 
in a sequential manner when possible. */
+    public boolean sequentialWrite() {
+        return sequentialWrite;
+    }
+
     /**
      * @param restoreId Restore process ID.
      * @return Server nodes on which a successful start of the cache(s) is 
required, if any of these nodes fails when
@@ -3157,6 +3188,10 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         /** Size of page. */
         private final int pageSize;
 
+        /** Delta iterator factory. */
+        private final Factory<File, FileIOFactory, DeltaIterator> 
deltaIterFactory =
+            sequentialWrite() ? DeltaSortedIterator::new : DeltaIterator::new;
+
         /**
          * @param snpName Snapshot name.
          * @param snpPath Snapshot directory path.
@@ -3285,39 +3320,18 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 .encryptedFileIoFactory(IgniteSnapshotManager.this.ioFactory, 
pair.getGroupId()) :
                 IgniteSnapshotManager.this.ioFactory;
 
-            try (FileIO fileIo = ioFactory.create(delta, READ);
+            try (DeltaIterator deltaIter = deltaIterFactory.create(delta, 
ioFactory);
                  FilePageStore pageStore = 
(FilePageStore)storeMgr.getPageStoreFactory(pair.getGroupId(), encrypted)
                      .createPageStore(getTypeByPartId(pair.getPartitionId()), 
snpPart::toPath, v -> {})
             ) {
-                ByteBuffer pageBuf = ByteBuffer.allocate(pageSize)
-                    .order(ByteOrder.nativeOrder());
-
-                long totalBytes = fileIo.size();
-
-                assert totalBytes % pageSize == 0 : "Given file with delta 
pages has incorrect size: " + fileIo.size();
-
                 pageStore.beginRecover();
 
-                for (long pos = 0; pos < totalBytes; pos += pageSize) {
-                    long read = fileIo.readFully(pageBuf, pos);
-
-                    assert read == pageBuf.capacity();
-
-                    pageBuf.flip();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Read page given delta file [path=" + 
delta.getName() +
-                            ", pageId=" + PageIO.getPageId(pageBuf) + ", pos=" 
+ pos + ", pages=" + (totalBytes / pageSize) +
-                            ", crcBuff=" + FastCrc.calcCrc(pageBuf, 
pageBuf.limit()) + ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
-
-                        pageBuf.rewind();
-                    }
-
+                while (deltaIter.hasNext()) {
                     transferRateLimiter.acquire(pageSize);
 
-                    pageStore.write(PageIO.getPageId(pageBuf), pageBuf, 0, 
false);
+                    ByteBuffer page = deltaIter.next();
 
-                    pageBuf.flip();
+                    pageStore.write(PageIO.getPageId(page), page, 0, false);
                 }
 
                 pageStore.finishRecover();
@@ -3342,6 +3356,168 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         }
     }
 
+    /** Delta file iterator. */
+    private class DeltaIterator implements Iterator<ByteBuffer>, Closeable {
+        /** Delta file. */
+        protected final File delta;
+
+        /** Delta file IO. */
+        private final FileIO fileIo;
+
+        /** Delta file length. */
+        protected final long totalBytes;
+
+        /** */
+        protected final int pageSize;
+
+        /** Pages count written to a delta file. */
+        protected final int pagesCnt;
+
+        /** */
+        protected final ByteBuffer pageBuf;
+
+        /** */
+        private long pos;
+
+        /** */
+        DeltaIterator(File delta, FileIOFactory ioFactory) throws IOException {
+            pageSize = 
cctx.kernalContext().config().getDataStorageConfiguration().getPageSize();
+
+            this.delta = delta;
+
+            fileIo = ioFactory.create(delta, READ);
+
+            totalBytes = fileIo.size();
+
+            assert totalBytes % pageSize == 0 : "Given file with delta pages 
has incorrect size: " + totalBytes;
+
+            pagesCnt = (int)(totalBytes / pageSize);
+
+            pageBuf = 
ByteBuffer.allocate(pageSize).order(ByteOrder.nativeOrder());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return pos < totalBytes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            readPage(pos);
+
+            pos += pageSize;
+
+            return pageBuf;
+        }
+
+        /** Reads a page from the delta file from the given position. */
+        protected void readPage(long pos) {
+            pageBuf.clear();
+
+            try {
+                long read = fileIo.readFully(pageBuf, pos);
+
+                assert read == pageBuf.capacity();
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+
+            pageBuf.flip();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Read page given delta file [path=" + 
delta.getName() + ", pageId=" +
+                    PageIO.getPageId(pageBuf) + ", index=" + 
PageIdUtils.pageIndex(PageIO.getPageId(pageBuf)) +
+                    ", pos=" + pos + ", pagesCnt=" + pagesCnt + ", crcBuff=" +
+                    FastCrc.calcCrc(pageBuf, pageBuf.limit()) + ", crcPage=" + 
PageIO.getCrc(pageBuf) + ']');
+
+                pageBuf.rewind();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IOException {
+            fileIo.close();
+        }
+    }
+
+    /**
+     * Delta file iterator sorted by page indexes to make disk writes almost 
sequential when applied to a page store.
+     */
+    class DeltaSortedIterator extends DeltaIterator {
+        /** Snapshot delta sort batch size in pages count. */
+        public static final int DELTA_SORT_BATCH_SIZE = 500_000;
+
+        /** Delta index file IO. */
+        private final FileIO idxIo;
+
+        /** */
+        private int id;
+
+        /** */
+        private Iterator<Integer> sortedIter;
+
+        /** */
+        DeltaSortedIterator(File delta, FileIOFactory ioFactory) throws 
IOException {
+            super(delta, ioFactory);
+
+            File deltaIdx = partDeltaIndexFile(delta);
+
+            idxIo = pagesCnt > 0 ? 
IgniteSnapshotManager.this.ioFactory.create(deltaIdx, READ) : null;
+
+            assert deltaIdx.length() % 4 /* pageIdx */ == 0 : "Wrong delta 
index size: " + deltaIdx.length();
+            assert deltaIdx.length() / 4 == pagesCnt : "Wrong delta index 
pages count: " + deltaIdx.length();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            if (sortedIter == null || !sortedIter.hasNext())
+                advance();
+
+            return sortedIter.hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer next() {
+            readPage((long)sortedIter.next() * pageSize);
+
+            return pageBuf;
+        }
+
+        /** */
+        private void advance() {
+            TreeMap<Integer, Integer> sorted = new TreeMap<>();
+
+            while (id < pagesCnt && sorted.size() < DELTA_SORT_BATCH_SIZE) {
+                pageBuf.clear();
+
+                try {
+                    idxIo.readFully(pageBuf);
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+
+                pageBuf.flip();
+
+                while (pageBuf.hasRemaining())
+                    sorted.put(pageBuf.getInt(), id++);
+            }
+
+            sortedIter = sorted.values().iterator();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IOException {
+            super.close();
+
+            U.closeQuiet(idxIo);
+        }
+    }
+
     /** */
     private static class SnapshotOperationResponse implements Serializable {
         /** Serial version uid. */
@@ -3545,4 +3721,11 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 return new IgniteException("Snapshot has not been created", 
U.convertException(e));
         }
     }
+
+    /** Factory. */
+    @FunctionalInterface
+    private interface Factory<E1, E2, R> {
+        /** @return An instance of {@link R}. */
+        R create(E1 e1, E2 e2) throws IOException;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 579da37ee8e..d453d3f782e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -73,6 +73,7 @@ import 
org.apache.ignite.internal.processors.metastorage.persistence.Distributed
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.C3;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -85,6 +86,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaIndexFile;
 
 /**
  * The requested map of cache groups and its partitions to include into 
snapshot represented as <tt>Map<Integer, Set<Integer>></tt>.
@@ -149,6 +151,10 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
     /** Processed snapshot size in bytes. */
     private final AtomicLong processedSize = new AtomicLong();
 
+    /** Delta writer factory. */
+    private final C3<PageStore, File, Integer, PageStoreSerialWriter> 
deltaWriterFactory =
+        cctx.snapshotMgr().sequentialWrite() ? 
IndexedPageStoreSerialWriter::new : PageStoreSerialWriter::new;
+
     /**
      * @param cctx Shared context.
      * @param srcNodeId Node id which cause snapshot task creation.
@@ -522,7 +528,11 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
                         // Wait for the completion of both futures - 
checkpoint end, copy partition.
                         .runAfterBothAsync(cpEndFut,
                             wrapExceptionIfStarted(() -> {
-                                File delta = 
partDeltaWriters.get(pair).deltaFile;
+                                PageStoreSerialWriter writer = 
partDeltaWriters.get(pair);
+
+                                writer.close();
+
+                                File delta = writer.deltaFile;
 
                                 try {
                                     // Atomically creates a new, empty delta 
file if and only if
@@ -540,6 +550,14 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
                                 boolean deleted = delta.delete();
 
                                 assert deleted;
+
+                                File deltaIdx = partDeltaIndexFile(delta);
+
+                                if (deltaIdx.exists()) {
+                                    deleted = deltaIdx.delete();
+
+                                    assert deleted;
+                                }
                             }),
                             snpSndr.executor());
 
@@ -575,9 +593,9 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
             GroupPartitionId pair = new GroupPartitionId(grpId, partId);
 
             PageStore store = pageStore.getStore(grpId, partId);
+            File delta = partDeltaFile(cacheWorkDir(tmpConsIdDir, dirName), 
partId);
 
-            partDeltaWriters.put(pair,
-                new PageStoreSerialWriter(store, 
partDeltaFile(cacheWorkDir(tmpConsIdDir, dirName), partId), encGrpId));
+            partDeltaWriters.put(pair, deltaWriterFactory.apply(store, delta, 
encGrpId));
 
             partFileLengths.put(pair, store.size());
         }
@@ -801,10 +819,10 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
     private class PageStoreSerialWriter implements PageWriteListener, 
Closeable {
         /** Page store to which current writer is related to. */
         @GridToStringExclude
-        private final PageStore store;
+        protected final PageStore store;
 
         /** Partition delta file to store delta pages into. */
-        private final File deltaFile;
+        protected final File deltaFile;
 
         /** Id of encrypted cache group. If {@code null}, no encrypted IO is 
used. */
         private final Integer encryptedGrpId;
@@ -871,6 +889,12 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
             }
         }
 
+        /** */
+        protected void init() throws IOException {
+            deltaFileIo = (encryptedGrpId == null ? ioFactory :
+                pageStore.encryptedFileIoFactory(ioFactory, 
encryptedGrpId)).create(deltaFile);
+        }
+
         /** {@inheritDoc} */
         @Override public void accept(long pageId, ByteBuffer buf) {
             assert buf.position() == 0 : buf.position();
@@ -883,10 +907,8 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
                     if (stopped())
                         return;
 
-                    if (deltaFileIo == null) {
-                        deltaFileIo = (encryptedGrpId == null ? ioFactory :
-                            pageStore.encryptedFileIoFactory(ioFactory, 
encryptedGrpId)).create(deltaFile);
-                    }
+                    if (deltaFileIo == null)
+                        init();
                 }
                 catch (IOException e) {
                     acceptException(e);
@@ -948,7 +970,7 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
          * @param pageBuf Page buffer to write.
          * @throws IOException If page writing failed (IO error occurred).
          */
-        private void writePage0(long pageId, ByteBuffer pageBuf) throws 
IOException {
+        protected synchronized void writePage0(long pageId, ByteBuffer 
pageBuf) throws IOException {
             assert deltaFileIo != null : "Delta pages storage is not inited: " 
+ this;
             assert pageBuf.position() == 0;
             assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer 
order " + pageBuf.order()
@@ -992,6 +1014,69 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
         }
     }
 
+    /** @see IgniteSnapshotManager.DeltaSortedIterator */
+    private class IndexedPageStoreSerialWriter extends PageStoreSerialWriter {
+        /** Delta index file IO. */
+        @GridToStringExclude
+        private volatile FileIO idxIo;
+
+        /** Buffer of page indexes written to the delta. */
+        private volatile ByteBuffer pageIdxs;
+
+        /** */
+        public IndexedPageStoreSerialWriter(PageStore store, File deltaFile, 
@Nullable Integer encryptedGrpId) {
+            super(store, deltaFile, encryptedGrpId);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init() throws IOException {
+            super.init();
+
+            idxIo = ioFactory.create(partDeltaIndexFile(deltaFile));
+
+            pageIdxs = 
ByteBuffer.allocate(store.getPageSize()).order(ByteOrder.nativeOrder());
+
+            assert pageIdxs.capacity() % 4 == 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized void writePage0(long pageId, 
ByteBuffer pageBuf) throws IOException {
+            super.writePage0(pageId, pageBuf);
+
+            pageIdxs.putInt(PageIdUtils.pageIndex(pageId));
+
+            if (!pageIdxs.hasRemaining())
+                flush();
+        }
+
+        /** Flush buffer with page indexes to the file. */
+        private void flush() throws IOException {
+            pageIdxs.flip();
+
+            idxIo.writeFully(pageIdxs);
+
+            pageIdxs.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            super.close();
+
+            try {
+                if (idxIo != null)
+                    flush();
+            }
+            catch (IOException e) {
+                acceptException(new IgniteCheckedException("Error during 
writing page indexes to delta " +
+                    "partition index file [writer=" + this + ']', e));
+            }
+
+            U.closeQuiet(idxIo);
+
+            idxIo = null;
+        }
+    }
+
     /**
      *
      */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 702c8a9986b..6b647084ed9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -152,7 +152,7 @@ public abstract class AbstractSnapshotSelfTest extends 
GridCommonAbstractTest {
 
     /** Parameters. */
     @Parameterized.Parameters(name = "Encryption={0}")
-    public static Iterable<Boolean> encryptionParams() {
+    public static Collection<Boolean> encryptionParams() {
         return Arrays.asList(false, true);
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java
new file mode 100644
index 00000000000..8912396d691
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SNAPSHOT_SEQUENTIAL_WRITE;
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DeltaSortedIterator.DELTA_SORT_BATCH_SIZE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaIndexFile;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Cluster snapshot delta tests.
+ * <p>
+ * Each test runs for every combination of the inherited encryption parameter and {@link #sequentialWrite}.
+ */
+@RunWith(Parameterized.class)
+public class IgniteClusterSnapshotDeltaTest extends AbstractSnapshotSelfTest {
+    /** Second test parameter; its value is written to {@code IGNITE_SNAPSHOT_SEQUENTIAL_WRITE} by the test. */
+    @Parameterized.Parameter(1)
+    public boolean sequentialWrite;
+
+    /**
+     * Parameters.
+     *
+     * @return Cartesian product of {@code encryptionParams()} and both values of the sequential write flag.
+     */
+    @Parameterized.Parameters(name = "encryption={0}, sequentialWrite={1}")
+    public static Collection<Object[]> parameters() {
+        return cartesianProduct(encryptionParams(), F.asList(false, true));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        System.clearProperty(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE); // Undo the property set by testSendDelta.
+    }
+
+    /**
+     * Checks that a snapshot created while the cache is being overwritten restores the initial values.
+     * <p>
+     * Partition sending is blocked on a latch, every key is overwritten with a new payload and a checkpoint is
+     * forced, then the latch is released. After the cluster is restarted from the snapshot every key must hold
+     * the initial payload.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSendDelta() throws Exception {
+        int keys = 10_000;
+        byte[] payload = new byte[DFLT_PAGE_SIZE / 2]; // Each value takes half of the default page size.
+        int partCnt = 2;
+
+        System.setProperty(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE, 
String.valueOf(sequentialWrite));
+
+        // 1. Start a cluster and fill cache.
+        ThreadLocalRandom.current().nextBytes(payload);
+
+        byte[] expPayload = Arrays.copyOf(payload, payload.length); // Initial value; payload is re-randomized in step 3.
+
+        CacheConfiguration<Integer, byte[]> ccfg = new 
CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, partCnt));
+
+        String cacheDir = CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME;
+
+        IgniteEx srv = startGridsWithCache(1, keys, (k) -> expPayload, ccfg);
+
+        if (sequentialWrite)
+            injectSequentialWriteCheck(srv);
+
+        IgniteSnapshotManager mgr = snp(srv);
+
+        BiFunction<String, String, SnapshotSender> old = 
mgr.localSnapshotSenderFactory();
+
+        CountDownLatch partStart = new CountDownLatch(partCnt);
+        CountDownLatch deltaApply = new CountDownLatch(1);
+
+        mgr.localSnapshotSenderFactory((rqId, nodeId) -> new 
DelegateSnapshotSender(log,
+            mgr.snapshotExecutorService(), old.apply(rqId, nodeId)) {
+            @Override public void sendPart0(File part, String cacheDirName, 
GroupPartitionId pair, Long length) {
+                if (cacheDir.equals(cacheDirName))
+                    partStart.countDown(); // Signals that sending of a test cache part has been entered.
+
+                try {
+                    deltaApply.await(); // Every part is held here until step 4 releases the latch.
+                }
+                catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                super.sendPart0(part, cacheDirName, pair, length);
+            }
+
+            @Override public void sendDelta0(File delta, String cacheDirName, 
GroupPartitionId pair) {
+                if (cacheDir.equals(cacheDirName))
+                    assertTrue(delta.length() > 0);
+
+                if (!sequentialWrite)
+                    U.delete(partDeltaIndexFile(delta)); // NOTE(review): presumably keeps the index file unused in this mode - confirm.
+
+                long start = System.nanoTime();
+
+                super.sendDelta0(delta, cacheDirName, pair);
+
+                if (cacheDir.equals(cacheDirName)) {
+                    log.info("Send delta [size=" + 
U.humanReadableByteCount(delta.length()) +
+                        ", time=" + (U.nanosToMillis(System.nanoTime() - 
start)) + "ms, part=" + pair + "]");
+                }
+            }
+        });
+
+        // 2. Start a snapshot and block copy of a partitions.
+        IgniteFuture<Void> fut = srv.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        GridTestUtils.waitForCondition(() -> mgr.currentCreateRequest() != 
null, getTestTimeout());
+
+        partStart.await(); // Returns after sendPart0 was entered partCnt times for the test cache directory.
+
+        // 3. Produce delta pages by data updates.
+        IgniteCache<Integer, byte[]> cache = srv.getOrCreateCache(ccfg);
+
+        ThreadLocalRandom.current().nextBytes(payload);
+
+        for (int i = 0; i < keys; i++)
+            cache.put(i, payload);
+
+        forceCheckpoint(srv);
+
+        // 4. Apply delta and wait for a snapshot complete.
+        deltaApply.countDown();
+
+        fut.get();
+
+        // 5. Destroy cache, restart the cluster and check data (delta was 
successfully applied).
+        srv.destroyCache(DEFAULT_CACHE_NAME);
+
+        stopAllGrids();
+
+        srv = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        cache = srv.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < keys; i++)
+            assertArrayEquals(expPayload, cache.get(i)); // The snapshot must hold the values written before step 3.
+    }
+
+    /**
+     * Injects test IO that checks sequential write to a pagestore on a delta apply.
+     * <p>
+     * Every file IO created through the page store factory is wrapped into a decorator that remembers the
+     * position of the previous write. A write to a lower position than the previous one marks the IO as
+     * non-sequential, unless the per-IO write counter is a multiple of the computed batch size. The violation
+     * is reported by a {@link RuntimeException} thrown from {@code close()} of the decorator.
+     *
+     * @param srv Node whose page store file IO factories are replaced.
+     */
+    private void injectSequentialWriteCheck(IgniteEx srv) {
+        FilePageStoreManager pageStore = 
(FilePageStoreManager)srv.context().cache().context().pageStore();
+
+        FileIOFactory old = pageStore.getPageStoreFileIoFactory();
+
+        int idxsPerPage = pageStore.pageSize() / 4; // NOTE(review): assumes a page index takes 4 bytes - confirm.
+
+        int idxsPerBatch = (DELTA_SORT_BATCH_SIZE / idxsPerPage) * idxsPerPage 
+ idxsPerPage;
+
+        FileIOFactory testFactory = new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) 
throws IOException {
+                FileIO fileIo = old.create(file, modes);
+
+                return new FileIODecorator(fileIo) {
+                    boolean isSequentialWrite = true; // Cleared by the first out-of-order write.
+
+                    long lastPos; // Position of the previous write.
+
+                    int idx; // Number of writes seen so far.
+
+                    @Override public int write(ByteBuffer srcBuf, long pos) 
throws IOException {
+                        boolean batchRotation = idx++ % idxsPerBatch == 0; // True for every idxsPerBatch-th write, the first one included.
+
+                        if (lastPos > pos && !batchRotation)
+                            isSequentialWrite = false;
+
+                        lastPos = pos;
+
+                        return super.write(srcBuf, pos);
+                    }
+
+                    @Override public void close() throws IOException {
+                        super.close(); // The delegate is closed before the violation is reported.
+
+                        if (!isSequentialWrite)
+                            throw new RuntimeException("Non sequential 
write.");
+                    }
+                };
+            }
+        };
+
+        pageStore.setPageStoreFileIOFactories(testFactory, testFactory);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index f151ba19739..33419af614b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.EncryptedSnapshotTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotDeltaTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotHandlerTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
@@ -50,7 +51,8 @@ import org.junit.runners.Suite;
     EncryptedSnapshotTest.class,
     IgniteClusterSnapshotWalRecordTest.class,
     IgniteClusterSnapshotStreamerTest.class,
-    IgniteSnapshotConsistencyTest.class
+    IgniteSnapshotConsistencyTest.class,
+    IgniteClusterSnapshotDeltaTest.class
 })
 public class IgniteSnapshotTestSuite {
 }


Reply via email to