This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7f155cb349a IGNITE-17723 Added snapshot delta pages sort (#10360)
7f155cb349a is described below
commit 7f155cb349a41a817faf363915b3003c0ec4e1bc
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Nov 28 11:14:22 2022 +0300
IGNITE-17723 Added snapshot delta pages sort (#10360)
---
docs/_docs/snapshots/snapshots.adoc | 21 ++
.../org/apache/ignite/IgniteSystemProperties.java | 12 ++
.../snapshot/IgniteSnapshotManager.java | 233 ++++++++++++++++++---
.../persistence/snapshot/SnapshotFutureTask.java | 105 +++++++++-
.../snapshot/AbstractSnapshotSelfTest.java | 2 +-
.../snapshot/IgniteClusterSnapshotDeltaTest.java | 220 +++++++++++++++++++
.../ignite/testsuites/IgniteSnapshotTestSuite.java | 4 +-
7 files changed, 560 insertions(+), 37 deletions(-)
diff --git a/docs/_docs/snapshots/snapshots.adoc
b/docs/_docs/snapshots/snapshots.adoc
index b9d21c20c73..c1e47857e15 100644
--- a/docs/_docs/snapshots/snapshots.adoc
+++ b/docs/_docs/snapshots/snapshots.adoc
@@ -95,6 +95,27 @@ increases the total amount of time for taking a snapshot.
However, this keeps th
See the link:perf-and-troubleshooting/thread-pools-tuning[Ignite Snapshot
Execution Pool,window=_blank] page for more details.
+=== Distributed properties
+
+The distributed properties listed in the table below allow you to configure
snapshots at runtime:
+
+[cols="1,3,1",opts="header"]
+|===
+|Parameter | Description | Default Value
+|`snapshotTransferRate`| Snapshot transfer rate limit in bytes/sec. | 0
+|===
+
+=== System properties
+
+The system properties listed in the table below allow you to configure
snapshots:
+
+[cols="1,1,3,1",opts="header"]
+|===
+|Property | Type | Description | Default Value
+|`IGNITE_SNAPSHOT_SEQUENTIAL_WRITE`| Boolean | Flag indicating that disk
writes during the snapshot process should be performed in a
+sequential manner when possible. This uses extra disk space. | True
+|===
+
== Creating Snapshot
Ignite provides several APIs for the snapshot creation. Let's review all the
options.
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 3c61822fefa..e3766d2954b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import
org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker;
@@ -102,6 +103,7 @@ import static
org.apache.ignite.internal.processors.cache.persistence.pagemem.Fu
import static
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.DFLT_DELAYED_REPLACED_PAGE_WRITE;
import static
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.DFLT_LOADED_PAGES_BACKWARD_SHIFT_MAP;
import static
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottlePolicy.DFLT_THROTTLE_LOG_THRESHOLD;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE;
import static
org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.IGNITE_BPLUS_TREE_LOCK_RETRIES_DEFAULT;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.DFLT_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.DFLT_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT;
@@ -2095,6 +2097,16 @@ public final class IgniteSystemProperties {
defaults = "" + DFLT_IGNITE_USE_BINARY_ARRAYS)
public static final String IGNITE_USE_BINARY_ARRAYS =
"IGNITE_USE_BINARY_ARRAYS";
+ /**
+ * Flag to indicate that disk writes during the snapshot process should be
+ * performed in a sequential manner when possible. Enabling it uses extra
+ * disk space: a page index file is written next to each partition delta file.
+ * The default value is
+ * {@link IgniteSnapshotManager#DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}.
+ */
+ @SystemProperty(value = "Flag to indicate that disk writes during snapshot
process should be in a sequential " +
+ "manner when possible. This generates extra disk space usage",
defaults = "" + DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE)
+ @IgniteExperimental
+ public static final String IGNITE_SNAPSHOT_SEQUENTIAL_WRITE =
"IGNITE_SNAPSHOT_SEQUENTIAL_WRITE";
+
/**
* Enforces singleton.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 04cddffc847..8cbd8e07880 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.BufferedInputStream;
+import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -44,6 +45,7 @@ import java.util.Deque;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -52,6 +54,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -72,6 +75,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeTask;
@@ -104,6 +108,7 @@ import
org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import
org.apache.ignite.internal.managers.systemview.walker.SnapshotViewWalker;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
@@ -175,6 +180,7 @@ import org.apache.ignite.spi.systemview.view.SnapshotView;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.READ;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SNAPSHOT_SEQUENTIAL_WRITE;
import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_BINARY_METADATA_PATH;
import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_FAILED;
@@ -238,6 +244,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** File with delta pages suffix. */
public static final String DELTA_SUFFIX = ".delta";
+ /** File with delta pages index suffix. */
+ public static final String DELTA_IDX_SUFFIX = ".idx";
+
/** File name template consists of delta pages. */
public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE +
DELTA_SUFFIX;
@@ -280,6 +289,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** Metastorage key to save currently running snapshot directory path. */
private static final String SNP_RUNNING_DIR_KEY = "snapshot-running-dir";
+ /** Default value of {@link
IgniteSystemProperties#IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}. */
+ public static final boolean DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE = true;
+
/** @deprecated Use #SNP_RUNNING_DIR_KEY instead. */
@Deprecated
private static final String SNP_RUNNING_KEY = "snapshot-running";
@@ -394,6 +406,10 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** Snapshot transfer rate limit in bytes/sec. */
private final DistributedLongProperty snapshotTransferRate =
detachedLongProperty(SNAPSHOT_TRANSFER_RATE_DMS_KEY);
+ /** Value of {@link
IgniteSystemProperties#IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}. */
+ private final boolean sequentialWrite =
+ IgniteSystemProperties.getBoolean(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE,
DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE);
+
/**
* @param ctx Kernal context.
*/
@@ -425,6 +441,16 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return new File(snapshotCacheDir, partDeltaFileName(partId));
}
+ /**
+ * Returns the partition delta index file for the given delta file. The
+ * index file is located in the same directory as the delta file and is
+ * named by appending {@link #DELTA_IDX_SUFFIX} to the delta file name. It
+ * represents the sequence of page indexes that are written to the delta.
+ *
+ * @param delta File with delta pages.
+ * @return File with delta pages index.
+ */
+ public static File partDeltaIndexFile(File delta) {
+ return new File(delta.getParent(), delta.getName() + DELTA_IDX_SUFFIX);
+ }
+
/**
* @param partId Partition id.
* @return File name of delta partition pages.
@@ -1111,6 +1137,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return executeRestoreManagementTask(SnapshotRestoreStatusTask.class,
snpName);
}
+ /**
+ * Returns the value of the {@code sequentialWrite} field of this manager.
+ *
+ * @return {@code True} if disk writes during the snapshot process should
+ * be performed in a sequential manner when possible.
+ */
+ public boolean sequentialWrite() {
+ return sequentialWrite;
+ }
+
/**
* @param restoreId Restore process ID.
* @return Server nodes on which a successful start of the cache(s) is
required, if any of these nodes fails when
@@ -3157,6 +3188,10 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** Size of page. */
private final int pageSize;
+ /** Delta iterator factory. */
+ private final Factory<File, FileIOFactory, DeltaIterator>
deltaIterFactory =
+ sequentialWrite() ? DeltaSortedIterator::new : DeltaIterator::new;
+
/**
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
@@ -3285,39 +3320,18 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
.encryptedFileIoFactory(IgniteSnapshotManager.this.ioFactory,
pair.getGroupId()) :
IgniteSnapshotManager.this.ioFactory;
- try (FileIO fileIo = ioFactory.create(delta, READ);
+ try (DeltaIterator deltaIter = deltaIterFactory.create(delta,
ioFactory);
FilePageStore pageStore =
(FilePageStore)storeMgr.getPageStoreFactory(pair.getGroupId(), encrypted)
.createPageStore(getTypeByPartId(pair.getPartitionId()),
snpPart::toPath, v -> {})
) {
- ByteBuffer pageBuf = ByteBuffer.allocate(pageSize)
- .order(ByteOrder.nativeOrder());
-
- long totalBytes = fileIo.size();
-
- assert totalBytes % pageSize == 0 : "Given file with delta
pages has incorrect size: " + fileIo.size();
-
pageStore.beginRecover();
- for (long pos = 0; pos < totalBytes; pos += pageSize) {
- long read = fileIo.readFully(pageBuf, pos);
-
- assert read == pageBuf.capacity();
-
- pageBuf.flip();
-
- if (log.isDebugEnabled()) {
- log.debug("Read page given delta file [path=" +
delta.getName() +
- ", pageId=" + PageIO.getPageId(pageBuf) + ", pos="
+ pos + ", pages=" + (totalBytes / pageSize) +
- ", crcBuff=" + FastCrc.calcCrc(pageBuf,
pageBuf.limit()) + ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
-
- pageBuf.rewind();
- }
-
+ while (deltaIter.hasNext()) {
transferRateLimiter.acquire(pageSize);
- pageStore.write(PageIO.getPageId(pageBuf), pageBuf, 0,
false);
+ ByteBuffer page = deltaIter.next();
- pageBuf.flip();
+ pageStore.write(PageIO.getPageId(page), page, 0, false);
}
pageStore.finishRecover();
@@ -3342,6 +3356,168 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
+ /**
+ * Delta file iterator. Returns the pages of a partition delta file in the
+ * order they are stored in the file. Every {@link #next()} call returns the
+ * same {@link #pageBuf} instance filled with the next page, so a page must
+ * be consumed before the iterator is advanced.
+ */
+ private class DeltaIterator implements Iterator<ByteBuffer>, Closeable {
+ /** Delta file. */
+ protected final File delta;
+
+ /** Delta file IO. */
+ private final FileIO fileIo;
+
+ /** Delta file length. */
+ protected final long totalBytes;
+
+ /** Page size in bytes, taken from the data storage configuration. */
+ protected final int pageSize;
+
+ /** Pages count written to a delta file. */
+ protected final int pagesCnt;
+
+ /** Reusable buffer of {@link #pageSize} bytes in native byte order that holds the last page read. */
+ protected final ByteBuffer pageBuf;
+
+ /** Position in the delta file of the next page to be returned by {@link #next()}. */
+ private long pos;
+
+ /**
+ * Opens the delta file for reading and allocates the page buffer.
+ *
+ * @param delta File with delta pages.
+ * @param ioFactory Factory used to open the delta file.
+ * @throws IOException If an I/O error occurs while opening the delta file.
+ */
+ DeltaIterator(File delta, FileIOFactory ioFactory) throws IOException {
+ pageSize =
cctx.kernalContext().config().getDataStorageConfiguration().getPageSize();
+
+ this.delta = delta;
+
+ fileIo = ioFactory.create(delta, READ);
+
+ totalBytes = fileIo.size();
+
+ assert totalBytes % pageSize == 0 : "Given file with delta pages
has incorrect size: " + totalBytes;
+
+ pagesCnt = (int)(totalBytes / pageSize);
+
+ pageBuf =
ByteBuffer.allocate(pageSize).order(ByteOrder.nativeOrder());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return pos < totalBytes;
+ }
+
+ /**
+ * Reads the page at the current position and moves the position one
+ * page forward.
+ *
+ * @return The shared {@link #pageBuf} holding the page just read.
+ * @throws NoSuchElementException If the whole delta file has been read.
+ */
+ @Override public ByteBuffer next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ readPage(pos);
+
+ pos += pageSize;
+
+ return pageBuf;
+ }
+
+ /**
+ * Reads a page from the delta file from the given position into
+ * {@link #pageBuf} and flips the buffer so that it is ready to be read.
+ * An {@link IOException} is rethrown as an {@link IgniteException}.
+ *
+ * @param pos Byte offset of the page in the delta file.
+ */
+ protected void readPage(long pos) {
+ pageBuf.clear();
+
+ try {
+ long read = fileIo.readFully(pageBuf, pos);
+
+ assert read == pageBuf.capacity();
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ pageBuf.flip();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Read page given delta file [path=" +
delta.getName() + ", pageId=" +
+ PageIO.getPageId(pageBuf) + ", index=" +
PageIdUtils.pageIndex(PageIO.getPageId(pageBuf)) +
+ ", pos=" + pos + ", pagesCnt=" + pagesCnt + ", crcBuff=" +
+ FastCrc.calcCrc(pageBuf, pageBuf.limit()) + ", crcPage=" +
PageIO.getCrc(pageBuf) + ']');
+
+ pageBuf.rewind(); // presumably needed because the CRC calculation above moves the buffer position - TODO confirm
+ }
+ }
+
+ /** Closes the delta file IO. */
+ @Override public void close() throws IOException {
+ fileIo.close();
+ }
+ }
+
+ /**
+ * Delta file iterator that returns pages sorted by page index, batch by
+ * batch, to get almost sequential disk writes when the pages are applied
+ * to a page store. The page indexes are read from the delta index file
+ * (see {@code partDeltaIndexFile(File)}) and sorted in memory in batches of
+ * about {@link #DELTA_SORT_BATCH_SIZE} pages; ordering is guaranteed only
+ * within a batch.
+ */
+ class DeltaSortedIterator extends DeltaIterator {
+ /** Snapshot delta sort batch size in pages count. */
+ public static final int DELTA_SORT_BATCH_SIZE = 500_000;
+
+ /** Delta index file IO. {@code null} if the delta file contains no pages. */
+ private final FileIO idxIo;
+
+ /** Ordinal number (position in the delta file, counted in pages) of the next page index to be read from the index file. */
+ private int id;
+
+ /** Iterator over delta page ordinals of the current batch, ordered by page index. {@code null} until the first batch is loaded. */
+ private Iterator<Integer> sortedIter;
+
+ /**
+ * Opens the delta file and, if it contains at least one page, the delta
+ * index file. The index file is opened with the IO factory of the
+ * enclosing manager, not with the {@code ioFactory} argument.
+ *
+ * @param delta File with delta pages.
+ * @param ioFactory Factory used to open the delta file.
+ * @throws IOException If an I/O error occurs while opening the files.
+ */
+ DeltaSortedIterator(File delta, FileIOFactory ioFactory) throws
IOException {
+ super(delta, ioFactory);
+
+ File deltaIdx = partDeltaIndexFile(delta);
+
+ idxIo = pagesCnt > 0 ?
IgniteSnapshotManager.this.ioFactory.create(deltaIdx, READ) : null;
+
+ assert deltaIdx.length() % 4 /* pageIdx */ == 0 : "Wrong delta
index size: " + deltaIdx.length();
+ assert deltaIdx.length() / 4 == pagesCnt : "Wrong delta index
pages count: " + deltaIdx.length();
+ }
+
+ /**
+ * Loads the next batch of page indexes when the current batch is
+ * absent or exhausted.
+ *
+ * @return {@code True} if there is at least one more page to return.
+ */
+ @Override public boolean hasNext() {
+ if (sortedIter == null || !sortedIter.hasNext())
+ advance();
+
+ return sortedIter.hasNext();
+ }
+
+ /**
+ * Reads the next page of the current batch from the delta file.
+ * NOTE(review): unlike the parent implementation there is no
+ * {@link #hasNext()} guard here, so calling this method before
+ * {@link #hasNext()} dereferences a {@code null} {@link #sortedIter}.
+ *
+ * @return The shared page buffer holding the page just read.
+ */
+ @Override public ByteBuffer next() {
+ readPage((long)sortedIter.next() * pageSize);
+
+ return pageBuf;
+ }
+
+ /**
+ * Loads the next batch. Chunks of the index file are read into the
+ * page buffer (reused here as a scratch buffer) and every page index
+ * is mapped to the ordinal number of its page in the delta file; the
+ * tree map keeps the entries ordered by page index. The batch size is
+ * checked only between chunks, so a batch may exceed
+ * {@link #DELTA_SORT_BATCH_SIZE} by up to one chunk.
+ * NOTE(review): a page index repeated within one batch would replace
+ * the earlier entry - assumes each page index occurs at most once,
+ * TODO confirm.
+ */
+ private void advance() {
+ TreeMap<Integer, Integer> sorted = new TreeMap<>();
+
+ while (id < pagesCnt && sorted.size() < DELTA_SORT_BATCH_SIZE) {
+ pageBuf.clear();
+
+ try {
+ idxIo.readFully(pageBuf);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ pageBuf.flip();
+
+ while (pageBuf.hasRemaining())
+ sorted.put(pageBuf.getInt(), id++);
+ }
+
+ sortedIter = sorted.values().iterator();
+ }
+
+ /**
+ * Closes the delta file IO and then the delta index file IO.
+ * NOTE(review): if {@code super.close()} throws, the index file IO is
+ * not closed - consider try/finally.
+ */
+ @Override public void close() throws IOException {
+ super.close();
+
+ U.closeQuiet(idxIo);
+ }
+ }
+
/** */
private static class SnapshotOperationResponse implements Serializable {
/** Serial version uid. */
@@ -3545,4 +3721,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return new IgniteException("Snapshot has not been created",
U.convertException(e));
}
}
+
+ /**
+ * Factory that creates an instance from two arguments and is allowed to
+ * throw an {@link IOException}.
+ *
+ * @param <E1> Type of the first argument.
+ * @param <E2> Type of the second argument.
+ * @param <R> Type of the created instance.
+ */
+ @FunctionalInterface
+ private interface Factory<E1, E2, R> {
+ /**
+ * @param e1 First argument.
+ * @param e2 Second argument.
+ * @return An instance of {@code R}.
+ * @throws IOException If the instance cannot be created due to an I/O error.
+ */
+ R create(E1 e1, E2 e2) throws IOException;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 579da37ee8e..d453d3f782e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -73,6 +73,7 @@ import
org.apache.ignite.internal.processors.metastorage.persistence.Distributed
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.C3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -85,6 +86,7 @@ import static
org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaIndexFile;
/**
* The requested map of cache groups and its partitions to include into
snapshot represented as <tt>Map<Integer, Set<Integer>></tt>.
@@ -149,6 +151,10 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
/** Processed snapshot size in bytes. */
private final AtomicLong processedSize = new AtomicLong();
+ /** Delta writer factory. */
+ private final C3<PageStore, File, Integer, PageStoreSerialWriter>
deltaWriterFactory =
+ cctx.snapshotMgr().sequentialWrite() ?
IndexedPageStoreSerialWriter::new : PageStoreSerialWriter::new;
+
/**
* @param cctx Shared context.
* @param srcNodeId Node id which cause snapshot task creation.
@@ -522,7 +528,11 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
// Wait for the completion of both futures -
checkpoint end, copy partition.
.runAfterBothAsync(cpEndFut,
wrapExceptionIfStarted(() -> {
- File delta =
partDeltaWriters.get(pair).deltaFile;
+ PageStoreSerialWriter writer =
partDeltaWriters.get(pair);
+
+ writer.close();
+
+ File delta = writer.deltaFile;
try {
// Atomically creates a new, empty delta
file if and only if
@@ -540,6 +550,14 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
boolean deleted = delta.delete();
assert deleted;
+
+ File deltaIdx = partDeltaIndexFile(delta);
+
+ if (deltaIdx.exists()) {
+ deleted = deltaIdx.delete();
+
+ assert deleted;
+ }
}),
snpSndr.executor());
@@ -575,9 +593,9 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
GroupPartitionId pair = new GroupPartitionId(grpId, partId);
PageStore store = pageStore.getStore(grpId, partId);
+ File delta = partDeltaFile(cacheWorkDir(tmpConsIdDir, dirName),
partId);
- partDeltaWriters.put(pair,
- new PageStoreSerialWriter(store,
partDeltaFile(cacheWorkDir(tmpConsIdDir, dirName), partId), encGrpId));
+ partDeltaWriters.put(pair, deltaWriterFactory.apply(store, delta,
encGrpId));
partFileLengths.put(pair, store.size());
}
@@ -801,10 +819,10 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
private class PageStoreSerialWriter implements PageWriteListener,
Closeable {
/** Page store to which current writer is related to. */
@GridToStringExclude
- private final PageStore store;
+ protected final PageStore store;
/** Partition delta file to store delta pages into. */
- private final File deltaFile;
+ protected final File deltaFile;
/** Id of encrypted cache group. If {@code null}, no encrypted IO is
used. */
private final Integer encryptedGrpId;
@@ -871,6 +889,12 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
}
}
+ /**
+ * Creates the delta file IO. The encrypted file IO factory is used when
+ * {@code encryptedGrpId} is set, the plain {@code ioFactory} otherwise.
+ *
+ * @throws IOException If the delta file IO cannot be created.
+ */
+ protected void init() throws IOException {
+ deltaFileIo = (encryptedGrpId == null ? ioFactory :
+ pageStore.encryptedFileIoFactory(ioFactory,
encryptedGrpId)).create(deltaFile);
+ }
+
/** {@inheritDoc} */
@Override public void accept(long pageId, ByteBuffer buf) {
assert buf.position() == 0 : buf.position();
@@ -883,10 +907,8 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
if (stopped())
return;
- if (deltaFileIo == null) {
- deltaFileIo = (encryptedGrpId == null ? ioFactory :
- pageStore.encryptedFileIoFactory(ioFactory,
encryptedGrpId)).create(deltaFile);
- }
+ if (deltaFileIo == null)
+ init();
}
catch (IOException e) {
acceptException(e);
@@ -948,7 +970,7 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
* @param pageBuf Page buffer to write.
* @throws IOException If page writing failed (IO error occurred).
*/
- private void writePage0(long pageId, ByteBuffer pageBuf) throws
IOException {
+ protected synchronized void writePage0(long pageId, ByteBuffer
pageBuf) throws IOException {
assert deltaFileIo != null : "Delta pages storage is not inited: "
+ this;
assert pageBuf.position() == 0;
assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer
order " + pageBuf.order()
@@ -992,6 +1014,69 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
}
}
+ /**
+ * Delta writer that additionally stores the page index of every page
+ * written to the delta file in a separate delta index file, in the order
+ * the pages are written.
+ *
+ * @see IgniteSnapshotManager.DeltaSortedIterator
+ */
+ private class IndexedPageStoreSerialWriter extends PageStoreSerialWriter {
+ /** Delta index file IO. */
+ @GridToStringExclude
+ private volatile FileIO idxIo;
+
+ /** Buffer of page indexes written to the delta. It is flushed to the index file when full and on close. */
+ private volatile ByteBuffer pageIdxs;
+
+ /**
+ * @param store Page store the writer is related to.
+ * @param deltaFile Partition delta file to store delta pages into.
+ * @param encryptedGrpId Id of encrypted cache group, {@code null} if no encrypted IO is used.
+ */
+ public IndexedPageStoreSerialWriter(PageStore store, File deltaFile,
@Nullable Integer encryptedGrpId) {
+ super(store, deltaFile, encryptedGrpId);
+ }
+
+ /**
+ * Creates the delta file IO, then the delta index file IO and the
+ * buffer of page indexes, which has the size of one page of the store.
+ *
+ * @throws IOException If a file IO cannot be created.
+ */
+ @Override protected void init() throws IOException {
+ super.init();
+
+ idxIo = ioFactory.create(partDeltaIndexFile(deltaFile));
+
+ pageIdxs =
ByteBuffer.allocate(store.getPageSize()).order(ByteOrder.nativeOrder());
+
+ assert pageIdxs.capacity() % 4 == 0;
+ }
+
+ /**
+ * Writes the page to the delta file and appends its page index to the
+ * buffer of page indexes; the buffer is flushed once it becomes full.
+ *
+ * @param pageId Page id.
+ * @param pageBuf Page buffer to write.
+ * @throws IOException If page writing failed (IO error occurred).
+ */
+ @Override protected synchronized void writePage0(long pageId,
ByteBuffer pageBuf) throws IOException {
+ super.writePage0(pageId, pageBuf);
+
+ pageIdxs.putInt(PageIdUtils.pageIndex(pageId));
+
+ if (!pageIdxs.hasRemaining())
+ flush();
+ }
+
+ /**
+ * Flush buffer with page indexes to the file and clear the buffer.
+ *
+ * @throws IOException If writing to the index file failed.
+ */
+ private void flush() throws IOException {
+ pageIdxs.flip();
+
+ idxIo.writeFully(pageIdxs);
+
+ pageIdxs.clear();
+ }
+
+ /**
+ * Closes the parent writer, flushes the remaining page indexes if the
+ * index file IO has been created, and closes the index file IO. A
+ * flush failure is passed to {@code acceptException} instead of being
+ * thrown.
+ */
+ @Override public void close() {
+ super.close();
+
+ try {
+ if (idxIo != null)
+ flush();
+ }
+ catch (IOException e) {
+ acceptException(new IgniteCheckedException("Error during
writing page indexes to delta " +
+ "partition index file [writer=" + this + ']', e));
+ }
+
+ U.closeQuiet(idxIo);
+
+ idxIo = null;
+ }
+ }
+
/**
*
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 702c8a9986b..6b647084ed9 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -152,7 +152,7 @@ public abstract class AbstractSnapshotSelfTest extends
GridCommonAbstractTest {
/** Parameters. */
@Parameterized.Parameters(name = "Encryption={0}")
- public static Iterable<Boolean> encryptionParams() {
+ public static Collection<Boolean> encryptionParams() {
return Arrays.asList(false, true);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java
new file mode 100644
index 00000000000..8912396d691
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SNAPSHOT_SEQUENTIAL_WRITE;
+import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DeltaSortedIterator.DELTA_SORT_BATCH_SIZE;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaIndexFile;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Cluster snapshot delta tests. Each test runs for every combination of the
+ * encryption parameter and the sequential write flag.
+ */
+@RunWith(Parameterized.class)
+public class IgniteClusterSnapshotDeltaTest extends AbstractSnapshotSelfTest {
+ /** Second test parameter: the value set for the {@code IGNITE_SNAPSHOT_SEQUENTIAL_WRITE} system property. */
+ @Parameterized.Parameter(1)
+ public boolean sequentialWrite;
+
+ /** @return All combinations of the encryption parameters and the sequential write flag values. */
+ @Parameterized.Parameters(name = "encryption={0}, sequentialWrite={1}")
+ public static Collection<Object[]> parameters() {
+ return cartesianProduct(encryptionParams(), F.asList(false, true));
+ }
+
+ /** Clears the system property set by the test. */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ System.clearProperty(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE);
+ }
+
+ /**
+ * Checks that delta pages are applied to the snapshot. Copying of the
+ * partition files is blocked while all cache entries are overwritten and
+ * a checkpoint is forced; after the snapshot completes, the data restored
+ * from it must equal the payload the cache held before the updates.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSendDelta() throws Exception {
+ int keys = 10_000;
+ byte[] payload = new byte[DFLT_PAGE_SIZE / 2];
+ int partCnt = 2;
+
+ System.setProperty(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE,
String.valueOf(sequentialWrite));
+
+ // 1. Start a cluster and fill cache.
+ ThreadLocalRandom.current().nextBytes(payload);
+
+ byte[] expPayload = Arrays.copyOf(payload, payload.length);
+
+ CacheConfiguration<Integer, byte[]> ccfg = new
CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, partCnt));
+
+ String cacheDir = CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME;
+
+ IgniteEx srv = startGridsWithCache(1, keys, (k) -> expPayload, ccfg);
+
+ if (sequentialWrite)
+ injectSequentialWriteCheck(srv);
+
+ IgniteSnapshotManager mgr = snp(srv);
+
+ BiFunction<String, String, SnapshotSender> old =
mgr.localSnapshotSenderFactory();
+
+ CountDownLatch partStart = new CountDownLatch(partCnt);
+ CountDownLatch deltaApply = new CountDownLatch(1);
+
+ mgr.localSnapshotSenderFactory((rqId, nodeId) -> new
DelegateSnapshotSender(log,
+ mgr.snapshotExecutorService(), old.apply(rqId, nodeId)) {
+ @Override public void sendPart0(File part, String cacheDirName,
GroupPartitionId pair, Long length) {
+ if (cacheDir.equals(cacheDirName))
+ partStart.countDown();
+
+ try {
+ deltaApply.await();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ super.sendPart0(part, cacheDirName, pair, length);
+ }
+
+ @Override public void sendDelta0(File delta, String cacheDirName,
GroupPartitionId pair) {
+ if (cacheDir.equals(cacheDirName))
+ assertTrue(delta.length() > 0);
+
+ if (!sequentialWrite)
+ U.delete(partDeltaIndexFile(delta));
+
+ long start = System.nanoTime();
+
+ super.sendDelta0(delta, cacheDirName, pair);
+
+ if (cacheDir.equals(cacheDirName)) {
+ log.info("Send delta [size=" +
U.humanReadableByteCount(delta.length()) +
+ ", time=" + (U.nanosToMillis(System.nanoTime() -
start)) + "ms, part=" + pair + "]");
+ }
+ }
+ });
+
+ // 2. Start a snapshot and block copying of the partitions.
+ IgniteFuture<Void> fut = srv.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+ GridTestUtils.waitForCondition(() -> mgr.currentCreateRequest() !=
null, getTestTimeout());
+
+ partStart.await();
+
+ // 3. Produce delta pages by data updates.
+ IgniteCache<Integer, byte[]> cache = srv.getOrCreateCache(ccfg);
+
+ ThreadLocalRandom.current().nextBytes(payload);
+
+ for (int i = 0; i < keys; i++)
+ cache.put(i, payload);
+
+ forceCheckpoint(srv);
+
+ // 4. Apply delta and wait for the snapshot to complete.
+ deltaApply.countDown();
+
+ fut.get();
+
+ // 5. Destroy cache, restart the cluster and check data (delta was
successfully applied).
+ srv.destroyCache(DEFAULT_CACHE_NAME);
+
+ stopAllGrids();
+
+ srv = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ cache = srv.cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < keys; i++)
+ assertArrayEquals(expPayload, cache.get(i));
+ }
+
+ /**
+ * Injects test IO that checks sequential write to a pagestore on a delta
+ * apply. Both page store file IO factories are replaced with a factory
+ * whose file IO remembers the position of the previous positional write
+ * and marks the file as non-sequential when a write goes to a smaller
+ * position. A backward jump is tolerated only on every
+ * {@code idxsPerBatch}-th write, counted from the first one. The violation
+ * is reported by throwing a {@link RuntimeException} from {@code close()}.
+ * {@code idxsPerBatch} is {@code DELTA_SORT_BATCH_SIZE} rounded down to a
+ * multiple of the number of 4-byte indexes per page, plus one more page
+ * of indexes.
+ * NOTE(review): presumably this mirrors the size of a sort batch of
+ * {@code DeltaSortedIterator} - confirm.
+ *
+ * @param srv Server node to inject the check into.
+ */
+ private void injectSequentialWriteCheck(IgniteEx srv) {
+ FilePageStoreManager pageStore =
(FilePageStoreManager)srv.context().cache().context().pageStore();
+
+ FileIOFactory old = pageStore.getPageStoreFileIoFactory();
+
+ int idxsPerPage = pageStore.pageSize() / 4;
+
+ int idxsPerBatch = (DELTA_SORT_BATCH_SIZE / idxsPerPage) * idxsPerPage
+ idxsPerPage;
+
+ FileIOFactory testFactory = new FileIOFactory() {
+ @Override public FileIO create(File file, OpenOption... modes)
throws IOException {
+ FileIO fileIo = old.create(file, modes);
+
+ return new FileIODecorator(fileIo) {
+ boolean isSequentialWrite = true;
+
+ long lastPos;
+
+ int idx;
+
+ @Override public int write(ByteBuffer srcBuf, long pos)
throws IOException {
+ boolean batchRotation = idx++ % idxsPerBatch == 0;
+
+ if (lastPos > pos && !batchRotation)
+ isSequentialWrite = false;
+
+ lastPos = pos;
+
+ return super.write(srcBuf, pos);
+ }
+
+ @Override public void close() throws IOException {
+ super.close();
+
+ if (!isSequentialWrite)
+ throw new RuntimeException("Non sequential
write.");
+ }
+ };
+ }
+ };
+
+ pageStore.setPageStoreFileIOFactories(testFactory, testFactory);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index f151ba19739..33419af614b 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.EncryptedSnapshotTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckTest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotDeltaTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotHandlerTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
@@ -50,7 +51,8 @@ import org.junit.runners.Suite;
EncryptedSnapshotTest.class,
IgniteClusterSnapshotWalRecordTest.class,
IgniteClusterSnapshotStreamerTest.class,
- IgniteSnapshotConsistencyTest.class
+ IgniteSnapshotConsistencyTest.class,
+ IgniteClusterSnapshotDeltaTest.class
})
public class IgniteSnapshotTestSuite {
}