This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c8f6929690 IGNITE-23115 Checkpoint single partition from a single 
thread (#4379)
c8f6929690 is described below

commit c8f692969004f38997fec60bff0f601e4cad948b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Sep 13 16:54:36 2024 +0300

    IGNITE-23115 Checkpoint single partition from a single thread (#4379)
---
 .../pagememory/persistence/GroupPartitionId.java   |  31 +---
 .../checkpoint/CheckpointDirtyPages.java           | 100 +++-------
 .../persistence/checkpoint/CheckpointManager.java  |  13 +-
 .../checkpoint/CheckpointPagesWriter.java          | 205 ++++++++++++---------
 .../checkpoint/CheckpointPagesWriterFactory.java   |   7 +-
 .../persistence/checkpoint/CheckpointWorkflow.java |  21 ++-
 .../persistence/checkpoint/Checkpointer.java       |   6 +-
 .../checkpoint/DirtyPagesAndPartitions.java}       |  32 ++--
 .../checkpoint/CheckpointDirtyPagesTest.java       | 127 +++++--------
 .../checkpoint/CheckpointManagerTest.java          |  17 +-
 .../checkpoint/CheckpointPagesWriterTest.java      |  57 ++++--
 .../persistence/checkpoint/CheckpointTest.java     |   7 +-
 .../checkpoint/CheckpointWorkflowTest.java         |  46 ++---
 .../persistence/checkpoint/CheckpointerTest.java   |  13 +-
 ...heckpointTest.java => TestCheckpointUtils.java} |  34 ++--
 15 files changed, 332 insertions(+), 384 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
index 73c97a3671..5c30c3c267 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagememory.persistence;
 
+import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -54,13 +55,11 @@ public class GroupPartitionId implements 
Comparable<GroupPartitionId> {
         return partId;
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString() {
         return S.toString(GroupPartitionId.class, this);
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -73,14 +72,10 @@ public class GroupPartitionId implements 
Comparable<GroupPartitionId> {
 
         GroupPartitionId key = (GroupPartitionId) o;
 
-        if (grpId != key.grpId) {
-            return false;
-        }
+        return grpId == key.grpId && partId == key.partId;
 
-        return partId == key.partId;
     }
 
-    /** {@inheritDoc} */
     @Override
     public int hashCode() {
         int result = grpId;
@@ -90,25 +85,19 @@ public class GroupPartitionId implements 
Comparable<GroupPartitionId> {
         return result;
     }
 
-    /** {@inheritDoc} */
     @Override
     public int compareTo(GroupPartitionId o) {
-        if (getGroupId() < o.getGroupId()) {
-            return -1;
-        }
+        int cmp = Integer.compare(getGroupId(), o.getGroupId());
 
-        if (getGroupId() > o.getGroupId()) {
-            return 1;
+        if (cmp != 0) {
+            return cmp;
         }
 
-        if (getPartitionId() < o.getPartitionId()) {
-            return -1;
-        }
-
-        if (getPartitionId() > o.getPartitionId()) {
-            return 1;
-        }
+        return Integer.compare(getPartitionId(), o.getPartitionId());
+    }
 
-        return 0;
+    /** Converts given full page ID to a {@link GroupPartitionId}. */
+    public static GroupPartitionId convert(FullPageId fullPageId) {
+        return new GroupPartitionId(fullPageId.groupId(), 
fullPageId.partitionId());
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
index b87a4188e4..86086ffa8f 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -20,21 +20,18 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint;
 import static java.util.Arrays.binarySearch;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
 
 import java.util.Comparator;
 import java.util.List;
 import java.util.RandomAccess;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * Dirty pages of data regions, with sorted page IDs by {@link 
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs that should be
- * checkpointed.
- */
+/** Dirty pages of data regions, with sorted page IDs by {@link 
#DIRTY_PAGE_COMPARATOR} and partition IDs that should be checkpointed. */
 class CheckpointDirtyPages {
     /** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
     static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
@@ -44,8 +41,8 @@ class CheckpointDirtyPages {
     /** Empty checkpoint dirty pages. */
     static final CheckpointDirtyPages EMPTY = new 
CheckpointDirtyPages(List.of());
 
-    /** Dirty pages of data regions, with sorted page IDs by {@link 
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs. */
-    private final List<DataRegionDirtyPages<FullPageId[]>> dirtyPages;
+    /** Dirty pages and partitions of data regions, with sorted dirty page IDs 
by {@link #DIRTY_PAGE_COMPARATOR}. */
+    private final List<DirtyPagesAndPartitions> dirtyPagesAndPartitions;
 
     /** Total number of dirty page IDs. */
     private final int dirtyPagesCount;
@@ -53,14 +50,15 @@ class CheckpointDirtyPages {
     /**
      * Constructor.
      *
-     * @param dirtyPages Dirty pages of data regions, with sorted page IDs by 
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
+     * @param dirtyPagesAndPartitions Dirty pages and partitions of data 
regions, with sorted dirty page IDs by
+     *     {@link #DIRTY_PAGE_COMPARATOR}. Expected list with {@link 
RandomAccess}.
      */
-    public CheckpointDirtyPages(List<DataRegionDirtyPages<FullPageId[]>> 
dirtyPages) {
-        assert dirtyPages instanceof RandomAccess : dirtyPages;
+    CheckpointDirtyPages(List<DirtyPagesAndPartitions> 
dirtyPagesAndPartitions) {
+        assert dirtyPagesAndPartitions instanceof RandomAccess : 
dirtyPagesAndPartitions;
 
-        this.dirtyPages = dirtyPages;
+        this.dirtyPagesAndPartitions = dirtyPagesAndPartitions;
 
-        dirtyPagesCount = dirtyPages.stream().mapToInt(pages -> 
pages.dirtyPages.length).sum();
+        dirtyPagesCount = dirtyPagesAndPartitions.stream().mapToInt(pages -> 
pages.dirtyPages.length).sum();
     }
 
     /**
@@ -70,15 +68,16 @@ class CheckpointDirtyPages {
         return dirtyPagesCount;
     }
 
-    /**
-     * Returns a queue of dirty page IDs to be written to a checkpoint.
-     */
-    public IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
toDirtyPageIdQueue() {
-        List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> dirtyPageIds = 
dirtyPages.stream()
-                .map(pages -> new IgniteBiTuple<>(pages.pageMemory, 
pages.dirtyPages))
+    /** Creates a concurrent queue of dirty partitions to be written to at 
checkpoint. */
+    public IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
GroupPartitionId> toDirtyPartitionQueue() {
+        List<IgniteBiTuple<PersistentPageMemory, GroupPartitionId[]>> 
dirtyPartitions = dirtyPagesAndPartitions.stream()
+                .map(dirtyPagesAndPartitions -> new IgniteBiTuple<>(
+                        dirtyPagesAndPartitions.pageMemory,
+                        
dirtyPagesAndPartitions.dirtyPartitions.toArray(GroupPartitionId[]::new))
+                )
                 .collect(toList());
 
-        return new IgniteConcurrentMultiPairQueue<>(dirtyPageIds);
+        return new IgniteConcurrentMultiPairQueue<>(dirtyPartitions);
     }
 
     /**
@@ -89,8 +88,8 @@ class CheckpointDirtyPages {
      * @param partId Partition ID.
      */
     public @Nullable CheckpointDirtyPagesView 
getPartitionView(PersistentPageMemory pageMemory, int grpId, int partId) {
-        for (int i = 0; i < dirtyPages.size(); i++) {
-            if (dirtyPages.get(i).pageMemory == pageMemory) {
+        for (int i = 0; i < dirtyPagesAndPartitions.size(); i++) {
+            if (dirtyPagesAndPartitions.get(i).pageMemory == pageMemory) {
                 return getPartitionView(i, grpId, partId);
             }
         }
@@ -102,7 +101,7 @@ class CheckpointDirtyPages {
         FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), 
grpId);
         FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), 
grpId);
 
-        FullPageId[] pageIds = dirtyPages.get(dirtyPagesIdx).dirtyPages;
+        FullPageId[] pageIds = 
dirtyPagesAndPartitions.get(dirtyPagesIdx).dirtyPages;
 
         int fromIndex = binarySearch(pageIds, startPageId, 
DIRTY_PAGE_COMPARATOR);
 
@@ -119,51 +118,12 @@ class CheckpointDirtyPages {
         return new CheckpointDirtyPagesView(dirtyPagesIdx, fromIndex, toIndex);
     }
 
-    /**
-     * Looks for the next dirty page IDs view from the current one, {@code 
null} if not found.
-     *
-     * @param currentView Current view to dirty pages, {@code null} to get 
first.
-     */
-    public @Nullable CheckpointDirtyPagesView nextPartitionView(@Nullable 
CheckpointDirtyPagesView currentView) {
-        assert currentView == null || currentView.owner() == this : 
currentView;
-
-        if (dirtyPages.isEmpty()) {
-            return null;
-        }
-
-        int regionIndex;
-        int fromPosition;
-
-        if (currentView == null) {
-            regionIndex = 0;
-            fromPosition = 0;
-        } else {
-            regionIndex = currentView.needsNextRegion() ? 
currentView.regionIndex + 1 : currentView.regionIndex;
-            fromPosition = currentView.needsNextRegion() ? 0 : 
currentView.toPosition;
-        }
-
-        if (regionIndex >= dirtyPages.size()) {
-            return null;
-        }
-
-        FullPageId[] pageIds = dirtyPages.get(regionIndex).dirtyPages;
-
-        FullPageId startPageId = pageIds[fromPosition];
-        FullPageId endPageId = new 
FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), 
startPageId.groupId());
-
-        int toPosition = binarySearch(pageIds, fromPosition, pageIds.length, 
endPageId, DIRTY_PAGE_COMPARATOR);
-
-        toPosition = toPosition > 0 ? toPosition : -toPosition - 1;
-
-        return new CheckpointDirtyPagesView(regionIndex, fromPosition, 
toPosition);
-    }
-
     /**
      * Returns a full list of {@link PersistentPageMemory} instances, for 
which there exist at least a single dirty partition in current
      * checkpoint.
      */
     List<PersistentPageMemory> dirtyPageMemoryInstances() {
-        return this.dirtyPages.stream().map(p -> 
p.pageMemory).collect(toList());
+        return this.dirtyPagesAndPartitions.stream().map(p -> 
p.pageMemory).collect(toList());
     }
 
     /**
@@ -173,7 +133,7 @@ class CheckpointDirtyPages {
      * <p>Thread safe.
      */
     class CheckpointDirtyPagesView {
-        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        /** Element index in {@link 
CheckpointDirtyPages#dirtyPagesAndPartitions}. */
         private final int regionIndex;
 
         /** Starting position (inclusive) of the dirty page within the element 
at {@link #regionIndex}. */
@@ -185,7 +145,7 @@ class CheckpointDirtyPages {
         /**
          * Private constructor.
          *
-         * @param regionIndex Element index in {@link 
CheckpointDirtyPages#dirtyPages}.
+         * @param regionIndex Element index in {@link 
CheckpointDirtyPages#dirtyPagesAndPartitions}.
          * @param fromPosition Starting position (inclusive) of the dirty page 
within the element at {@link #regionIndex}.
          * @param toPosition End position (exclusive) of the dirty page within 
the element at {@link #regionIndex}.
          */
@@ -201,14 +161,14 @@ class CheckpointDirtyPages {
          * @param index Dirty page index.
          */
         public FullPageId get(int index) {
-            return dirtyPages.get(this.regionIndex).dirtyPages[fromPosition + 
index];
+            return 
dirtyPagesAndPartitions.get(this.regionIndex).dirtyPages[fromPosition + index];
         }
 
         /**
          * Returns the page memory for view.
          */
         public PersistentPageMemory pageMemory() {
-            return dirtyPages.get(regionIndex).pageMemory;
+            return dirtyPagesAndPartitions.get(regionIndex).pageMemory;
         }
 
         /**
@@ -217,14 +177,6 @@ class CheckpointDirtyPages {
         public int size() {
             return toPosition - fromPosition;
         }
-
-        private CheckpointDirtyPages owner() {
-            return CheckpointDirtyPages.this;
-        }
-
-        private boolean needsNextRegion() {
-            return toPosition == dirtyPages.get(regionIndex).dirtyPages.length;
-        }
     }
 
     private static boolean equalsByGroupAndPartition(FullPageId pageId0, 
FullPageId pageId1) {
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 4e9c38f46a..4d85b72087 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -317,7 +317,18 @@ public class CheckpointManager {
 
         CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = 
filePageStore.getOrCreateNewDeltaFile(
                 index -> 
filePageStoreManager.tmpDeltaFilePageStorePath(pageId.groupId(), 
pageId.partitionId(), index),
-                () -> 
pageIndexesForDeltaFilePageStore(pagesToWrite.getPartitionView(pageMemory, 
pageId.groupId(), pageId.partitionId()))
+                () -> {
+                    CheckpointDirtyPagesView partitionView = 
pagesToWrite.getPartitionView(
+                            pageMemory,
+                            pageId.groupId(),
+                            pageId.partitionId()
+                    );
+
+                    assert partitionView != null : String.format("Unable to 
find view for dirty pages: [patitionId=%s, pageMemory=%s]",
+                            GroupPartitionId.convert(pageId), pageMemory);
+
+                    return pageIndexesForDeltaFilePageStore(partitionView);
+                }
         );
 
         deltaFilePageStoreFuture.join().write(pageId.pageId(), pageBuf, 
calculateCrc);
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index e1ae8085e7..bf93a24494 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
 import static 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId;
 import static 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
-import static 
org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.EMPTY;
 import static org.apache.ignite.internal.util.StringUtils.hexLong;
 
 import java.nio.ByteBuffer;
@@ -31,6 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.Partition
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
 import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
 import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
 import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.Result;
@@ -55,6 +56,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Implementation of page writer which able to store pages to disk during 
checkpoint.
  */
+// TODO: IGNITE-23203 Write retry pages in multiple threads 
 public class CheckpointPagesWriter implements Runnable {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(CheckpointPagesWriter.class);
@@ -63,25 +65,20 @@ public class CheckpointPagesWriter implements Runnable {
      * Size of a batch of pages that we drain from a single checkpoint buffer 
at the same time. The value of {@code 10} is chosen
      * arbitrarily. We may reconsider it in <a 
href="https://issues.apache.org/jira/browse/IGNITE-23106";>IGNITE-23106</a> if 
necessary.
      *
-     * @see #drainCheckpointBuffers(ByteBuffer, Map)
+     * @see #drainCheckpointBuffers(ByteBuffer)
      */
     private static final int CP_BUFFER_PAGES_BATCH_THRESHOLD = 10;
 
     /** Checkpoint specific metrics tracker. */
     private final CheckpointMetricsTracker tracker;
 
-    /**
-     * Queue of dirty page IDs to write under this task.
-     *
-     * <p>Overall pages to write may be greater than this queue, since it may 
be necessary to retire write some pages due to unsuccessful
-     * page write lock acquisition
-     */
-    private final IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
FullPageId> writePageIds;
+    /** Queue of dirty partitions IDs to write under this task. */
+    private final IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
GroupPartitionId> dirtyPartitionQueue;
 
     /**
      * List of {@link PersistentPageMemory} instances that have dirty 
partitions in current checkpoint.
      *
-     * @see #drainCheckpointBuffers(ByteBuffer, Map)
+     * @see #drainCheckpointBuffers(ByteBuffer)
      */
     private final List<PersistentPageMemory> pageMemoryList;
 
@@ -116,7 +113,7 @@ public class CheckpointPagesWriter implements Runnable {
      * Creates task for write pages.
      *
      * @param tracker Checkpoint metrics tracker.
-     * @param writePageIds Queue of dirty page IDs to write.
+     * @param dirtyPartitionQueue Queue of dirty partition IDs to write.
      * @param pageMemoryList List of {@link PersistentPageMemory} instances 
that have dirty partitions in current checkpoint.
      * @param updatedPartitions Updated partitions.
      * @param doneFut Done future.
@@ -130,7 +127,7 @@ public class CheckpointPagesWriter implements Runnable {
      */
     CheckpointPagesWriter(
             CheckpointMetricsTracker tracker,
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds,
+            IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
GroupPartitionId> dirtyPartitionQueue,
             List<PersistentPageMemory> pageMemoryList,
             ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
             CompletableFuture<?> doneFut,
@@ -143,7 +140,7 @@ public class CheckpointPagesWriter implements Runnable {
             BooleanSupplier shutdownNow
     ) {
         this.tracker = tracker;
-        this.writePageIds = writePageIds;
+        this.dirtyPartitionQueue = dirtyPartitionQueue;
         this.pageMemoryList = pageMemoryList;
         this.updatedPartitions = updatedPartitions;
         this.doneFut = doneFut;
@@ -159,15 +156,26 @@ public class CheckpointPagesWriter implements Runnable {
     @Override
     public void run() {
         try {
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
pageIdsToRetry = writePages(writePageIds);
+            Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new 
HashMap<>();
 
-            while (!pageIdsToRetry.isEmpty()) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Checkpoint pages were not written yet due to "
-                            + "unsuccessful page write lock acquisition and 
will be retried [pageCount={}]", pageIdsToRetry.size());
-                }
+            ByteBuffer tmpWriteBuf = threadBuf.get();
+
+            var queueResult = new Result<PersistentPageMemory, 
GroupPartitionId>();
+
+            while (!shutdownNow.getAsBoolean() && 
dirtyPartitionQueue.next(queueResult)) {
+                updateHeartbeat.run();
+
+                PersistentPageMemory pageMemory = queueResult.getKey();
+
+                PageStoreWriter pageStoreWriter = 
createPageStoreWriter(pageMemory, pageIdsToRetry);
+
+                writeDirtyPages(pageMemory, queueResult.getValue(), 
tmpWriteBuf, pageStoreWriter);
+            }
 
-                pageIdsToRetry = writePages(pageIdsToRetry);
+            while (!shutdownNow.getAsBoolean() && !pageIdsToRetry.isEmpty()) {
+                updateHeartbeat.run();
+
+                pageIdsToRetry = writeRetryDirtyPages(pageIdsToRetry, 
tmpWriteBuf);
             }
 
             doneFut.complete(null);
@@ -176,90 +184,105 @@ public class CheckpointPagesWriter implements Runnable {
         }
     }
 
-    /**
-     * Writes dirty pages.
-     *
-     * @param writePageIds Queue of dirty page IDs to write.
-     * @return pagesToRetry Queue dirty page IDs which should be retried.
-     */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds
+    private void writeDirtyPages(
+            PersistentPageMemory pageMemory,
+            GroupPartitionId partitionId,
+            ByteBuffer tmpWriteBuf,
+            PageStoreWriter pageStoreWriter
     ) throws IgniteInternalCheckedException {
-        Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new 
HashMap<>();
+        CheckpointDirtyPagesView checkpointDirtyPagesView = 
checkpointDirtyPagesView(pageMemory, partitionId);
 
-        Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
+        checkpointProgress.onStartPartitionProcessing(partitionId);
 
-        // Page store writers for checkpoint buffer pages are located in a 
separate map, because they would not add elements to a
-        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write 
lock acquisitions. It's implemented this way in order to
-        // avoid duplicates in "pageIdsToRetry", there must only be a single 
source of pages to achieve that.
-        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = 
new HashMap<>();
+        try {
+            if (shouldWriteMetaPage(partitionId)) {
+                writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+            }
 
-        ByteBuffer tmpWriteBuf = threadBuf.get();
+            for (int i = 0; i < checkpointDirtyPagesView.size() && 
!shutdownNow.getAsBoolean(); i++) {
+                updateHeartbeat.run();
 
-        Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+                FullPageId pageId = checkpointDirtyPagesView.get(i);
 
-        GroupPartitionId partitionId = null;
+                if (pageId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by 
"writePartitionMeta".
+                    continue;
+                }
 
-        try {
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to 
write file per thread.
-            while (!shutdownNow.getAsBoolean() && 
writePageIds.next(queueResult)) {
-                updateHeartbeat.run();
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter);
+            }
+        } finally {
+            checkpointProgress.onFinishPartitionProcessing(partitionId);
+        }
+    }
 
-                FullPageId fullId = queueResult.getValue();
+    private void writeDirtyPage(
+            PersistentPageMemory pageMemory,
+            FullPageId pageId,
+            ByteBuffer tmpWriteBuf,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        // Should also be done for partitions that will be destroyed to remove 
their pages from the data region.
+        pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker);
 
-                PersistentPageMemory pageMemory = queueResult.getKey();
+        drainCheckpointBuffers(tmpWriteBuf);
+    }
 
-                if (hasPartitionChanged(partitionId, fullId)) {
-                    GroupPartitionId newPartitionId = toPartitionId(fullId);
+    private Map<PersistentPageMemory, List<FullPageId>> writeRetryDirtyPages(
+            Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry,
+            ByteBuffer tmpWriteBuf
+    ) throws IgniteInternalCheckedException {
+        if (LOG.isInfoEnabled()) {
+            int pageCount = 
pageIdsToRetry.values().stream().mapToInt(List::size).sum();
 
-                    // Starting for the new partition.
-                    
checkpointProgress.onStartPartitionProcessing(newPartitionId);
+            LOG.info("Checkpoint pages were not written yet due to "
+                    + "unsuccessful page write lock acquisition and will be 
retried [pageCount={}]", pageCount);
+        }
 
-                    if (partitionId != null) {
-                        // Finishing for the previous partition.
-                        // TODO 
https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition 
destruction awaiting.
-                        
checkpointProgress.onFinishPartitionProcessing(partitionId);
-                    }
+        var newPageIdsToRetry = new HashMap<PersistentPageMemory, 
List<FullPageId>>();
 
-                    partitionId = newPartitionId;
+        for (Entry<PersistentPageMemory, List<FullPageId>> entry : 
pageIdsToRetry.entrySet()) {
+            PersistentPageMemory pageMemory = entry.getKey();
 
-                    if (shouldWriteMetaPage(partitionId)) {
-                        writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+            PageStoreWriter pageStoreWriter = 
createPageStoreWriter(pageMemory, newPageIdsToRetry);
+
+            GroupPartitionId partitionId = null;
+
+            try {
+                for (FullPageId pageId : entry.getValue()) {
+                    if (shutdownNow.getAsBoolean()) {
+                        return Map.of();
                     }
-                }
 
-                PageStoreWriter pageStoreWriter = 
pageStoreWriters.computeIfAbsent(
-                        pageMemory,
-                        pm -> createPageStoreWriter(pm, pageIdsToRetry)
-                );
+                    updateHeartbeat.run();
+
+                    if (partitionIdChanged(partitionId, pageId)) {
+                        if (partitionId != null) {
+                            
checkpointProgress.onFinishPartitionProcessing(partitionId);
+                        }
 
-                if (fullId.pageIdx() == 0) {
-                    // Skip meta-pages, they are written by 
"writePartitionMeta".
-                    continue;
-                }
+                        partitionId = GroupPartitionId.convert(pageId);
 
-                // Should also be done for partitions that will be destroyed 
to remove their pages from the data region.
-                pageMemory.checkpointWritePage(fullId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker);
+                        
checkpointProgress.onStartPartitionProcessing(partitionId);
+                    }
 
-                drainCheckpointBuffers(tmpWriteBuf, cpBufferPageStoreWriters);
-            }
-        } finally {
-            if (partitionId != null) {
-                checkpointProgress.onFinishPartitionProcessing(partitionId);
+                    writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter);
+                }
+            } finally {
+                if (partitionId != null) {
+                    
checkpointProgress.onFinishPartitionProcessing(partitionId);
+                }
             }
         }
 
-        return pageIdsToRetry.isEmpty() ? EMPTY : new 
IgniteConcurrentMultiPairQueue<>(pageIdsToRetry);
+        return newPageIdsToRetry;
     }
 
     /**
      * Checkpoints parts of checkpoint buffers if they are close to overflow. 
Uses
      * {@link PersistentPageMemory#isCpBufferOverflowThresholdExceeded()} to 
detect that.
      */
-    private void drainCheckpointBuffers(
-            ByteBuffer tmpWriteBuf,
-            Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters
-    ) throws IgniteInternalCheckedException {
+    private void drainCheckpointBuffers(ByteBuffer tmpWriteBuf) throws 
IgniteInternalCheckedException {
         PageStoreWriter pageStoreWriter;
         boolean retry = true;
 
@@ -288,16 +311,13 @@ public class CheckpointPagesWriter implements Runnable {
                         break;
                     }
 
-                    GroupPartitionId partitionId = toPartitionId(cpPageId);
+                    GroupPartitionId partitionId = 
GroupPartitionId.convert(cpPageId);
 
                     if (shouldWriteMetaPage(partitionId)) {
                         writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
                     }
 
-                    pageStoreWriter = pageStoreWriters.computeIfAbsent(
-                            pageMemory,
-                            pm -> createPageStoreWriter(pm, null)
-                    );
+                    pageStoreWriter = createPageStoreWriter(pageMemory, null);
 
                     pageMemory.checkpointWritePage(cpPageId, 
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
                 }
@@ -350,7 +370,7 @@ public class CheckpointPagesWriter implements Runnable {
 
             pageWriter.write(pageMemory, fullPageId, buf);
 
-            updatedPartitions.get(toPartitionId(fullPageId)).increment();
+            
updatedPartitions.get(GroupPartitionId.convert(fullPageId)).increment();
         };
     }
 
@@ -381,11 +401,24 @@ public class CheckpointPagesWriter implements Runnable {
         updateHeartbeat.run();
     }
 
-    private static boolean hasPartitionChanged(@Nullable GroupPartitionId 
partitionId, FullPageId pageId) {
-        return partitionId == null || partitionId.getPartitionId() != 
pageId.partitionId() || partitionId.getGroupId() != pageId.groupId();
+    private CheckpointDirtyPagesView 
checkpointDirtyPagesView(PersistentPageMemory pageMemory, GroupPartitionId 
partitionId) {
+        CheckpointDirtyPages checkpointDirtyPages = 
checkpointProgress.pagesToWrite();
+
+        assert checkpointDirtyPages != null;
+
+        CheckpointDirtyPagesView partitionView = 
checkpointDirtyPages.getPartitionView(
+                pageMemory,
+                partitionId.getGroupId(),
+                partitionId.getPartitionId()
+        );
+
+        assert partitionView != null : String.format("Unable to find view for 
dirty pages: [patitionId=%s, pageMemory=%s]", partitionId,
+                pageMemory);
+
+        return partitionView;
     }
 
-    private static GroupPartitionId toPartitionId(FullPageId pageId) {
-        return new GroupPartitionId(pageId.groupId(), pageId.partitionId());
+    private static boolean partitionIdChanged(@Nullable GroupPartitionId 
partitionId, FullPageId pageId) {
+        return partitionId == null || partitionId.getGroupId() != 
pageId.groupId() || partitionId.getPartitionId() != pageId.partitionId();
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index 301313ccd7..c0c05e5ca1 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
-import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
@@ -82,7 +81,7 @@ public class CheckpointPagesWriterFactory {
      * Returns instance of page checkpoint writer.
      *
      * @param tracker Checkpoint metrics tracker.
-     * @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
+     * @param dirtyPartitionQueue Checkpoint dirty partition ID queue to write.
      * @param pageMemoryList List of {@link PersistentPageMemory} instances 
that have dirty partitions in current checkpoint.
      * @param updatedPartitions Updated partitions.
      * @param doneWriteFut Write done future.
@@ -92,7 +91,7 @@ public class CheckpointPagesWriterFactory {
      */
     CheckpointPagesWriter build(
             CheckpointMetricsTracker tracker,
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
dirtyPageIdQueue,
+            IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
GroupPartitionId> dirtyPartitionQueue,
             List<PersistentPageMemory> pageMemoryList,
             ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
             CompletableFuture<?> doneWriteFut,
@@ -103,7 +102,7 @@ public class CheckpointPagesWriterFactory {
     ) {
         return new CheckpointPagesWriter(
                 tracker,
-                dirtyPageIdQueue,
+                dirtyPartitionQueue,
                 pageMemoryList,
                 updatedPartitions,
                 doneWriteFut,
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index bc074fc851..ce5a4c9dbc 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -37,6 +37,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.CollectionUtils;
@@ -409,13 +411,15 @@ class CheckpointWorkflow {
     CheckpointDirtyPages createAndSortCheckpointDirtyPages(
             DataRegionsDirtyPages dataRegionsDirtyPages
     ) throws IgniteInternalCheckedException {
-        List<DataRegionDirtyPages<FullPageId[]>> checkpointDirtyPages = new 
ArrayList<>();
+        var checkpointDirtyPages = new ArrayList<DirtyPagesAndPartitions>();
 
         int realPagesArrSize = 0;
 
-        // Collect arrays of dirty pages for sorting.
+        // Collects dirty pages into an array (then we will sort them) and 
collects dirty partitions.
         for (DataRegionDirtyPages<Collection<FullPageId>> dataRegionDirtyPages 
: dataRegionsDirtyPages.dirtyPages) {
-            FullPageId[] pageIds = new 
FullPageId[dataRegionDirtyPages.dirtyPages.size()];
+            var pageIds = new 
FullPageId[dataRegionDirtyPages.dirtyPages.size()];
+
+            var partitionIds = new HashSet<GroupPartitionId>();
 
             int pagePos = 0;
 
@@ -424,6 +428,7 @@ class CheckpointWorkflow {
                         "Incorrect estimated dirty pages number: " + 
dataRegionsDirtyPages.dirtyPageCount;
 
                 pageIds[pagePos++] = dirtyPage;
+                partitionIds.add(GroupPartitionId.convert(dirtyPage));
             }
 
             // Some pages may have been already replaced.
@@ -433,20 +438,20 @@ class CheckpointWorkflow {
                 pageIds = Arrays.copyOf(pageIds, pagePos);
             }
 
-            checkpointDirtyPages.add(new 
DataRegionDirtyPages<>(dataRegionDirtyPages.pageMemory, pageIds));
+            checkpointDirtyPages.add(new 
DirtyPagesAndPartitions(dataRegionDirtyPages.pageMemory, pageIds, 
partitionIds));
         }
 
         // Add tasks to sort arrays of dirty page IDs in parallel if their 
number is greater than or equal to PARALLEL_SORT_THRESHOLD.
         List<ForkJoinTask<?>> parallelSortTasks = checkpointDirtyPages.stream()
-                .map(dataRegionDirtyPages -> dataRegionDirtyPages.dirtyPages)
+                .map(dirtyPagesAndPartitions -> 
dirtyPagesAndPartitions.dirtyPages)
                 .filter(pageIds -> pageIds.length >= PARALLEL_SORT_THRESHOLD)
                 .map(pageIds -> parallelSortThreadPool.submit(() -> 
Arrays.parallelSort(pageIds, DIRTY_PAGE_COMPARATOR)))
                 .collect(toList());
 
         // Sort arrays of dirty page IDs if their number is less than 
PARALLEL_SORT_THRESHOLD.
-        for (DataRegionDirtyPages<FullPageId[]> dataRegionDirtyPages : 
checkpointDirtyPages) {
-            if (dataRegionDirtyPages.dirtyPages.length < 
PARALLEL_SORT_THRESHOLD) {
-                Arrays.sort(dataRegionDirtyPages.dirtyPages, 
DIRTY_PAGE_COMPARATOR);
+        for (DirtyPagesAndPartitions dirtyPagesAndPartitions : 
checkpointDirtyPages) {
+            if (dirtyPagesAndPartitions.dirtyPages.length < 
PARALLEL_SORT_THRESHOLD) {
+                Arrays.sort(dirtyPagesAndPartitions.dirtyPages, 
DIRTY_PAGE_COMPARATOR);
             }
         }
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 39a7bbfe69..4b8c5457d8 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.DataRegion;
-import org.apache.ignite.internal.pagememory.FullPageId;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
@@ -458,12 +457,13 @@ public class Checkpointer extends IgniteWorker {
 
         List<PersistentPageMemory> pageMemoryList = 
checkpointDirtyPages.dirtyPageMemoryInstances();
 
-        IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds = checkpointDirtyPages.toDirtyPageIdQueue();
+        IgniteConcurrentMultiPairQueue<PersistentPageMemory, GroupPartitionId> 
dirtyPartitionQueue
+                = checkpointDirtyPages.toDirtyPartitionQueue();
 
         for (int i = 0; i < checkpointWritePageThreads; i++) {
             CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
                     tracker,
-                    writePageIds,
+                    dirtyPartitionQueue,
                     pageMemoryList,
                     updatedPartitions,
                     futures[i] = new CompletableFuture<>(),
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DirtyPagesAndPartitions.java
similarity index 51%
copy from 
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
copy to 
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DirtyPagesAndPartitions.java
index 6f5f6c3fc5..eadbbd2802 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DirtyPagesAndPartitions.java
@@ -17,32 +17,22 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
+import java.util.Set;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.Test;
 
-/**
- * For {@link Checkpoint} testing.
- */
-public class CheckpointTest extends BaseIgniteAbstractTest {
-    @Test
-    void testHasDelta() {
-        CheckpointProgressImpl progress = mock(CheckpointProgressImpl.class);
+/** Container of dirty pages and partitions. */
+class DirtyPagesAndPartitions {
+    final PersistentPageMemory pageMemory;
 
-        assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
+    final FullPageId[] dirtyPages;
 
-        DataRegionDirtyPages<FullPageId[]> biTuple = new 
DataRegionDirtyPages<>(
-                mock(PersistentPageMemory.class),
-                new FullPageId[]{new FullPageId(0, 1)}
-        );
+    final Set<GroupPartitionId> dirtyPartitions;
 
-        assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)), 
progress).hasDelta());
+    DirtyPagesAndPartitions(PersistentPageMemory pageMemory, FullPageId[] 
dirtyPages, Set<GroupPartitionId> dirtyPartitions) {
+        this.pageMemory = pageMemory;
+        this.dirtyPages = dirtyPages;
+        this.dirtyPartitions = dirtyPartitions;
     }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
index 415d97fc71..05c1647897 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
@@ -30,13 +30,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
 import org.apache.ignite.internal.pagememory.util.PageIdUtils;
@@ -51,8 +51,8 @@ import org.junit.jupiter.api.Test;
 public class CheckpointDirtyPagesTest extends BaseIgniteAbstractTest {
     @Test
     void testDirtyPagesCount() {
-        DataRegionDirtyPages<FullPageId[]> dirtyPages0 = 
createDirtyPages(of(0, 0, 0), of(0, 0, 1));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages1 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
+        DirtyPagesAndPartitions dirtyPages0 = 
createDirtyPagesAndPartitions(of(0, 0, 0), of(0, 0, 1));
+        DirtyPagesAndPartitions dirtyPages1 = 
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
 
         assertEquals(0, EMPTY.dirtyPagesCount());
         assertEquals(2, new 
CheckpointDirtyPages(List.of(dirtyPages0)).dirtyPagesCount());
@@ -61,18 +61,18 @@ public class CheckpointDirtyPagesTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void testToDirtyPageIdQueue() {
-        assertTrue(EMPTY.toDirtyPageIdQueue().isEmpty());
+    void testToDirtyPartitionQueue() {
+        assertTrue(EMPTY.toDirtyPartitionQueue().isEmpty());
 
-        DataRegionDirtyPages<FullPageId[]> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages1 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages2 = 
createDirtyPages(of(2, 0, 0), of(2, 1, 0), of(3, 2, 2));
+        DirtyPagesAndPartitions dirtyPages0 = 
createDirtyPagesAndPartitions(of(0, 0, 0));
+        DirtyPagesAndPartitions dirtyPages1 = 
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1));
+        DirtyPagesAndPartitions dirtyPages2 = 
createDirtyPagesAndPartitions(of(2, 0, 0), of(2, 1, 0), of(3, 2, 2));
 
-        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2));
+        var checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2));
 
         assertThat(
-                toListPair(checkpointDirtyPages.toDirtyPageIdQueue()),
-                equalTo(toListPair(dirtyPages0, dirtyPages1, dirtyPages2))
+                toListPair(checkpointDirtyPages.toDirtyPartitionQueue()),
+                equalTo(toListDirtyPartitionPair(dirtyPages0, dirtyPages1, 
dirtyPages2))
         );
     }
 
@@ -80,16 +80,16 @@ public class CheckpointDirtyPagesTest extends 
BaseIgniteAbstractTest {
     void testGetPartitionViewByPageMemory() {
         assertThrows(IllegalArgumentException.class, () -> 
EMPTY.getPartitionView(mock(PersistentPageMemory.class), 0, 0));
 
-        DataRegionDirtyPages<FullPageId[]> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages1 = 
createDirtyPages(of(5, 0, 0));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages2 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages3 = createDirtyPages(
+        DirtyPagesAndPartitions dirtyPages0 = 
createDirtyPagesAndPartitions(of(0, 0, 0));
+        DirtyPagesAndPartitions dirtyPages1 = 
createDirtyPagesAndPartitions(of(5, 0, 0));
+        DirtyPagesAndPartitions dirtyPages2 = 
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1));
+        DirtyPagesAndPartitions dirtyPages3 = createDirtyPagesAndPartitions(
                 of(2, 0, 0), of(2, 0, 1),
                 of(2, 1, 1),
                 of(3, 2, 2), of(3, 2, 3)
         );
 
-        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2, 
dirtyPages3));
+        var checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2, 
dirtyPages3));
 
         
assertNull(checkpointDirtyPages.getPartitionView(dirtyPages0.pageMemory, 4, 0));
         
assertNull(checkpointDirtyPages.getPartitionView(dirtyPages1.pageMemory, 4, 0));
@@ -99,83 +99,36 @@ public class CheckpointDirtyPagesTest extends 
BaseIgniteAbstractTest {
         
assertNull(checkpointDirtyPages.getPartitionView(dirtyPages2.pageMemory, 5, 0));
 
         assertThat(
-                
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages0.pageMemory, 0, 0)),
-                equalTo(toListPair(dirtyPages0))
+                
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages0.pageMemory,
 0, 0)),
+                equalTo(toListDirtyPagePair(dirtyPages0))
         );
 
         assertThat(
-                
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages1.pageMemory, 5, 0)),
-                equalTo(toListPair(dirtyPages1))
+                
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages1.pageMemory,
 5, 0)),
+                equalTo(toListDirtyPagePair(dirtyPages1))
         );
 
         assertThat(
-                
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages2.pageMemory, 1, 0)),
-                equalTo(toListPair(dirtyPages2))
+                
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages2.pageMemory,
 1, 0)),
+                equalTo(toListDirtyPagePair(dirtyPages2))
         );
 
         assertThat(
-                
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory, 2, 0)),
-                equalTo(toListPair(equalsByGroupAndPartition(2, 0), 
dirtyPages3))
+                
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory,
 2, 0)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 0), 
dirtyPages3))
         );
 
         assertThat(
-                
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory, 2, 1)),
-                equalTo(toListPair(equalsByGroupAndPartition(2, 1), 
dirtyPages3))
+                
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory,
 2, 1)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 1), 
dirtyPages3))
         );
 
         assertThat(
-                
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory, 3, 2)),
-                equalTo(toListPair(equalsByGroupAndPartition(3, 2), 
dirtyPages3))
+                
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory,
 3, 2)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(3, 2), 
dirtyPages3))
         );
     }
 
-    @Test
-    void testNextPartitionView() {
-        assertNull(EMPTY.nextPartitionView(null));
-
-        DataRegionDirtyPages<FullPageId[]> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages1 = 
createDirtyPages(of(5, 0, 0));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages2 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages3 = createDirtyPages(
-                of(2, 0, 0), of(2, 0, 1),
-                of(2, 1, 1),
-                of(3, 2, 2), of(3, 2, 3)
-        );
-
-        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2, 
dirtyPages3));
-
-        CheckpointDirtyPagesView view = 
checkpointDirtyPages.nextPartitionView(null);
-
-        assertThat(toListPair(view), equalTo(toListPair(dirtyPages0)));
-
-        assertThat(
-                toListPair(view = 
checkpointDirtyPages.nextPartitionView(view)),
-                equalTo(toListPair(dirtyPages1))
-        );
-
-        assertThat(
-                toListPair(view = 
checkpointDirtyPages.nextPartitionView(view)),
-                equalTo(toListPair(dirtyPages2))
-        );
-
-        assertThat(
-                toListPair(view = 
checkpointDirtyPages.nextPartitionView(view)),
-                equalTo(toListPair(equalsByGroupAndPartition(2, 0), 
dirtyPages3))
-        );
-
-        assertThat(
-                toListPair(view = 
checkpointDirtyPages.nextPartitionView(view)),
-                equalTo(toListPair(equalsByGroupAndPartition(2, 1), 
dirtyPages3))
-        );
-
-        assertThat(
-                toListPair(view = 
checkpointDirtyPages.nextPartitionView(view)),
-                equalTo(toListPair(equalsByGroupAndPartition(3, 2), 
dirtyPages3))
-        );
-
-        assertNull(checkpointDirtyPages.nextPartitionView(view));
-    }
-
     @Test
     void testSortDirtyPageIds() {
         List<FullPageId> pageIds = new ArrayList<>(List.of(
@@ -196,10 +149,8 @@ public class CheckpointDirtyPagesTest extends 
BaseIgniteAbstractTest {
         );
     }
 
-    private static DataRegionDirtyPages<FullPageId[]> 
createDirtyPages(FullPageId... pageIds) {
-        Arrays.sort(pageIds, DIRTY_PAGE_COMPARATOR);
-
-        return new DataRegionDirtyPages<>(mock(PersistentPageMemory.class), 
pageIds);
+    private static DirtyPagesAndPartitions 
createDirtyPagesAndPartitions(FullPageId... pageIds) {
+        return 
TestCheckpointUtils.createDirtyPagesAndPartitions(mock(PersistentPageMemory.class),
 pageIds);
     }
 
     private static FullPageId of(int groupId, int partId, int pageIdx) {
@@ -220,13 +171,21 @@ public class CheckpointDirtyPagesTest extends 
BaseIgniteAbstractTest {
                 .collect(toList());
     }
 
-    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListPair(DataRegionDirtyPages<FullPageId[]>... dirtyPages) {
-        return toListPair(dirtyPageId -> true, dirtyPages);
+    private static List<IgniteBiTuple<PersistentPageMemory, GroupPartitionId>> 
toListDirtyPartitionPair(
+            DirtyPagesAndPartitions... dirtyPagesAndPartitions
+    ) {
+        return Stream.of(dirtyPagesAndPartitions)
+                .flatMap(pages -> 
pages.dirtyPartitions.stream().map(partitionId -> new 
IgniteBiTuple<>(pages.pageMemory, partitionId)))
+                .collect(toList());
+    }
+
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(DirtyPagesAndPartitions... dirtyPages) {
+        return toListDirtyPagePair(dirtyPageId -> true, dirtyPages);
     }
 
-    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListPair(
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(
             Predicate<FullPageId> predicate,
-            DataRegionDirtyPages<FullPageId[]>... dirtyPages
+            DirtyPagesAndPartitions... dirtyPages
     ) {
         return Stream.of(dirtyPages)
                 .flatMap(pages -> Stream.of(pages.dirtyPages)
@@ -236,7 +195,7 @@ public class CheckpointDirtyPagesTest extends 
BaseIgniteAbstractTest {
                 .collect(toList());
     }
 
-    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListPair(CheckpointDirtyPagesView view) {
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(CheckpointDirtyPagesView view) {
         return IntStream.range(0, view.size()).mapToObj(i -> new 
IgniteBiTuple<>(view.pageMemory(), view.get(i))).collect(toList());
     }
 
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
index 76d6dd29b6..d21fcfc89a 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgenc
 import static 
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.checkpointUrgency;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.pageIndexesForDeltaFilePageStore;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -146,9 +147,9 @@ public class CheckpointManagerTest extends 
BaseIgniteAbstractTest {
         PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
         PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
 
-        CheckpointDirtyPages dirtyPages = new CheckpointDirtyPages(List.of(
-                new DataRegionDirtyPages<>(pageMemory0, dirtyPageArray(0, 0, 
1)),
-                new DataRegionDirtyPages<>(pageMemory1, dirtyPageArray(0, 1, 
2, 3, 4))
+        var dirtyPages = new CheckpointDirtyPages(List.of(
+                createDirtyPagesAndPartitions(pageMemory0, dirtyPageArray(0, 
0, 1)),
+                createDirtyPagesAndPartitions(pageMemory1, dirtyPageArray(0, 
1, 2, 3, 4))
         ));
 
         assertArrayEquals(new int[]{0, 1}, 
pageIndexesForDeltaFilePageStore(dirtyPages.getPartitionView(pageMemory0, 0, 
0)));
@@ -160,9 +161,9 @@ public class CheckpointManagerTest extends 
BaseIgniteAbstractTest {
         PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
         PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
 
-        CheckpointDirtyPages dirtyPages = new CheckpointDirtyPages(List.of(
-                new DataRegionDirtyPages<>(pageMemory0, dirtyPageArray(0, 0, 
0, 1)),
-                new DataRegionDirtyPages<>(pageMemory1, dirtyPageArray(0, 1, 
0, 2, 3, 4))
+        var dirtyPages = new CheckpointDirtyPages(List.of(
+                createDirtyPagesAndPartitions(pageMemory0, dirtyPageArray(0, 
0, 0, 1)),
+                createDirtyPagesAndPartitions(pageMemory1, dirtyPageArray(0, 
1, 0, 2, 3, 4))
         ));
 
         assertArrayEquals(new int[]{0, 1}, 
pageIndexesForDeltaFilePageStore(dirtyPages.getPartitionView(pageMemory0, 0, 
0)));
@@ -209,9 +210,7 @@ public class CheckpointManagerTest extends 
BaseIgniteAbstractTest {
 
         CheckpointProgress checkpointProgress = mock(CheckpointProgress.class);
 
-        CheckpointDirtyPages dirtyPages = new CheckpointDirtyPages(List.of(
-                new DataRegionDirtyPages<>(pageMemory, new 
FullPageId[]{dirtyPageId})
-        ));
+        var dirtyPages = new 
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory, 
dirtyPageId)));
 
         when(checkpointProgress.inProgress()).thenReturn(true);
 
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index 97d5dfdb04..94c305aac6 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.createPartitionMetaManager;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
 import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
@@ -101,9 +102,9 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
         FullPageId fullPageId5 = new FullPageId(pageId(0, FLAG_DATA, 5), 0);
         FullPageId fullPageId6 = new FullPageId(pageId(1, FLAG_DATA, 6), 0);
 
-        IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds = new IgniteConcurrentMultiPairQueue<>(
-                Map.of(pageMemory, List.of(fullPageId1, fullPageId2, 
fullPageId3, fullPageId4, fullPageId5, fullPageId6))
-        );
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(
+                createDirtyPagesAndPartitions(pageMemory, fullPageId1, 
fullPageId2, fullPageId3, fullPageId4, fullPageId5, fullPageId6)
+        ));
 
         GroupPartitionId groupPartId0 = groupPartId(0, 0);
         GroupPartitionId groupPartId1 = groupPartId(0, 1);
@@ -123,13 +124,17 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
         CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
 
         CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+        progressImpl.pagesToWrite(checkpointDirtyPages);
 
         PartitionMeta partitionMeta0 = mock(PartitionMeta.class);
         PartitionMeta partitionMeta1 = mock(PartitionMeta.class);
 
+        IgniteConcurrentMultiPairQueue<PersistentPageMemory, GroupPartitionId> 
dirtyPartitionQueue
+                = checkpointDirtyPages.toDirtyPartitionQueue();
+
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 tracker,
-                writePageIds,
+                dirtyPartitionQueue,
                 singletonList(pageMemory),
                 updatedPartitions,
                 doneFuture,
@@ -148,7 +153,7 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
 
         assertDoesNotThrow(() -> doneFuture.get(1, TimeUnit.SECONDS));
 
-        assertTrue(writePageIds.isEmpty());
+        assertTrue(dirtyPartitionQueue.isEmpty());
 
         assertThat(updatedPartitions.keySet(), 
containsInAnyOrder(groupPartId0, groupPartId1));
 
@@ -162,17 +167,20 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                 writtenFullPageIds.getAllValues(),
                 equalTo(List.of(
                         // At the beginning, we write the partition meta for 
each new partition.
-                        fullPageId(0, 0, 0),
+                        new FullPageId(pageId(0, FLAG_AUX, 0), 0),
                         // Order is different because the first 3 pages we 
have to try to write to the page store 2 times.
                         fullPageId4, fullPageId5,
-                        fullPageId(0, 1, 0),
-                        fullPageId6, fullPageId1, fullPageId2, fullPageId3
+                        // At the beginning, we write the partition meta for 
each new partition.
+                        new FullPageId(pageId(1, FLAG_AUX, 0), 0),
+                        fullPageId6,
+                        // Now the retry pages.
+                        fullPageId1, fullPageId2, fullPageId3
                 ))
         );
 
-        verify(beforePageWrite, times(11)).run();
+        verify(beforePageWrite, times(14)).run();
 
-        verify(threadBuf, times(2)).get();
+        verify(threadBuf, times(1)).get();
 
         verify(partitionMeta0, times(1)).metaSnapshot(any(UUID.class));
         verify(partitionMeta1, times(1)).metaSnapshot(any(UUID.class));
@@ -195,15 +203,22 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
 
         GroupPartitionId groupPartId = groupPartId(0, 0);
 
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(
+                createDirtyPagesAndPartitions(pageMemory, new 
FullPageId(pageId(0, FLAG_DATA, 1), 0))
+        ));
+
+        CheckpointProgressImpl checkpointProgress = new 
CheckpointProgressImpl(0);
+        checkpointProgress.pagesToWrite(checkpointDirtyPages);
+
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 new CheckpointMetricsTracker(),
-                new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, 
List.of(fullPageId(0, 0, 1)))),
+                new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, 
List.of(new GroupPartitionId(0, 0)))),
                 singletonList(pageMemory),
                 new ConcurrentHashMap<>(),
                 doneFuture,
                 () -> {},
                 createThreadLocalBuffer(),
-                new CheckpointProgressImpl(0),
+                checkpointProgress,
                 createDirtyPageWriter(null),
                 ioRegistry,
                 createPartitionMetaManager(Map.of(groupPartId, 
mock(PartitionMeta.class))),
@@ -238,23 +253,29 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                         any(CheckpointMetricsTracker.class)
                 );
 
-        IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds = new IgniteConcurrentMultiPairQueue<>(
-                Map.of(pageMemory, List.of(fullPageId(0, 0, 1), fullPageId(0, 
0, 2)))
-        );
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(
+                createDirtyPagesAndPartitions(pageMemory, fullPageId(0, 0, 1), 
fullPageId(0, 1, 2))
+        ));
 
         GroupPartitionId groupPartId = groupPartId(0, 0);
 
         ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions = new 
ConcurrentHashMap<>();
 
+        CheckpointProgressImpl checkpointProgress = new 
CheckpointProgressImpl(0);
+        checkpointProgress.pagesToWrite(checkpointDirtyPages);
+
+        IgniteConcurrentMultiPairQueue<PersistentPageMemory, GroupPartitionId> 
dirtyPartitionQueue
+                = checkpointDirtyPages.toDirtyPartitionQueue();
+
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 new CheckpointMetricsTracker(),
-                writePageIds,
+                dirtyPartitionQueue,
                 singletonList(pageMemory),
                 updatedPartitions,
                 doneFuture,
                 () -> {},
                 createThreadLocalBuffer(),
-                new CheckpointProgressImpl(0),
+                checkpointProgress,
                 createDirtyPageWriter(null),
                 ioRegistry,
                 createPartitionMetaManager(Map.of(groupPartId, 
mock(PartitionMeta.class))),
@@ -265,7 +286,7 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
 
         assertDoesNotThrow(() -> doneFuture.get(1, TimeUnit.SECONDS));
 
-        assertThat(writePageIds.size(), equalTo(1));
+        assertThat(dirtyPartitionQueue.size(), equalTo(1));
         assertThat(updatedPartitions.keySet(), contains(groupPartId));
     }
 
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
index 6f5f6c3fc5..9facbb11de 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -38,11 +39,11 @@ public class CheckpointTest extends BaseIgniteAbstractTest {
 
         assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
 
-        DataRegionDirtyPages<FullPageId[]> biTuple = new 
DataRegionDirtyPages<>(
+        DirtyPagesAndPartitions dirtyPagesAndPartitions = 
createDirtyPagesAndPartitions(
                 mock(PersistentPageMemory.class),
-                new FullPageId[]{new FullPageId(0, 1)}
+                new FullPageId(0, 1)
         );
 
-        assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)), 
progress).hasDelta());
+        assertTrue(new Checkpoint(new 
CheckpointDirtyPages(List.of(dirtyPagesAndPartitions)), progress).hasDelta());
     }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 7f670a4efa..9e45b41e4b 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -33,6 +33,7 @@ import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.Check
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -314,7 +315,7 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
         verify(progressImpl, 
times(1)).pagesToWrite(any(CheckpointDirtyPages.class));
         verify(progressImpl, times(1)).initCounters(anyInt());
 
-        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.nextPartitionView(null);
+        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.getPartitionView(pageMemory, 0, 0);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(dirtyPages));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegion.pageMemory()));
@@ -383,7 +384,7 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
         workflow.addCheckpointListener(checkpointListener, dataRegion);
 
         workflow.markCheckpointEnd(new Checkpoint(
-                new 
CheckpointDirtyPages(List.of(createCheckpointDirtyPages(pageMemory, of(0, 0, 
0)))),
+                new 
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory, of(0, 0, 
0)))),
                 progressImpl
         ));
 
@@ -408,15 +409,18 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
 
     @Test
     void testCreateAndSortCheckpointDirtyPages() throws Exception {
+        PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
+        PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
+
         DataRegionDirtyPages<Collection<FullPageId>> dataRegionDirtyPages0 = 
createDataRegionDirtyPages(
-                mock(PersistentPageMemory.class),
+                pageMemory0,
                 of(10, 10, 2), of(10, 10, 1), of(10, 10, 0),
                 of(10, 5, 100), of(10, 5, 99),
                 of(10, 1, 50), of(10, 1, 51), of(10, 1, 99)
         );
 
         DataRegionDirtyPages<Collection<FullPageId>> dataRegionDirtyPages1 = 
createDataRegionDirtyPages(
-                mock(PersistentPageMemory.class),
+                pageMemory1,
                 of(77, 5, 100), of(77, 5, 99),
                 of(88, 1, 51), of(88, 1, 50), of(88, 1, 99),
                 of(66, 33, 0), of(66, 33, 1), of(66, 33, 2)
@@ -435,32 +439,32 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
                 new DataRegionsDirtyPages(List.of(dataRegionDirtyPages0, 
dataRegionDirtyPages1))
         );
 
-        CheckpointDirtyPagesView dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(null);
+        CheckpointDirtyPagesView dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 10, 1);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10, 
1, 50), of(10, 1, 51), of(10, 1, 99))));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegionDirtyPages0.pageMemory));
 
-        dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+        dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 10, 5);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10, 
5, 99), of(10, 5, 100))));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegionDirtyPages0.pageMemory));
 
-        dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+        dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 10, 10);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10, 
10, 0), of(10, 10, 1), of(10, 10, 2))));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegionDirtyPages0.pageMemory));
 
-        dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+        dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 66, 33);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(66, 
33, 0), of(66, 33, 1), of(66, 33, 2))));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegionDirtyPages1.pageMemory));
 
-        dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+        dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 77, 5);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(77, 
5, 99), of(77, 5, 100))));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegionDirtyPages1.pageMemory));
 
-        dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+        dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 88, 1);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(88, 
1, 50), of(88, 1, 51), of(88, 1, 99))));
         assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegionDirtyPages1.pageMemory));
@@ -482,16 +486,19 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
 
         workflow.start();
 
+        PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
+        PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
+
         CheckpointDirtyPages sortCheckpointDirtyPages = 
workflow.createAndSortCheckpointDirtyPages(new DataRegionsDirtyPages(List.of(
-                createDataRegionDirtyPages(mock(PersistentPageMemory.class), 
dirtyPages1),
-                createDataRegionDirtyPages(mock(PersistentPageMemory.class), 
dirtyPages0)
+                createDataRegionDirtyPages(pageMemory1, dirtyPages1),
+                createDataRegionDirtyPages(pageMemory0, dirtyPages0)
         )));
 
-        CheckpointDirtyPagesView dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(null);
+        CheckpointDirtyPagesView dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 1, 1);
 
         assertThat(toListDirtyPageIds(dirtyPagesView), 
equalTo(List.of(dirtyPages1)));
 
-        dirtyPagesView = 
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+        dirtyPagesView = 
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 0, 0);
 
         assertThat(
                 toListDirtyPageIds(dirtyPagesView),
@@ -533,7 +540,7 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
 
         assertEquals(1, checkpoint.dirtyPagesSize);
 
-        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.nextPartitionView(null);
+        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.getPartitionView(pageMemory, groupId, partitionId);
 
         assertNotNull(dirtyPagesView);
         assertThat(toListDirtyPageIds(dirtyPagesView), 
is(List.of(metaPageId)));
@@ -576,7 +583,7 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
 
         assertEquals(2, checkpoint.dirtyPagesSize);
 
-        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.nextPartitionView(null);
+        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.getPartitionView(pageMemory, groupId, partitionId);
 
         assertNotNull(dirtyPagesView);
         assertThat(toListDirtyPageIds(dirtyPagesView), is(List.of(metaPageId, 
dataPageId)));
@@ -663,13 +670,6 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
         return mock;
     }
 
-    private static DataRegionDirtyPages<FullPageId[]> 
createCheckpointDirtyPages(
-            PersistentPageMemory pageMemory,
-            FullPageId... pageIds
-    ) {
-        return new DataRegionDirtyPages<>(pageMemory, pageIds);
-    }
-
     private static DataRegionDirtyPages<Collection<FullPageId>> 
createDataRegionDirtyPages(
             PersistentPageMemory pageMemory,
             FullPageId... pageIds
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 38506b649d..976174068a 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -22,11 +22,11 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FACTORY;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -56,7 +56,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -391,7 +390,7 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
 
         assertDoesNotThrow(checkpointer::doCheckpoint);
 
-        verify(dirtyPages, times(1)).toDirtyPageIdQueue();
+        verify(dirtyPages, times(1)).toDirtyPartitionQueue();
         verify(checkpointer, times(1)).startCheckpointProgress();
         verify(compactor, times(1)).triggerCompaction();
         verify(mockLogSyncer, times(1)).sync();
@@ -423,7 +422,7 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
 
         assertDoesNotThrow(checkpointer::doCheckpoint);
 
-        verify(dirtyPages, never()).toDirtyPageIdQueue();
+        verify(dirtyPages, never()).toDirtyPartitionQueue();
         verify(checkpointer, times(1)).startCheckpointProgress();
         verify(compactor, never()).triggerCompaction();
 
@@ -518,10 +517,8 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
         onPartitionDestructionFuture.get(1, SECONDS);
     }
 
-    private CheckpointDirtyPages dirtyPages(PersistentPageMemory pageMemory, 
FullPageId... pageIds) {
-        Arrays.sort(pageIds, DIRTY_PAGE_COMPARATOR);
-
-        return new CheckpointDirtyPages(List.of(new 
DataRegionDirtyPages<>(pageMemory, pageIds)));
+    private static CheckpointDirtyPages dirtyPages(PersistentPageMemory 
pageMemory, FullPageId... pageIds) {
+        return new 
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory, 
pageIds)));
     }
 
     private CheckpointWorkflow createCheckpointWorkflow(CheckpointDirtyPages 
dirtyPages) throws Exception {
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
similarity index 54%
copy from 
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
copy to 
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
index 6f5f6c3fc5..217e19b01f 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
@@ -17,32 +17,24 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
 
-import java.util.List;
+import java.util.Arrays;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.Test;
 
-/**
- * For {@link Checkpoint} testing.
- */
-public class CheckpointTest extends BaseIgniteAbstractTest {
-    @Test
-    void testHasDelta() {
-        CheckpointProgressImpl progress = mock(CheckpointProgressImpl.class);
-
-        assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
+/** Helper class for checkpoint testing that may contain useful methods and 
constants. */
+class TestCheckpointUtils {
+    /** Sorts dirty pages and creates a new instance {@link 
DirtyPagesAndPartitions}. */
+    static DirtyPagesAndPartitions 
createDirtyPagesAndPartitions(PersistentPageMemory pageMemory, FullPageId... 
dirtyPages) {
+        Arrays.sort(dirtyPages, DIRTY_PAGE_COMPARATOR);
 
-        DataRegionDirtyPages<FullPageId[]> biTuple = new 
DataRegionDirtyPages<>(
-                mock(PersistentPageMemory.class),
-                new FullPageId[]{new FullPageId(0, 1)}
+        return new DirtyPagesAndPartitions(
+                pageMemory,
+                dirtyPages,
+                
Arrays.stream(dirtyPages).map(GroupPartitionId::convert).collect(toSet())
         );
-
-        assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)), 
progress).hasDelta());
     }
 }

Reply via email to