This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eab9fef44f IGNITE-23084 Implement checkpoint buffer draining in checkpoint threads if it's close to overflow (#4308)
eab9fef44f is described below

commit eab9fef44fc551857d2418fe20b3c9d6a7343832
Author: Ivan Bessonov <[email protected]>
AuthorDate: Fri Aug 30 13:08:37 2024 +0300

    IGNITE-23084 Implement checkpoint buffer draining in checkpoint threads if it's close to overflow (#4308)
---
 .../persistence/PersistentPageMemory.java          |  57 +++++++--
 .../checkpoint/CheckpointDirtyPages.java           |   8 ++
 .../checkpoint/CheckpointPagesWriter.java          | 128 ++++++++++++++++-----
 .../checkpoint/CheckpointPagesWriterFactory.java   |   4 +
 .../persistence/checkpoint/CheckpointWorkflow.java |   2 +
 .../persistence/checkpoint/Checkpointer.java       |   4 +
 .../RandomLruPageReplacementPolicy.java            |   1 +
 .../checkpoint/CheckpointPagesWriterTest.java      |   8 +-
 8 files changed, 173 insertions(+), 39 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index fe72ae7bb3..217a2eff8a 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -154,6 +154,12 @@ public class PersistentPageMemory implements PageMemory {
     /** Try again tag. */
     public static final int TRY_AGAIN_TAG = -1;
 
+    /**
+     * Threshold of the checkpoint buffer. We should start forcefully 
checkpointing its pages upon exceeding it. The value of {@code 2/3} is
+     * ported from {@code Ignite 2.x}.
+     */
+    private static final float CP_BUF_FILL_THRESHOLD = 2.0f / 3;
+
     /** Data region configuration view. */
     private final PersistentPageMemoryProfileView storageProfileView;
 
@@ -204,8 +210,7 @@ public class PersistentPageMemory implements PageMemory {
     private final AtomicReference<CheckpointUrgency> checkpointUrgency = new 
AtomicReference<>(NOT_REQUIRED);
 
     /** Checkpoint page pool, {@code null} if not {@link #start() started}. */
-    @Nullable
-    private volatile PagePool checkpointPool;
+    private volatile @Nullable PagePool checkpointPool;
 
     /**
      * Delayed page replacement (rotation with disk) tracker. Because other 
thread may require exactly the same page to be loaded from
@@ -332,9 +337,16 @@ public class PersistentPageMemory implements PageMemory {
             this.segments = segments;
 
             if (LOG.isInfoEnabled()) {
-                LOG.info("Started page memory [memoryAllocated={}, pages={}, 
tableSize={}, replacementSize={}, checkpointBuffer={}]",
-                        readableSize(totalAllocated, false), pages, 
readableSize(totalTblSize, false),
-                        readableSize(totalReplSize, false), 
readableSize(checkpointBufferSize, false));
+                LOG.info(
+                        "Started page memory [profile='{}', 
memoryAllocated={}, pages={}, tableSize={}, replacementSize={},"
+                                + " checkpointBuffer={}]",
+                        storageProfileView.name(),
+                        readableSize(totalAllocated, false),
+                        pages,
+                        readableSize(totalTblSize, false),
+                        readableSize(totalReplSize, false),
+                        readableSize(checkpointBufferSize, false)
+                );
             }
         }
     }
@@ -1089,14 +1101,23 @@ public class PersistentPageMemory implements PageMemory 
{
 
         // Create a buffer copy if the page is scheduled for a checkpoint.
         if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) == 
INVALID_REL_PTR) {
-            long tmpRelPtr = 
checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+            long tmpRelPtr;
 
-            if (tmpRelPtr == INVALID_REL_PTR) {
-                rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+            PagePool checkpointPool = this.checkpointPool;
 
-                throw new IgniteInternalException(
-                        "Failed to allocate temporary buffer for checkpoint 
(increase checkpointPageBufferSize configuration property): "
-                                + storageProfileView.name());
+            while (true) {
+                tmpRelPtr = 
checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+
+                if (tmpRelPtr != INVALID_REL_PTR) {
+                    break;
+                }
+
+                // TODO https://issues.apache.org/jira/browse/IGNITE-23106 
Replace spin-wait with a proper wait.
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException ignore) {
+                    // No-op.
+                }
             }
 
             // Pin the page until checkpoint is not finished.
@@ -2085,4 +2106,18 @@ public class PersistentPageMemory implements PageMemory {
             }
         }
     }
+
+    /**
+     * Checks if the Checkpoint Buffer is currently close to exhaustion.
+     */
+    public boolean isCpBufferOverflowThresholdExceeded() {
+        assert started;
+
+        PagePool checkpointPool = this.checkpointPool;
+
+        //noinspection NumericCastThatLosesPrecision
+        int checkpointBufLimit = (int) (checkpointPool.pages() * 
CP_BUF_FILL_THRESHOLD);
+
+        return checkpointPool.size() > checkpointBufLimit;
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
index c9b1693edb..b87a4188e4 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -158,6 +158,14 @@ class CheckpointDirtyPages {
         return new CheckpointDirtyPagesView(regionIndex, fromPosition, 
toPosition);
     }
 
+    /**
+     * Returns a full list of {@link PersistentPageMemory} instances for which at least one dirty partition exists in the current
+     * checkpoint.
+     */
+    List<PersistentPageMemory> dirtyPageMemoryInstances() {
+        return this.dirtyPages.stream().map(p -> 
p.pageMemory).collect(toList());
+    }
+
     /**
      * View of {@link CheckpointDirtyPages} in which all dirty page IDs will 
refer to the same {@link PersistentPageMemory} and contain the
      * same groupId and partitionId and increasing pageIdx.
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index d37b4342e8..e1ae8085e7 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -29,13 +29,10 @@ import static 
org.apache.ignite.internal.util.StringUtils.hexLong;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -62,6 +59,14 @@ public class CheckpointPagesWriter implements Runnable {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(CheckpointPagesWriter.class);
 
+    /**
+     * Size of a batch of pages that we drain from a single checkpoint buffer at the same time. The value of {@code 10} is chosen
+     * arbitrarily. We may reconsider it in <a href="https://issues.apache.org/jira/browse/IGNITE-23106">IGNITE-23106</a> if necessary.
+     *
+     * @see #drainCheckpointBuffers(ByteBuffer, Map)
+     */
+    private static final int CP_BUFFER_PAGES_BATCH_THRESHOLD = 10;
+
     /** Checkpoint specific metrics tracker. */
     private final CheckpointMetricsTracker tracker;
 
@@ -73,6 +78,13 @@ public class CheckpointPagesWriter implements Runnable {
      */
     private final IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
FullPageId> writePageIds;
 
+    /**
+     * List of {@link PersistentPageMemory} instances that have dirty 
partitions in current checkpoint.
+     *
+     * @see #drainCheckpointBuffers(ByteBuffer, Map)
+     */
+    private final List<PersistentPageMemory> pageMemoryList;
+
     /** Updated partitions -> count of written pages. */
     private final ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions;
 
@@ -105,6 +117,7 @@ public class CheckpointPagesWriter implements Runnable {
      *
      * @param tracker Checkpoint metrics tracker.
      * @param writePageIds Queue of dirty page IDs to write.
+     * @param pageMemoryList List of {@link PersistentPageMemory} instances 
that have dirty partitions in current checkpoint.
      * @param updatedPartitions Updated partitions.
      * @param doneFut Done future.
      * @param updateHeartbeat Update heartbeat callback.
@@ -118,6 +131,7 @@ public class CheckpointPagesWriter implements Runnable {
     CheckpointPagesWriter(
             CheckpointMetricsTracker tracker,
             IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds,
+            List<PersistentPageMemory> pageMemoryList,
             ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
             CompletableFuture<?> doneFut,
             Runnable updateHeartbeat,
@@ -130,6 +144,7 @@ public class CheckpointPagesWriter implements Runnable {
     ) {
         this.tracker = tracker;
         this.writePageIds = writePageIds;
+        this.pageMemoryList = pageMemoryList;
         this.updatedPartitions = updatedPartitions;
         this.doneFut = doneFut;
         this.updateHeartbeat = updateHeartbeat;
@@ -174,17 +189,19 @@ public class CheckpointPagesWriter implements Runnable {
 
         Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
 
+        // Page store writers for checkpoint buffer pages are located in a 
separate map, because they would not add elements to a
+        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write 
lock acquisitions. It's implemented this way in order to
+        // avoid duplicates in "pageIdsToRetry", there must only be a single 
source of pages to achieve that.
+        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = 
new HashMap<>();
+
         ByteBuffer tmpWriteBuf = threadBuf.get();
 
         Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
 
         GroupPartitionId partitionId = null;
 
-        AtomicBoolean writeMetaPage = new AtomicBoolean();
-
-        Set<GroupPartitionId> inProgressPartitions = new HashSet<>();
-
         try {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to 
write file per thread.
             while (!shutdownNow.getAsBoolean() && 
writePageIds.next(queueResult)) {
                 updateHeartbeat.run();
 
@@ -198,32 +215,19 @@ public class CheckpointPagesWriter implements Runnable {
                     // Starting for the new partition.
                     
checkpointProgress.onStartPartitionProcessing(newPartitionId);
 
-                    inProgressPartitions.add(newPartitionId);
-
                     if (partitionId != null) {
                         // Finishing for the previous partition.
+                        // TODO 
https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition 
destruction awaiting.
                         
checkpointProgress.onFinishPartitionProcessing(partitionId);
-
-                        inProgressPartitions.remove(partitionId);
                     }
 
                     partitionId = newPartitionId;
 
-                    updatedPartitions.computeIfAbsent(partitionId, partId -> {
-                        writeMetaPage.set(true);
-
-                        return new LongAdder();
-                    });
-
-                    if (writeMetaPage.get()) {
+                    if (shouldWriteMetaPage(partitionId)) {
                         writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
-
-                        writeMetaPage.set(false);
                     }
                 }
 
-                tmpWriteBuf.rewind();
-
                 PageStoreWriter pageStoreWriter = 
pageStoreWriters.computeIfAbsent(
                         pageMemory,
                         pm -> createPageStoreWriter(pm, pageIdsToRetry)
@@ -235,15 +239,83 @@ public class CheckpointPagesWriter implements Runnable {
                 }
 
                 // Should also be done for partitions that will be destroyed 
to remove their pages from the data region.
-                pageMemory.checkpointWritePage(fullId, tmpWriteBuf, 
pageStoreWriter, tracker);
+                pageMemory.checkpointWritePage(fullId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker);
+
+                drainCheckpointBuffers(tmpWriteBuf, cpBufferPageStoreWriters);
             }
         } finally {
-            
inProgressPartitions.forEach(checkpointProgress::onFinishPartitionProcessing);
+            if (partitionId != null) {
+                checkpointProgress.onFinishPartitionProcessing(partitionId);
+            }
         }
 
         return pageIdsToRetry.isEmpty() ? EMPTY : new 
IgniteConcurrentMultiPairQueue<>(pageIdsToRetry);
     }
 
+    /**
+     * Checkpoints parts of checkpoint buffers if they are close to overflow. 
Uses
+     * {@link PersistentPageMemory#isCpBufferOverflowThresholdExceeded()} to 
detect that.
+     */
+    private void drainCheckpointBuffers(
+            ByteBuffer tmpWriteBuf,
+            Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters
+    ) throws IgniteInternalCheckedException {
+        PageStoreWriter pageStoreWriter;
+        boolean retry = true;
+
+        while (retry) {
+            retry = false;
+
+            // We iterate over a list of page memory instances and delete at 
most "CP_BUFFER_PAGES_BATCH_THRESHOLD" pages.
+            // If any of instances still return "true" from 
"isCpBufferOverflowThresholdExceeded", then we would repeat this loop using a
+            // "retry" flag. This guarantees that every page memory instance 
has a continuous process of checkpoint buffer draining, meaning
+            // that those who wait for a free space will receive it in a short 
time.
+            for (PersistentPageMemory pageMemory : pageMemoryList) {
+                int count = 0;
+
+                while (pageMemory.isCpBufferOverflowThresholdExceeded()) {
+                    if (++count >= CP_BUFFER_PAGES_BATCH_THRESHOLD) {
+                        retry = true;
+
+                        break;
+                    }
+
+                    updateHeartbeat.run();
+
+                    FullPageId cpPageId = pageMemory.pullPageFromCpBuffer();
+
+                    if (cpPageId.equals(FullPageId.NULL_PAGE)) {
+                        break;
+                    }
+
+                    GroupPartitionId partitionId = toPartitionId(cpPageId);
+
+                    if (shouldWriteMetaPage(partitionId)) {
+                        writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+                    }
+
+                    pageStoreWriter = pageStoreWriters.computeIfAbsent(
+                            pageMemory,
+                            pm -> createPageStoreWriter(pm, null)
+                    );
+
+                    pageMemory.checkpointWritePage(cpPageId, 
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns {@code true} if checkpointer should write meta page of 
partition. Guaranteed to return {@code true} exactly once for every
+     * passed partition ID.
+     */
+    private boolean shouldWriteMetaPage(GroupPartitionId partitionId) {
+        // We deliberately avoid "computeIfAbsent" here for the sake of 
performance.
+        // For the overwhelming amount of calls "partitionId" should already 
be in the set.
+        return !updatedPartitions.containsKey(partitionId)
+                && null == updatedPartitions.putIfAbsent(partitionId, new 
LongAdder());
+    }
+
     /**
      * Returns a new instance of {@link PageStoreWriter}.
      *
@@ -252,11 +324,13 @@ public class CheckpointPagesWriter implements Runnable {
      */
     private PageStoreWriter createPageStoreWriter(
             PersistentPageMemory pageMemory,
-            Map<PersistentPageMemory, List<FullPageId>> pagesToRetry
+            @Nullable Map<PersistentPageMemory, List<FullPageId>> pagesToRetry
     ) {
         return (fullPageId, buf, tag) -> {
             if (tag == TRY_AGAIN_TAG) {
-                pagesToRetry.computeIfAbsent(pageMemory, k -> new 
ArrayList<>()).add(fullPageId);
+                if (pagesToRetry != null) {
+                    pagesToRetry.computeIfAbsent(pageMemory, k -> new 
ArrayList<>()).add(fullPageId);
+                }
 
                 return;
             }
@@ -303,6 +377,8 @@ public class CheckpointPagesWriter implements Runnable {
         checkpointProgress.writtenPagesCounter().incrementAndGet();
 
         updatedPartitions.get(partitionId).increment();
+
+        updateHeartbeat.run();
     }
 
     private static boolean hasPartitionChanged(@Nullable GroupPartitionId 
partitionId, FullPageId pageId) {
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index 31c7e619d0..301313ccd7 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
@@ -82,6 +83,7 @@ public class CheckpointPagesWriterFactory {
      *
      * @param tracker Checkpoint metrics tracker.
      * @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
+     * @param pageMemoryList List of {@link PersistentPageMemory} instances 
that have dirty partitions in current checkpoint.
      * @param updatedPartitions Updated partitions.
      * @param doneWriteFut Write done future.
      * @param updateHeartbeat Update heartbeat callback.
@@ -91,6 +93,7 @@ public class CheckpointPagesWriterFactory {
     CheckpointPagesWriter build(
             CheckpointMetricsTracker tracker,
             IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
dirtyPageIdQueue,
+            List<PersistentPageMemory> pageMemoryList,
             ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
             CompletableFuture<?> doneWriteFut,
             Runnable updateHeartbeat,
@@ -101,6 +104,7 @@ public class CheckpointPagesWriterFactory {
         return new CheckpointPagesWriter(
                 tracker,
                 dirtyPageIdQueue,
+                pageMemoryList,
                 updatedPartitions,
                 doneWriteFut,
                 updateHeartbeat,
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index db15f800ab..bc074fc851 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -372,6 +372,8 @@ class CheckpointWorkflow {
             Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
             CompletableFuture<?> allowToReplace
     ) {
+        assert checkpointReadWriteLock.isWriteLockHeldByCurrentThread();
+
         Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = 
this.dirtyPartitionsMap;
 
         this.dirtyPartitionsMap = new ConcurrentHashMap<>();
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index a197098c4f..1d2b40392f 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -33,6 +33,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
@@ -431,12 +432,15 @@ public class Checkpointer extends IgniteWorker {
 
         tracker.onPagesWriteStart();
 
+        List<PersistentPageMemory> pageMemoryList = 
checkpointDirtyPages.dirtyPageMemoryInstances();
+
         IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds = checkpointDirtyPages.toDirtyPageIdQueue();
 
         for (int i = 0; i < checkpointWritePageThreads; i++) {
             CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
                     tracker,
                     writePageIds,
+                    pageMemoryList,
                     updatedPartitions,
                     futures[i] = new CompletableFuture<>(),
                     workProgressDispatcher::updateHeartbeat,
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
index 20ea1d3efc..b049dc106a 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
@@ -117,6 +117,7 @@ public class RandomLruPageReplacementPolicy extends 
PageReplacementPolicy {
 
                 final boolean dirty = PageHeader.dirty(absPageAddr);
 
+                // TODO https://issues.apache.org/jira/browse/IGNITE-23103 Fix 
this condition.
                 if (relRmvAddr == rndAddr || pinned || skip || dirty) {
                     i--;
 
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index 5f8745289e..97d5dfdb04 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
@@ -129,6 +130,7 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 tracker,
                 writePageIds,
+                singletonList(pageMemory),
                 updatedPartitions,
                 doneFuture,
                 beforePageWrite,
@@ -168,12 +170,12 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                 ))
         );
 
-        verify(beforePageWrite, times(9)).run();
+        verify(beforePageWrite, times(11)).run();
 
         verify(threadBuf, times(2)).get();
 
         verify(partitionMeta0, times(1)).metaSnapshot(any(UUID.class));
-        verify(partitionMeta0, times(1)).metaSnapshot(any(UUID.class));
+        verify(partitionMeta1, times(1)).metaSnapshot(any(UUID.class));
     }
 
     @Test
@@ -196,6 +198,7 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 new CheckpointMetricsTracker(),
                 new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, 
List.of(fullPageId(0, 0, 1)))),
+                singletonList(pageMemory),
                 new ConcurrentHashMap<>(),
                 doneFuture,
                 () -> {},
@@ -246,6 +249,7 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 new CheckpointMetricsTracker(),
                 writePageIds,
+                singletonList(pageMemory),
                 updatedPartitions,
                 doneFuture,
                 () -> {},

Reply via email to