This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eab9fef44f IGNITE-23084 Implement checkpoint buffer draining in
checkpoint threads if it's close to overflow (#4308)
eab9fef44f is described below
commit eab9fef44fc551857d2418fe20b3c9d6a7343832
Author: Ivan Bessonov <[email protected]>
AuthorDate: Fri Aug 30 13:08:37 2024 +0300
IGNITE-23084 Implement checkpoint buffer draining in checkpoint threads if
it's close to overflow (#4308)
---
.../persistence/PersistentPageMemory.java | 57 +++++++--
.../checkpoint/CheckpointDirtyPages.java | 8 ++
.../checkpoint/CheckpointPagesWriter.java | 128 ++++++++++++++++-----
.../checkpoint/CheckpointPagesWriterFactory.java | 4 +
.../persistence/checkpoint/CheckpointWorkflow.java | 2 +
.../persistence/checkpoint/Checkpointer.java | 4 +
.../RandomLruPageReplacementPolicy.java | 1 +
.../checkpoint/CheckpointPagesWriterTest.java | 8 +-
8 files changed, 173 insertions(+), 39 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index fe72ae7bb3..217a2eff8a 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -154,6 +154,12 @@ public class PersistentPageMemory implements PageMemory {
/** Try again tag. */
public static final int TRY_AGAIN_TAG = -1;
+ /**
+ * Threshold of the checkpoint buffer. We should start forcefully
checkpointing its pages upon exceeding it. The value of {@code 2/3} is
+ * ported from {@code Ignite 2.x}.
+ */
+ private static final float CP_BUF_FILL_THRESHOLD = 2.0f / 3;
+
/** Data region configuration view. */
private final PersistentPageMemoryProfileView storageProfileView;
@@ -204,8 +210,7 @@ public class PersistentPageMemory implements PageMemory {
private final AtomicReference<CheckpointUrgency> checkpointUrgency = new
AtomicReference<>(NOT_REQUIRED);
/** Checkpoint page pool, {@code null} if not {@link #start() started}. */
- @Nullable
- private volatile PagePool checkpointPool;
+ private volatile @Nullable PagePool checkpointPool;
/**
* Delayed page replacement (rotation with disk) tracker. Because other
thread may require exactly the same page to be loaded from
@@ -332,9 +337,16 @@ public class PersistentPageMemory implements PageMemory {
this.segments = segments;
if (LOG.isInfoEnabled()) {
- LOG.info("Started page memory [memoryAllocated={}, pages={},
tableSize={}, replacementSize={}, checkpointBuffer={}]",
- readableSize(totalAllocated, false), pages,
readableSize(totalTblSize, false),
- readableSize(totalReplSize, false),
readableSize(checkpointBufferSize, false));
+ LOG.info(
+ "Started page memory [profile='{}',
memoryAllocated={}, pages={}, tableSize={}, replacementSize={},"
+ + " checkpointBuffer={}]",
+ storageProfileView.name(),
+ readableSize(totalAllocated, false),
+ pages,
+ readableSize(totalTblSize, false),
+ readableSize(totalReplSize, false),
+ readableSize(checkpointBufferSize, false)
+ );
}
}
}
@@ -1089,14 +1101,23 @@ public class PersistentPageMemory implements PageMemory
{
// Create a buffer copy if the page is scheduled for a checkpoint.
if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) ==
INVALID_REL_PTR) {
- long tmpRelPtr =
checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+ long tmpRelPtr;
- if (tmpRelPtr == INVALID_REL_PTR) {
- rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+ PagePool checkpointPool = this.checkpointPool;
- throw new IgniteInternalException(
- "Failed to allocate temporary buffer for checkpoint
(increase checkpointPageBufferSize configuration property): "
- + storageProfileView.name());
+ while (true) {
+ tmpRelPtr =
checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+
+ if (tmpRelPtr != INVALID_REL_PTR) {
+ break;
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23106
Replace spin-wait with a proper wait.
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ignore) {
+ // No-op.
+ }
}
// Pin the page until checkpoint is not finished.
@@ -2085,4 +2106,18 @@ public class PersistentPageMemory implements PageMemory {
}
}
}
+
+ /**
+ * Checks if the Checkpoint Buffer is currently close to exhaustion.
+ */
+ public boolean isCpBufferOverflowThresholdExceeded() {
+ assert started;
+
+ PagePool checkpointPool = this.checkpointPool;
+
+ //noinspection NumericCastThatLosesPrecision
+ int checkpointBufLimit = (int) (checkpointPool.pages() *
CP_BUF_FILL_THRESHOLD);
+
+ return checkpointPool.size() > checkpointBufLimit;
+ }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
index c9b1693edb..b87a4188e4 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -158,6 +158,14 @@ class CheckpointDirtyPages {
return new CheckpointDirtyPagesView(regionIndex, fromPosition,
toPosition);
}
+ /**
+ * Returns a full list of {@link PersistentPageMemory} instances for
which there exists at least one dirty partition in the current
+ * checkpoint.
+ */
+ List<PersistentPageMemory> dirtyPageMemoryInstances() {
+ return this.dirtyPages.stream().map(p ->
p.pageMemory).collect(toList());
+ }
+
/**
* View of {@link CheckpointDirtyPages} in which all dirty page IDs will
refer to the same {@link PersistentPageMemory} and contain the
* same groupId and partitionId and increasing pageIdx.
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index d37b4342e8..e1ae8085e7 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -29,13 +29,10 @@ import static
org.apache.ignite.internal.util.StringUtils.hexLong;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -62,6 +59,14 @@ public class CheckpointPagesWriter implements Runnable {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(CheckpointPagesWriter.class);
+ /**
+ * Size of a batch of pages that we drain from a single checkpoint buffer
at the same time. The value of {@code 10} is chosen
+ * arbitrarily. We may reconsider it in <a
href="https://issues.apache.org/jira/browse/IGNITE-23106">IGNITE-23106</a> if
necessary.
+ *
+ * @see #drainCheckpointBuffers(ByteBuffer, Map)
+ */
+ private static final int CP_BUFFER_PAGES_BATCH_THRESHOLD = 10;
+
/** Checkpoint specific metrics tracker. */
private final CheckpointMetricsTracker tracker;
@@ -73,6 +78,13 @@ public class CheckpointPagesWriter implements Runnable {
*/
private final IgniteConcurrentMultiPairQueue<PersistentPageMemory,
FullPageId> writePageIds;
+ /**
+ * List of {@link PersistentPageMemory} instances that have dirty
partitions in current checkpoint.
+ *
+ * @see #drainCheckpointBuffers(ByteBuffer, Map)
+ */
+ private final List<PersistentPageMemory> pageMemoryList;
+
/** Updated partitions -> count of written pages. */
private final ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions;
@@ -105,6 +117,7 @@ public class CheckpointPagesWriter implements Runnable {
*
* @param tracker Checkpoint metrics tracker.
* @param writePageIds Queue of dirty page IDs to write.
+ * @param pageMemoryList List of {@link PersistentPageMemory} instances
that have dirty partitions in current checkpoint.
* @param updatedPartitions Updated partitions.
* @param doneFut Done future.
* @param updateHeartbeat Update heartbeat callback.
@@ -118,6 +131,7 @@ public class CheckpointPagesWriter implements Runnable {
CheckpointPagesWriter(
CheckpointMetricsTracker tracker,
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds,
+ List<PersistentPageMemory> pageMemoryList,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneFut,
Runnable updateHeartbeat,
@@ -130,6 +144,7 @@ public class CheckpointPagesWriter implements Runnable {
) {
this.tracker = tracker;
this.writePageIds = writePageIds;
+ this.pageMemoryList = pageMemoryList;
this.updatedPartitions = updatedPartitions;
this.doneFut = doneFut;
this.updateHeartbeat = updateHeartbeat;
@@ -174,17 +189,19 @@ public class CheckpointPagesWriter implements Runnable {
Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new
HashMap<>();
+ // Page store writers for checkpoint buffer pages are kept in a
separate map, because they do not add elements to the
+ // "pageIdsToRetry" map. Instead, they ignore unsuccessful write
lock acquisitions. It's implemented this way in order to
+ // avoid duplicates in "pageIdsToRetry": there must be only a single
source of pages to achieve that.
+ Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters =
new HashMap<>();
+
ByteBuffer tmpWriteBuf = threadBuf.get();
Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
GroupPartitionId partitionId = null;
- AtomicBoolean writeMetaPage = new AtomicBoolean();
-
- Set<GroupPartitionId> inProgressPartitions = new HashSet<>();
-
try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to
write file per thread.
while (!shutdownNow.getAsBoolean() &&
writePageIds.next(queueResult)) {
updateHeartbeat.run();
@@ -198,32 +215,19 @@ public class CheckpointPagesWriter implements Runnable {
// Starting for the new partition.
checkpointProgress.onStartPartitionProcessing(newPartitionId);
- inProgressPartitions.add(newPartitionId);
-
if (partitionId != null) {
// Finishing for the previous partition.
+ // TODO
https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition
destruction awaiting.
checkpointProgress.onFinishPartitionProcessing(partitionId);
-
- inProgressPartitions.remove(partitionId);
}
partitionId = newPartitionId;
- updatedPartitions.computeIfAbsent(partitionId, partId -> {
- writeMetaPage.set(true);
-
- return new LongAdder();
- });
-
- if (writeMetaPage.get()) {
+ if (shouldWriteMetaPage(partitionId)) {
writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
-
- writeMetaPage.set(false);
}
}
- tmpWriteBuf.rewind();
-
PageStoreWriter pageStoreWriter =
pageStoreWriters.computeIfAbsent(
pageMemory,
pm -> createPageStoreWriter(pm, pageIdsToRetry)
@@ -235,15 +239,83 @@ public class CheckpointPagesWriter implements Runnable {
}
// Should also be done for partitions that will be destroyed
to remove their pages from the data region.
- pageMemory.checkpointWritePage(fullId, tmpWriteBuf,
pageStoreWriter, tracker);
+ pageMemory.checkpointWritePage(fullId, tmpWriteBuf.rewind(),
pageStoreWriter, tracker);
+
+ drainCheckpointBuffers(tmpWriteBuf, cpBufferPageStoreWriters);
}
} finally {
-
inProgressPartitions.forEach(checkpointProgress::onFinishPartitionProcessing);
+ if (partitionId != null) {
+ checkpointProgress.onFinishPartitionProcessing(partitionId);
+ }
}
return pageIdsToRetry.isEmpty() ? EMPTY : new
IgniteConcurrentMultiPairQueue<>(pageIdsToRetry);
}
+ /**
+ * Checkpoints parts of checkpoint buffers if they are close to overflow.
Uses
+ * {@link PersistentPageMemory#isCpBufferOverflowThresholdExceeded()} to
detect that.
+ */
+ private void drainCheckpointBuffers(
+ ByteBuffer tmpWriteBuf,
+ Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters
+ ) throws IgniteInternalCheckedException {
+ PageStoreWriter pageStoreWriter;
+ boolean retry = true;
+
+ while (retry) {
+ retry = false;
+
+ // We iterate over the list of page memory instances and drain
fewer than "CP_BUFFER_PAGES_BATCH_THRESHOLD" pages from each per pass.
+ // If any of the instances still returns "true" from
"isCpBufferOverflowThresholdExceeded", then we repeat this loop using the
+ // "retry" flag. This guarantees that every page memory instance
has a continuous process of checkpoint buffer draining, meaning
+ // that threads waiting for free space will receive it in a short
time.
+ for (PersistentPageMemory pageMemory : pageMemoryList) {
+ int count = 0;
+
+ while (pageMemory.isCpBufferOverflowThresholdExceeded()) {
+ if (++count >= CP_BUFFER_PAGES_BATCH_THRESHOLD) {
+ retry = true;
+
+ break;
+ }
+
+ updateHeartbeat.run();
+
+ FullPageId cpPageId = pageMemory.pullPageFromCpBuffer();
+
+ if (cpPageId.equals(FullPageId.NULL_PAGE)) {
+ break;
+ }
+
+ GroupPartitionId partitionId = toPartitionId(cpPageId);
+
+ if (shouldWriteMetaPage(partitionId)) {
+ writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
+ }
+
+ pageStoreWriter = pageStoreWriters.computeIfAbsent(
+ pageMemory,
+ pm -> createPageStoreWriter(pm, null)
+ );
+
+ pageMemory.checkpointWritePage(cpPageId,
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the checkpointer should write the meta page of
the partition. Guaranteed to return {@code true} exactly once for every
+ * passed partition ID.
+ */
+ private boolean shouldWriteMetaPage(GroupPartitionId partitionId) {
+ // We deliberately avoid "computeIfAbsent" here for the sake of
performance.
+ // For the overwhelming majority of calls "partitionId" should already
be in the map.
+ return !updatedPartitions.containsKey(partitionId)
+ && null == updatedPartitions.putIfAbsent(partitionId, new
LongAdder());
+ }
+
/**
* Returns a new instance of {@link PageStoreWriter}.
*
@@ -252,11 +324,13 @@ public class CheckpointPagesWriter implements Runnable {
*/
private PageStoreWriter createPageStoreWriter(
PersistentPageMemory pageMemory,
- Map<PersistentPageMemory, List<FullPageId>> pagesToRetry
+ @Nullable Map<PersistentPageMemory, List<FullPageId>> pagesToRetry
) {
return (fullPageId, buf, tag) -> {
if (tag == TRY_AGAIN_TAG) {
- pagesToRetry.computeIfAbsent(pageMemory, k -> new
ArrayList<>()).add(fullPageId);
+ if (pagesToRetry != null) {
+ pagesToRetry.computeIfAbsent(pageMemory, k -> new
ArrayList<>()).add(fullPageId);
+ }
return;
}
@@ -303,6 +377,8 @@ public class CheckpointPagesWriter implements Runnable {
checkpointProgress.writtenPagesCounter().incrementAndGet();
updatedPartitions.get(partitionId).increment();
+
+ updateHeartbeat.run();
}
private static boolean hasPartitionChanged(@Nullable GroupPartitionId
partitionId, FullPageId pageId) {
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index 31c7e619d0..301313ccd7 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
@@ -82,6 +83,7 @@ public class CheckpointPagesWriterFactory {
*
* @param tracker Checkpoint metrics tracker.
* @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
+ * @param pageMemoryList List of {@link PersistentPageMemory} instances
that have dirty partitions in current checkpoint.
* @param updatedPartitions Updated partitions.
* @param doneWriteFut Write done future.
* @param updateHeartbeat Update heartbeat callback.
@@ -91,6 +93,7 @@ public class CheckpointPagesWriterFactory {
CheckpointPagesWriter build(
CheckpointMetricsTracker tracker,
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPageIdQueue,
+ List<PersistentPageMemory> pageMemoryList,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneWriteFut,
Runnable updateHeartbeat,
@@ -101,6 +104,7 @@ public class CheckpointPagesWriterFactory {
return new CheckpointPagesWriter(
tracker,
dirtyPageIdQueue,
+ pageMemoryList,
updatedPartitions,
doneWriteFut,
updateHeartbeat,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index db15f800ab..bc074fc851 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -372,6 +372,8 @@ class CheckpointWorkflow {
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
CompletableFuture<?> allowToReplace
) {
+ assert checkpointReadWriteLock.isWriteLockHeldByCurrentThread();
+
Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap =
this.dirtyPartitionsMap;
this.dirtyPartitionsMap = new ConcurrentHashMap<>();
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index a197098c4f..1d2b40392f 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -33,6 +33,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
import java.io.IOException;
import java.nio.file.Path;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
@@ -431,12 +432,15 @@ public class Checkpointer extends IgniteWorker {
tracker.onPagesWriteStart();
+ List<PersistentPageMemory> pageMemoryList =
checkpointDirtyPages.dirtyPageMemoryInstances();
+
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds = checkpointDirtyPages.toDirtyPageIdQueue();
for (int i = 0; i < checkpointWritePageThreads; i++) {
CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
tracker,
writePageIds,
+ pageMemoryList,
updatedPartitions,
futures[i] = new CompletableFuture<>(),
workProgressDispatcher::updateHeartbeat,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
index 20ea1d3efc..b049dc106a 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
@@ -117,6 +117,7 @@ public class RandomLruPageReplacementPolicy extends
PageReplacementPolicy {
final boolean dirty = PageHeader.dirty(absPageAddr);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23103 Fix
this condition.
if (relRmvAddr == rndAddr || pinned || skip || dirty) {
i--;
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index 5f8745289e..97d5dfdb04 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
@@ -129,6 +130,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
tracker,
writePageIds,
+ singletonList(pageMemory),
updatedPartitions,
doneFuture,
beforePageWrite,
@@ -168,12 +170,12 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
))
);
- verify(beforePageWrite, times(9)).run();
+ verify(beforePageWrite, times(11)).run();
verify(threadBuf, times(2)).get();
verify(partitionMeta0, times(1)).metaSnapshot(any(UUID.class));
- verify(partitionMeta0, times(1)).metaSnapshot(any(UUID.class));
+ verify(partitionMeta1, times(1)).metaSnapshot(any(UUID.class));
}
@Test
@@ -196,6 +198,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
new CheckpointMetricsTracker(),
new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory,
List.of(fullPageId(0, 0, 1)))),
+ singletonList(pageMemory),
new ConcurrentHashMap<>(),
doneFuture,
() -> {},
@@ -246,6 +249,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
new CheckpointMetricsTracker(),
writePageIds,
+ singletonList(pageMemory),
updatedPartitions,
doneFuture,
() -> {},