This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f68eea4a99 IGNITE-27028 Fix consistency of last applied index on 
start rebalance for aipersist (#6947)
3f68eea4a99 is described below

commit 3f68eea4a9991372e7722c10dcbf40efac9b5e31
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 12 12:27:05 2025 +0300

    IGNITE-27028 Fix consistency of last applied index on start rebalance for 
aipersist (#6947)
---
 .../pagememory/AbstractPageMemoryTableStorage.java | 33 +++++++++++++---------
 .../PersistentPageMemoryTableStorage.java          |  7 ++++-
 .../pagememory/VolatilePageMemoryTableStorage.java |  7 ++++-
 3 files changed, 32 insertions(+), 15 deletions(-)

diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index a071959b5b4..8f3a1011219 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -294,14 +294,14 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
         return busy(() -> mvPartitionStorages.startRebalance(partitionId, 
mvPartitionStorage -> {
             mvPartitionStorage.startRebalance();
 
-            return clearStorageAndUpdateDataStructures(mvPartitionStorage)
-                    .thenAccept(unused ->
-                            mvPartitionStorage.runConsistently(locker -> {
-                                
mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
-
-                                return null;
-                            })
-                    );
+            return clearStorageAndUpdateDataStructures(
+                    mvPartitionStorage,
+                    () -> mvPartitionStorage.runConsistently(locker -> {
+                        
mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
+
+                        return null;
+                    })
+            );
         }));
     }
 
@@ -310,8 +310,9 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
         return busy(() -> mvPartitionStorages.abortRebalance(partitionId, 
mvPartitionStorage -> {
             mvPartitionStorage.startAbortRebalance();
 
-            return clearStorageAndUpdateDataStructures(mvPartitionStorage)
-                    .thenAccept(unused -> {
+            return clearStorageAndUpdateDataStructures(
+                    mvPartitionStorage,
+                    () -> {
                         mvPartitionStorage.runConsistently(locker -> {
                             mvPartitionStorage.lastAppliedOnRebalance(0, 0);
 
@@ -319,7 +320,8 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
                         });
 
                         mvPartitionStorage.completeRebalance();
-                    });
+                    }
+            );
         }));
     }
 
@@ -351,7 +353,7 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
             try {
                 mvPartitionStorage.startCleanup();
 
-                return clearStorageAndUpdateDataStructures(mvPartitionStorage)
+                return clearStorageAndUpdateDataStructures(mvPartitionStorage, 
() -> {})
                         .whenComplete((unused, throwable) -> 
mvPartitionStorage.finishCleanup());
             } catch (StorageException e) {
                 mvPartitionStorage.finishCleanup();
@@ -369,9 +371,14 @@ public abstract class AbstractPageMemoryTableStorage<T 
extends AbstractPageMemor
      * Clears the partition multi-version storage and all its indexes, updates 
their internal data structures such as {@link BplusTree},
      * {@link FreeList} and {@link ReuseList}.
      *
+     * @param mvPartitionStorage Storage to be cleared.
+     * @param afterUpdateStructuresCallback Callback to be invoked after 
updating internal structures.
      * @return Future of the operation.
      */
-    abstract CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage);
+    abstract CompletableFuture<Void> clearStorageAndUpdateDataStructures(
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage,
+            Runnable afterUpdateStructuresCallback
+    );
 
     /**
      * Returns the table ID.
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index e9c50688e0b..7385839d8c7 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -332,7 +332,10 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
     }
 
     @Override
-    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage,
+            Runnable afterUpdateStructuresCallback
+    ) {
         GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
         return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
@@ -359,6 +362,8 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
                         gcQueue
                 );
 
+                afterUpdateStructuresCallback.run();
+
                 return null;
             });
         });
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 5dec337b499..1cf367f0f5c 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -175,7 +175,10 @@ public class VolatilePageMemoryTableStorage extends 
AbstractPageMemoryTableStora
     }
 
     @Override
-    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage,
+            Runnable afterUpdateStructuresCallback
+    ) {
         VolatilePageMemoryMvPartitionStorage volatilePartitionStorage = 
(VolatilePageMemoryMvPartitionStorage) mvPartitionStorage;
 
         volatilePartitionStorage.destroyStructures().whenComplete((res, ex) -> 
{
@@ -196,6 +199,8 @@ public class VolatilePageMemoryTableStorage extends 
AbstractPageMemoryTableStora
                 createGarbageCollectionTree(partitionId)
         );
 
+        afterUpdateStructuresCallback.run();
+
         return nullCompletedFuture();
     }
 

Reply via email to