This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26988
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 49c837c8c32d95f7e1a36df51a6d72b9306432d6
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Sun Nov 9 17:39:17 2025 +0300

    IGNITE-26988 wip
---
 .../pagememory/AbstractPageMemoryTableStorage.java | 33 +++++----
 .../PersistentPageMemoryTableStorage.java          |  7 +-
 .../pagememory/VolatilePageMemoryTableStorage.java |  7 +-
 .../PersistentPageMemoryMvTableStorageTest.java    | 83 ++++++++++++++++++----
 4 files changed, 103 insertions(+), 27 deletions(-)

diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 1da4f4af9dd..3f3a2a8de6f 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -294,22 +294,23 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
         return busy(() -> mvPartitionStorages.startRebalance(partitionId, 
mvPartitionStorage -> {
             mvPartitionStorage.startRebalance();
 
-            return clearStorageAndUpdateDataStructures(mvPartitionStorage)
-                    .thenAccept(unused ->
-                            mvPartitionStorage.runConsistently(locker -> {
-                                
mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
-
-                                return null;
-                            })
-                    );
+            return clearStorageAndUpdateDataStructures(
+                    mvPartitionStorage,
+                    () -> mvPartitionStorage.runConsistently(locker -> {
+                        
mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
+
+                        return null;
+                    })
+            );
         }));
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
         return busy(() -> mvPartitionStorages.abortRebalance(partitionId, 
mvPartitionStorage ->
-                clearStorageAndUpdateDataStructures(mvPartitionStorage)
-                        .thenAccept(unused -> {
+                clearStorageAndUpdateDataStructures(
+                        mvPartitionStorage,
+                        () -> {
                             mvPartitionStorage.runConsistently(locker -> {
                                 mvPartitionStorage.lastAppliedOnRebalance(0, 
0);
 
@@ -317,7 +318,8 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
                             });
 
                             mvPartitionStorage.completeRebalance();
-                        })
+                        }
+                )
         ));
     }
 
@@ -349,7 +351,7 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
             try {
                 mvPartitionStorage.startCleanup();
 
-                return clearStorageAndUpdateDataStructures(mvPartitionStorage)
+                return clearStorageAndUpdateDataStructures(mvPartitionStorage, 
() -> {})
                         .whenComplete((unused, throwable) -> 
mvPartitionStorage.finishCleanup());
             } catch (StorageException e) {
                 mvPartitionStorage.finishCleanup();
@@ -367,9 +369,14 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
      * Clears the partition multi-version storage and all its indexes, updates 
their internal data structures such as {@link BplusTree},
      * {@link FreeList} and {@link ReuseList}.
      *
+     * @param mvPartitionStorage Storage to be cleared.
+     * @param afterUpdateStructuresCallback Callback to run after the internal 
structures are updated.
      * @return Future of the operation.
      */
-    abstract CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage);
+    abstract CompletableFuture<Void> clearStorageAndUpdateDataStructures(
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage,
+            Runnable afterUpdateStructuresCallback
+    );
 
     /**
      * Returns the table ID.
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index e9c50688e0b..7385839d8c7 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -332,7 +332,10 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
     }
 
     @Override
-    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage,
+            Runnable afterUpdateStructuresCallback
+    ) {
         GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
         return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
@@ -359,6 +362,8 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
                         gcQueue
                 );
 
+                afterUpdateStructuresCallback.run();
+
                 return null;
             });
         });
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 5dec337b499..1cf367f0f5c 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -175,7 +175,10 @@ public class VolatilePageMemoryTableStorage extends 
AbstractPageMemoryTableStora
     }
 
     @Override
-    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage,
+            Runnable afterUpdateStructuresCallback
+    ) {
         VolatilePageMemoryMvPartitionStorage volatilePartitionStorage = 
(VolatilePageMemoryMvPartitionStorage) mvPartitionStorage;
 
         volatilePartitionStorage.destroyStructures().whenComplete((res, ex) -> 
{
@@ -196,6 +199,8 @@ public class VolatilePageMemoryTableStorage extends 
AbstractPageMemoryTableStora
                 createGarbageCollectionTree(partitionId)
         );
 
+        afterUpdateStructuresCallback.run();
+
         return nullCompletedFuture();
     }
 
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 9369c26a845..c69997c5d1a 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -29,8 +29,10 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -77,6 +79,7 @@ import 
org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Constants;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -218,7 +221,7 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
         assertNotNull(metric);
         assertEquals(0L, metric.value());
 
-        MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
+        PersistentPageMemoryMvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
         assertThat(metric.value(), allOf(greaterThan(0L), 
equalTo(totalAllocatedSizeInBytes(PARTITION_ID))));
 
         addWriteCommitted(mvPartitionStorage);
@@ -232,7 +235,7 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
         assertNotNull(metric);
         assertEquals(0L, metric.value());
 
-        MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
+        PersistentPageMemoryMvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
         assertThat(metric.value(), allOf(greaterThan(0L), 
equalTo(totalUsedSizeInBytes(PARTITION_ID))));
 
         addWriteCommitted(mvPartitionStorage);
@@ -280,18 +283,22 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
         return pageSize() * (filePageStorePageCount(partitionId) - 
freeListEmptyPageCount(partitionId));
     }
 
-    private void addWriteCommitted(MvPartitionStorage storage) {
-        var rowId = new RowId(PARTITION_ID);
+    private void addWriteCommitted(PersistentPageMemoryMvPartitionStorage... 
storages) {
+        assertThat(storages, not(emptyArray()));
 
-        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, 
"1"));
+        for (PersistentPageMemoryMvPartitionStorage storage : storages) {
+            var rowId = new RowId(storage.partitionId());
 
-        storage.runConsistently(locker -> {
-            locker.lock(rowId);
+            BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new 
TestValue(1, "1"));
 
-            storage.addWriteCommitted(rowId, binaryRow, clock.now());
+            storage.runConsistently(locker -> {
+                locker.lock(rowId);
 
-            return null;
-        });
+                storage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+                return null;
+            });
+        }
     }
 
     private void addWriteCommitted(MvPartitionStorage storage, List<RowId> 
rowIds, List<BinaryRow> binaryRows) {
@@ -397,7 +404,7 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
 
     @Test
     void testSyncFreeListOnCheckpointAfterStartRebalance() {
-        MvPartitionStorage storage = getOrCreateMvPartition(PARTITION_ID);
+        PersistentPageMemoryMvPartitionStorage storage = 
getOrCreateMvPartition(PARTITION_ID);
 
         var meta = new MvPartitionMeta(1, 1, BYTE_EMPTY_ARRAY, null, 
BYTE_EMPTY_ARRAY);
 
@@ -473,7 +480,7 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
     @Test
     void testSuccessfulPartitionRestartAfterParallelUpdateLeaseAndCheckpoint() 
throws Exception {
         for (int i = 0; i < 100; i++) {
-            MvPartitionStorage mvPartition = 
getOrCreateMvPartition(PARTITION_ID);
+            PersistentPageMemoryMvPartitionStorage mvPartition = 
getOrCreateMvPartition(PARTITION_ID);
 
             addWriteCommitted(mvPartition);
 
@@ -512,6 +519,31 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
         }
     }
 
+    /**
+     * Checks that the partition meta is updated consistently at the start of 
rebalancing. In other words, it checks that the meta update
+     * and the re-creation of the structures happen under the same checkpoint 
read lock. Otherwise, when the meta is written at the
+     * checkpoint, it may be missing from the dirty page list and from the 
delta file page index list.
+     */
+    @Test
+    void testUpdatePartitionMetaAfterStartRebalance() {
+        int[] partitionIds = IntStream.range(0, 5)
+                .map(i -> PARTITION_ID + i)
+                .toArray();
+
+        PersistentPageMemoryMvPartitionStorage[] partitions = 
getOrCreateMvPartitions(partitionIds);
+
+        for (int i = 0; i < 10_000; i++) {
+            addWriteCommitted(partitions);
+
+            runRace(
+                    () -> startRebalance(partitionIds),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            abortRebalance(partitionIds);
+        }
+    }
+
     private CompletableFuture<Void> forceCheckpointAsync() {
         return 
engine.checkpointManager().forceCheckpoint("test").futureFor(FINISHED);
     }
@@ -561,4 +593,31 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
     private int partitionGeneration(int partId) {
         return ((PersistentPageMemoryTableStorage) 
tableStorage).dataRegion().pageMemory().partGeneration(TABLE_ID, partId);
     }
+
+    @Override
+    protected PersistentPageMemoryMvPartitionStorage 
getOrCreateMvPartition(int partitionId) {
+        return (PersistentPageMemoryMvPartitionStorage) 
super.getOrCreateMvPartition(partitionId);
+    }
+
+    private PersistentPageMemoryMvPartitionStorage[] 
getOrCreateMvPartitions(int... partitionIds) {
+        return IntStream.of(partitionIds)
+                .mapToObj(this::getOrCreateMvPartition)
+                .toArray(PersistentPageMemoryMvPartitionStorage[]::new);
+    }
+
+    private void startRebalance(int... partitionIds) {
+        List<CompletableFuture<Void>> startRebalanceFutures = 
IntStream.of(partitionIds)
+                .mapToObj(tableStorage::startRebalancePartition)
+                .collect(toList());
+
+        assertThat(CompletableFutures.allOf(startRebalanceFutures), 
willCompleteSuccessfully());
+    }
+
+    private void abortRebalance(int... partitionIds) {
+        List<CompletableFuture<Void>> abortRebalanceFutures = 
IntStream.of(partitionIds)
+                .mapToObj(tableStorage::abortRebalancePartition)
+                .collect(toList());
+
+        assertThat(CompletableFutures.allOf(abortRebalanceFutures), 
willCompleteSuccessfully());
+    }
 }

Reply via email to