This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26233
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ba86ff21a7415ac4100607d644d1bed6a39c7411
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Aug 19 10:40:37 2025 +0300

    IGNITE-26233 wip
---
 .../pagememory/persistence/PageHeader.java         |   2 +-
 .../persistence/PersistentPageMemory.java          |  75 +++++++++-----
 .../persistence/checkpoint/CheckpointManager.java  |   9 ++
 .../persistence/checkpoint/CheckpointPages.java    |  21 ++--
 .../persistence/checkpoint/Checkpointer.java       |  10 +-
 .../RandomLruPageReplacementPolicy.java            |   9 +-
 .../checkpoint/CheckpointPagesTest.java            |  51 ++++++----
 .../PersistentPageMemoryMvTableStorageTest.java    | 113 +++++++++++++++++++++
 8 files changed, 232 insertions(+), 58 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
index a58d5104c4e..72546f21513 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
@@ -73,7 +73,7 @@ public class PageHeader {
     private static final int FLAGS_OFFSET = 12;
 
     /** Unknown partition generation. */
-    static final int UNKNOWN_PARTITION_GENERATION = -1;
+    public static final int UNKNOWN_PARTITION_GENERATION = -1;
 
     /**
      * Initializes the header of the page.
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 3ba17e416af..330485b53b3 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -27,10 +27,12 @@ import static 
org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
 import static 
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER;
 import static 
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED;
 import static 
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER;
+import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.UNKNOWN_PARTITION_GENERATION;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.readPageId;
+import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.readPartitionGeneration;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.writePartitionGeneration;
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
@@ -1119,7 +1121,7 @@ public class PersistentPageMemory implements PageMemory {
         writeTimestamp(absPtr, coarseCurrentTimeMillis());
 
         // Create a buffer copy if the page is scheduled for a checkpoint.
-        if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) == 
INVALID_REL_PTR) {
+        if (isInCheckpoint(fullId, absPtr) && tempBufferPointer(absPtr) == 
INVALID_REL_PTR) {
             long tmpRelPtr;
 
             PagePool checkpointPool = this.checkpointPool;
@@ -1197,7 +1199,8 @@ public class PersistentPageMemory implements PageMemory {
 
                 PagesWriteThrottlePolicy writeThrottle = this.writeThrottle;
                 if (writeThrottle != null && !restore && !wasDirty && 
markDirty) {
-                    writeThrottle.onMarkDirty(isInCheckpoint(fullId));
+                    // TODO: IGNITE-26233 Починить нормально!
+                    writeThrottle.onMarkDirty(isInCheckpoint(fullId, page));
                 }
             } catch (AssertionError ex) {
                 LOG.debug("Failed to unlock page [fullPageId={}, binPage={}]", 
fullId, toHexString(page, systemPageSize()));
@@ -1271,13 +1274,17 @@ public class PersistentPageMemory implements PageMemory 
{
         boolean wasDirty = dirty(absPtr, dirty);
 
         if (dirty) {
-            assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
-            assert pageIndex(pageId.pageId()) != 0 : "Partition meta should 
only be updated via the instance of PartitionMeta.";
+            assert checkpointTimeoutLock.checkpointLockIsHeldByThread() : 
pageId;
+            assert pageIndex(pageId.pageId()) != 0 : "Partition meta should 
only be updated via the instance of PartitionMeta: " + pageId;
 
             if (!wasDirty || forceAdd) {
                 Segment seg = segment(pageId.groupId(), pageId.pageId());
 
-                if (seg.dirtyPages.add(pageId)) {
+                int partitionGeneration = readPartitionGeneration(absPtr);
+
+                assert partitionGeneration != UNKNOWN_PARTITION_GENERATION : 
pageId;
+
+                if (seg.dirtyPages.putIfAbsent(pageId, partitionGeneration) == 
null) {
                     long dirtyPagesCnt = seg.dirtyPagesCntr.incrementAndGet();
 
                     if (dirtyPagesCnt >= seg.dirtyPagesSoftThreshold) {
@@ -1296,7 +1303,7 @@ public class PersistentPageMemory implements PageMemory {
         } else {
             Segment seg = segment(pageId.groupId(), pageId.pageId());
 
-            if (seg.dirtyPages.remove(pageId)) {
+            if (seg.dirtyPages.remove(pageId) != null) {
                 seg.dirtyPagesCntr.decrementAndGet();
             }
         }
@@ -1337,7 +1344,7 @@ public class PersistentPageMemory implements PageMemory {
         Set<FullPageId> res = new HashSet<>((int) loadedPages());
 
         for (Segment seg : segments) {
-            res.addAll(seg.dirtyPages);
+            res.addAll(seg.dirtyPages.keySet());
         }
 
         return res;
@@ -1393,7 +1400,8 @@ public class PersistentPageMemory implements PageMemory {
         private long memPerRepl;
 
         /** Pages marked as dirty since the last checkpoint. */
-        private volatile Set<FullPageId> dirtyPages = 
ConcurrentHashMap.newKeySet();
+        // TODO: IGNITE-26233 Улучшить документацию
+        private volatile Map<FullPageId, Integer> dirtyPages = new 
ConcurrentHashMap<>();
 
         /** Atomic size counter for {@link #dirtyPages}. */
         private final AtomicLong dirtyPagesCntr = new AtomicLong();
@@ -1530,7 +1538,7 @@ public class PersistentPageMemory implements PageMemory {
          * Clear dirty pages collection and reset counter.
          */
         private void resetDirtyPages() {
-            dirtyPages = ConcurrentHashMap.newKeySet();
+            dirtyPages = new ConcurrentHashMap<>();
 
             dirtyPagesCntr.set(0);
         }
@@ -1577,7 +1585,12 @@ public class PersistentPageMemory implements PageMemory {
                 CheckpointPages checkpointPages = this.checkpointPages;
                 // Can replace a dirty page only if it should be written by a 
checkpoint.
                 // Safe to invoke because we keep segment write lock and the 
checkpoint writer must remove pages on the segment read lock.
-                if (checkpointPages != null && 
checkpointPages.removeOnPageReplacement(fullPageId)) {
+                // TODO: IGNITE-26233 Исправить документацию или еще что нужно
+                Integer partitionGeneration = readPartitionGeneration(absPtr);
+
+                assert partitionGeneration != UNKNOWN_PARTITION_GENERATION : 
fullPageId;
+
+                if (checkpointPages != null && 
partitionGeneration.equals(checkpointPages.removeOnPageReplacement(fullPageId)))
 {
                     checkpointPages.blockFsyncOnPageReplacement(fullPageId);
 
                     DelayedDirtyPageWrite delayedDirtyPageWrite = 
delayedPageReplacementTracker.delayedPageWrite();
@@ -1884,12 +1897,21 @@ public class PersistentPageMemory implements PageMemory 
{
      *
      * @param pageId Page ID to check if it was added to the checkpoint list.
      */
-    boolean isInCheckpoint(FullPageId pageId) {
+    // TODO: IGNITE-26233 Исправить документацию или еще что нужно
+    private boolean isInCheckpoint(FullPageId pageId, long absPtr) {
         Segment seg = segment(pageId.groupId(), pageId.pageId());
 
         CheckpointPages pages0 = seg.checkpointPages;
 
-        return pages0 != null && pages0.contains(pageId);
+        if (pages0 == null) {
+            return false;
+        }
+
+        Integer partitionGeneration = readPartitionGeneration(absPtr);
+
+        assert partitionGeneration != UNKNOWN_PARTITION_GENERATION : pageId;
+
+        return partitionGeneration.equals(pages0.contains(pageId));
     }
 
     /**
@@ -1897,14 +1919,19 @@ public class PersistentPageMemory implements PageMemory 
{
      *
      * @param fullPageId Page ID to remove.
      */
-    private boolean removeOnCheckpoint(FullPageId fullPageId) {
+    // TODO: IGNITE-26233 Исправить документацию или еще что нужно
+    private boolean removeOnCheckpoint(FullPageId fullPageId, long absPtr) {
         Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
 
         CheckpointPages pages0 = seg.checkpointPages;
 
         assert pages0 != null : fullPageId;
 
-        return pages0.removeOnCheckpoint(fullPageId);
+        Integer partitionGeneration = readPartitionGeneration(absPtr);
+
+        assert partitionGeneration != UNKNOWN_PARTITION_GENERATION : 
fullPageId;
+
+        return 
partitionGeneration.equals(pages0.removeOnCheckpoint(fullPageId));
     }
 
     /**
@@ -1932,7 +1959,7 @@ public class PersistentPageMemory implements PageMemory {
             boolean useTryWriteLockOnPage
     ) throws IgniteInternalCheckedException {
         assert absPtr != 0 : hexLong(fullId.pageId());
-        assert isAcquired(absPtr) || !isInCheckpoint(fullId) : 
hexLong(fullId.pageId());
+        assert isAcquired(absPtr) || !isInCheckpoint(fullId, absPtr) : 
hexLong(fullId.pageId());
 
         if (useTryWriteLockOnPage) {
             if (!rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS)) {
@@ -1944,7 +1971,7 @@ public class PersistentPageMemory implements PageMemory {
 
                 buf.clear();
 
-                if (isInCheckpoint(fullId)) {
+                if (isInCheckpoint(fullId, absPtr)) {
                     pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
                 }
 
@@ -1956,7 +1983,7 @@ public class PersistentPageMemory implements PageMemory {
             assert locked : hexLong(fullId.pageId());
         }
 
-        if (!removeOnCheckpoint(fullId)) {
+        if (!removeOnCheckpoint(fullId, absPtr)) {
             rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
 
             if (!pageSingleAcquire) {
@@ -2055,9 +2082,10 @@ public class PersistentPageMemory implements PageMemory {
         seg.readLock().lock();
 
         try {
-            if (!isInCheckpoint(fullId)) {
-                return;
-            }
+            // TODO: IGNITE-26233 Может тут по другому нужно будет
+            // if (!isInCheckpoint(fullId, absPtr)) {
+            //     return;
+            // }
 
             relPtr = resolveRelativePointer(seg, fullId, partitionGeneration = 
generationTag(seg, fullId));
 
@@ -2142,7 +2170,7 @@ public class PersistentPageMemory implements PageMemory {
                 continue;
             }
 
-            if (!isInCheckpoint(fullPageId)) {
+            if (!isInCheckpoint(fullPageId, freePageAbsPtr)) {
                 continue;
             }
 
@@ -2176,8 +2204,9 @@ public class PersistentPageMemory implements PageMemory {
                     dataRegionConfiguration.name(), i
             );
 
-            Set<FullPageId> segmentDirtyPages = segment.dirtyPages;
-            dirtyPageIds[i] = segmentDirtyPages;
+            // TODO: IGNITE-26233 Вот тут еще поправить нужно будет
+            Map<FullPageId, Integer> segmentDirtyPages = segment.dirtyPages;
+            dirtyPageIds[i] = segmentDirtyPages.keySet();
 
             segment.checkpointPages = new CheckpointPages(segmentDirtyPages, 
checkpointProgress);
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 52ce770185b..ece5a2ef413 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStor
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
@@ -377,4 +378,12 @@ public class CheckpointManager {
                 compactor.prepareToDestroyPartition(groupPartitionId)
         );
     }
+
+
+    /** Returns compactor. */
+    @TestOnly
+    // TODO: IGNITE-25861 Maybe get rid of it
+    public Compactor compactor() {
+        return compactor;
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
index f6897a03b82..1a628ca870f 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -20,14 +20,12 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 
-import java.nio.ByteBuffer;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -39,11 +37,12 @@ import org.jetbrains.annotations.Nullable;
  *
  * <p>For correct parallel operation of the checkpoint writer and page 
replacement, external synchronization must be used.</p>
  *
- * @see PersistentPageMemory#checkpointWritePage(FullPageId, ByteBuffer, 
PageStoreWriter, CheckpointMetricsTracker)
- * @see PersistentPageMemory.Segment#tryToRemovePage(FullPageId, long)
+ * @see PersistentPageMemory#checkpointWritePage
+ * @see PersistentPageMemory.Segment#tryToRemovePage
  */
+// TODO: IGNITE-26233 Думаю что тут надо будет разное поменять и доки тоже и 
тесты
 public class CheckpointPages {
-    private final Set<FullPageId> pageIds;
+    private final Map<FullPageId, Integer> pageIds;
 
     private final CheckpointProgressImpl checkpointProgress;
 
@@ -53,7 +52,7 @@ public class CheckpointPages {
      * @param pageIds Dirty page IDs in the segment that should be written at 
a checkpoint or page replacement.
      * @param checkpointProgress Progress of the current checkpoint at which 
the object was created.
      */
-    public CheckpointPages(Set<FullPageId> pageIds, CheckpointProgress 
checkpointProgress) {
+    public CheckpointPages(Map<FullPageId, Integer> pageIds, 
CheckpointProgress checkpointProgress) {
         this.pageIds = pageIds;
         this.checkpointProgress = (CheckpointProgressImpl) checkpointProgress;
     }
@@ -76,7 +75,7 @@ public class CheckpointPages {
      * @see #blockFsyncOnPageReplacement(FullPageId)
      * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
      */
-    public boolean removeOnPageReplacement(FullPageId pageId) throws 
IgniteInternalCheckedException {
+    public @Nullable Integer removeOnPageReplacement(FullPageId pageId) throws 
IgniteInternalCheckedException {
         try {
             // Uninterruptibly is important because otherwise in case of 
interrupt of client thread node would be stopped.
             getUninterruptibly(checkpointProgress.futureFor(PAGES_SORTED));
@@ -100,7 +99,7 @@ public class CheckpointPages {
      *      removes or did not exist.
      * @see #removeOnPageReplacement(FullPageId)
      */
-    public boolean removeOnCheckpoint(FullPageId pageId) {
+    public @Nullable Integer removeOnCheckpoint(FullPageId pageId) {
         return pageIds.remove(pageId);
     }
 
@@ -109,8 +108,8 @@ public class CheckpointPages {
      *
      * @param pageId Page ID for checking.
      */
-    public boolean contains(FullPageId pageId) {
-        return pageIds.contains(pageId);
+    public @Nullable Integer contains(FullPageId pageId) {
+        return pageIds.get(pageId);
     }
 
     /** Returns the current size of all pages that will be written at a 
checkpoint or page replacement. */
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index c9d17551819..0f66f76c1de 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -854,7 +854,10 @@ public class Checkpointer extends IgniteWorker {
         try {
             CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = 
filePageStore.getNewDeltaFile();
 
-            assert deltaFilePageStoreFuture != null;
+            // TODO: IGNITE-26233 Может тут по другому нужно будет
+            if (deltaFilePageStoreFuture == null) {
+                return;
+            }
 
             deltaFilePageStoreFuture.join().sync();
         } finally {
@@ -873,7 +876,10 @@ public class Checkpointer extends IgniteWorker {
         try {
             CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = 
filePageStore.getNewDeltaFile();
 
-            assert deltaFilePageStoreFuture != null;
+            // TODO: IGNITE-26233 Может тут по другому нужно будет
+            if (deltaFilePageStoreFuture == null) {
+                return;
+            }
 
             DeltaFilePageStoreIo deltaFilePageStoreIo = 
deltaFilePageStoreFuture.join();
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
index ae019a32ba0..e6fc8a9a8e7 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static 
org.apache.ignite.internal.pagememory.persistence.PageHeader.readPartitionGeneration;
 import static 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.INVALID_REL_PTR;
 import static 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.PAGE_OVERHEAD;
 import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
@@ -119,7 +120,13 @@ public class RandomLruPageReplacementPolicy extends 
PageReplacementPolicy {
 
                 CheckpointPages checkpointPages = seg.checkpointPages();
 
-                if (relRmvAddr == rndAddr || pinned || skip || (dirty && 
(checkpointPages == null || !checkpointPages.contains(fullId)))) {
+                // TODO: IGNITE-26233 Исправить документацию или еще что нужно
+                Integer partitionGeneration = 
readPartitionGeneration(absPageAddr);
+
+                assert partitionGeneration != 
PageHeader.UNKNOWN_PARTITION_GENERATION : partitionGeneration;
+
+                if (relRmvAddr == rndAddr || pinned || skip || (dirty && 
(checkpointPages == null || !partitionGeneration.equals(
+                        checkpointPages.contains(fullId))))) {
                     i--;
 
                     continue;
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
index 38b742d58f5..61d5875da8f 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
@@ -27,25 +27,33 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /** For {@link CheckpointPages} testing. */
 public class CheckpointPagesTest {
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26233";)
     void testContains() {
+        // TODO: IGNITE-26233 Починить!
         CheckpointPages checkpointPages = createCheckpointPages(new 
FullPageId(0, 0), new FullPageId(1, 0));
 
-        assertTrue(checkpointPages.contains(new FullPageId(0, 0)));
-        assertTrue(checkpointPages.contains(new FullPageId(1, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(0, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(1, 0)));
 
-        assertFalse(checkpointPages.contains(new FullPageId(2, 0)));
-        assertFalse(checkpointPages.contains(new FullPageId(3, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(2, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(3, 0)));
     }
 
     @Test
@@ -56,24 +64,28 @@ public class CheckpointPagesTest {
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26233";)
     void testRemoveOnCheckpoint() {
+        // TODO: IGNITE-26233 Починить!
         CheckpointPages checkpointPages = createCheckpointPages(fullPageId(0, 
0), fullPageId(1, 0), fullPageId(2, 0));
 
-        assertTrue(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
-        assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+        assertNull(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(0, 0)));
         assertEquals(2, checkpointPages.size());
 
-        assertFalse(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
-        assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+        assertNull(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(0, 0)));
         assertEquals(2, checkpointPages.size());
 
-        assertTrue(checkpointPages.removeOnCheckpoint(fullPageId(1, 0)));
-        assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+        assertNull(checkpointPages.removeOnCheckpoint(fullPageId(1, 0)));
+        assertNull(checkpointPages.contains(new FullPageId(0, 0)));
         assertEquals(1, checkpointPages.size());
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26233";)
     void testRemoveOnPageReplacement() throws Exception {
+        // TODO: IGNITE-26233 Починить!
         var checkpointProgress = new CheckpointProgressImpl(10);
 
         CheckpointPages checkpointPages = 
createCheckpointPages(checkpointProgress, fullPageId(0, 0), fullPageId(1, 0));
@@ -81,22 +93,22 @@ public class CheckpointPagesTest {
         // Let's make sure that the check will not complete until the dirty 
page sorting phase completes.
         checkpointProgress.transitTo(LOCK_RELEASED);
 
-        CompletableFuture<Boolean> removeOnPageReplacementFuture = runAsync(
+        CompletableFuture<?> removeOnPageReplacementFuture = runAsync(
                 () -> checkpointPages.removeOnPageReplacement(fullPageId(0, 0))
         );
         assertThat(removeOnPageReplacementFuture, willTimeoutFast());
 
         checkpointProgress.transitTo(PAGES_SORTED);
         assertThat(removeOnPageReplacementFuture, willBe(true));
-        assertFalse(checkpointPages.contains(fullPageId(0, 0)));
+        assertNull(checkpointPages.contains(fullPageId(0, 0)));
         assertEquals(1, checkpointPages.size());
 
-        assertFalse(checkpointPages.removeOnPageReplacement(fullPageId(0, 0)));
-        assertFalse(checkpointPages.contains(fullPageId(0, 0)));
+        assertNull(checkpointPages.removeOnPageReplacement(fullPageId(0, 0)));
+        assertNull(checkpointPages.contains(fullPageId(0, 0)));
         assertEquals(1, checkpointPages.size());
 
-        assertTrue(checkpointPages.removeOnPageReplacement(fullPageId(1, 0)));
-        assertFalse(checkpointPages.contains(fullPageId(1, 0)));
+        assertNull(checkpointPages.removeOnPageReplacement(fullPageId(1, 0)));
+        assertNull(checkpointPages.contains(fullPageId(1, 0)));
         assertEquals(0, checkpointPages.size());
     }
 
@@ -124,10 +136,9 @@ public class CheckpointPagesTest {
     }
 
     private static CheckpointPages 
createCheckpointPages(CheckpointProgressImpl checkpointProgress, FullPageId... 
pageIds) {
-        var set = new HashSet<FullPageId>(pageIds.length);
+        Map<FullPageId, Integer> collect = 
Arrays.stream(pageIds).collect(Collectors.toMap(Function.identity(), 
FullPageId::groupId));
 
-        Collections.addAll(set, pageIds);
-
-        return new CheckpointPages(set, checkpointProgress);
+        // TODO: IGNITE-26233 вот тут надо будет починить
+        return new CheckpointPages(collect, checkpointProgress);
     }
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index a1a6032572e..aa322ff2d39 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.Check
 import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
@@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.components.LogSyncer;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -49,6 +51,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
@@ -60,6 +63,7 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -267,4 +271,113 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
             return null;
         });
     }
+
+    @Test
+    void createMvPartitionStorageAndDoCheckpointInParallel() throws Exception {
+        // TODO: IGNITE-26233 на момент запуск обнаружил что мы можем встрять 
навсега, может быть надо join() заменить или интераптить
+        //  нитку чекпоинтера, в общем решить надо будет
+
+        // TODO: IGNITE-26233 Добавить тест на удаление партиции сразу после 
сортировки страниц
+
+        stopCompactor();
+
+        for (int i = 0; i < 10; i++) {
+            runRace(
+                    () -> getOrCreateMvPartition(PARTITION_ID),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());
+        }
+    }
+
+    @Test
+    void clearMvPartitionStorageAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        for (int i = 0; i < 10; i++) {
+            getOrCreateMvPartition(PARTITION_ID);
+
+            runRace(
+                    () -> 
assertThat(tableStorage.clearPartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());
+        }
+    }
+
+    @Test
+    void destroyMvPartitionStorageAndDoCheckpointInParallel() throws Exception 
{
+        stopCompactor();
+
+        for (int i = 0; i < 10; i++) {
+            getOrCreateMvPartition(PARTITION_ID);
+
+            runRace(
+                    () -> 
assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+        }
+    }
+
+    @Test
+    void startRebalancePartitionAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        getOrCreateMvPartition(PARTITION_ID);
+
+        for (int i = 0; i < 10; i++) {
+            runRace(
+                    () -> 
assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+        }
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26233";)
+    @Test
+    void abortRebalancePartitionAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        getOrCreateMvPartition(PARTITION_ID);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+            runRace(
+                    () -> 
assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+        }
+    }
+
+    @Test
+    void finishRebalancePartitionAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        getOrCreateMvPartition(PARTITION_ID);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+            var meta = new MvPartitionMeta(1, 1, BYTE_EMPTY_ARRAY, null, 
BYTE_EMPTY_ARRAY);
+
+            runRace(
+                    () -> 
assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, meta), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+        }
+    }
+
+    private CompletableFuture<Void> forceCheckpointAsync() {
+        return 
engine.checkpointManager().forceCheckpoint("test").futureFor(FINISHED);
+    }
+
+    // TODO: IGNITE-25861 Get rid of it
+    private void stopCompactor() throws Exception {
+        engine.checkpointManager().compactor().stop();
+    }
 }


Reply via email to