This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26988 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 49c837c8c32d95f7e1a36df51a6d72b9306432d6 Author: Kirill Tkalenko <[email protected]> AuthorDate: Sun Nov 9 17:39:17 2025 +0300 IGNITE-26988 wip --- .../pagememory/AbstractPageMemoryTableStorage.java | 33 +++++---- .../PersistentPageMemoryTableStorage.java | 7 +- .../pagememory/VolatilePageMemoryTableStorage.java | 7 +- .../PersistentPageMemoryMvTableStorageTest.java | 83 ++++++++++++++++++---- 4 files changed, 103 insertions(+), 27 deletions(-) diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java index 1da4f4af9dd..3f3a2a8de6f 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java @@ -294,22 +294,23 @@ public abstract class AbstractPageMemoryTableStorage<T extends AbstractPageMemor return busy(() -> mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> { mvPartitionStorage.startRebalance(); - return clearStorageAndUpdateDataStructures(mvPartitionStorage) - .thenAccept(unused -> - mvPartitionStorage.runConsistently(locker -> { - mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); - - return null; - }) - ); + return clearStorageAndUpdateDataStructures( + mvPartitionStorage, + () -> mvPartitionStorage.runConsistently(locker -> { + mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); + + return null; + }) + ); })); } @Override public CompletableFuture<Void> abortRebalancePartition(int partitionId) { return busy(() -> mvPartitionStorages.abortRebalance(partitionId, mvPartitionStorage -> - clearStorageAndUpdateDataStructures(mvPartitionStorage) - .thenAccept(unused -> { + clearStorageAndUpdateDataStructures( + mvPartitionStorage, + () -> { mvPartitionStorage.runConsistently(locker -> { mvPartitionStorage.lastAppliedOnRebalance(0, 0); @@ -317,7 +318,8 @@ public abstract class AbstractPageMemoryTableStorage<T extends AbstractPageMemor }); mvPartitionStorage.completeRebalance(); - }) + } + ) )); } @@ -349,7 +351,7 @@ public abstract class AbstractPageMemoryTableStorage<T extends AbstractPageMemor try { mvPartitionStorage.startCleanup(); - return clearStorageAndUpdateDataStructures(mvPartitionStorage) + return clearStorageAndUpdateDataStructures(mvPartitionStorage, () -> {}) .whenComplete((unused, throwable) -> mvPartitionStorage.finishCleanup()); } catch (StorageException e) { mvPartitionStorage.finishCleanup(); @@ -367,9 +369,14 @@ public abstract class AbstractPageMemoryTableStorage<T extends AbstractPageMemor * Clears the partition multi-version storage and all its indexes, updates their internal data structures such as {@link BplusTree}, * {@link FreeList} and {@link ReuseList}. * + * @param mvPartitionStorage Storage to be cleared. + * @param afterUpdateStructuresCallback Callback after updating internal structures. * @return Future of the operation. */ - abstract CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage); + abstract CompletableFuture<Void> clearStorageAndUpdateDataStructures( + AbstractPageMemoryMvPartitionStorage mvPartitionStorage, + Runnable afterUpdateStructuresCallback + ); /** * Returns the table ID. diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java index e9c50688e0b..7385839d8c7 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java @@ -332,7 +332,10 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto } @Override - CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { + CompletableFuture<Void> clearStorageAndUpdateDataStructures( + AbstractPageMemoryMvPartitionStorage mvPartitionStorage, + Runnable afterUpdateStructuresCallback + ) { GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId()); return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> { @@ -359,6 +362,8 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto gcQueue ); + afterUpdateStructuresCallback.run(); + return null; }); }); diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java index 5dec337b499..1cf367f0f5c 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java @@ -175,7 +175,10 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora } @Override - CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { + CompletableFuture<Void> clearStorageAndUpdateDataStructures( + AbstractPageMemoryMvPartitionStorage mvPartitionStorage, + Runnable afterUpdateStructuresCallback + ) { VolatilePageMemoryMvPartitionStorage volatilePartitionStorage = (VolatilePageMemoryMvPartitionStorage) mvPartitionStorage; volatilePartitionStorage.destroyStructures().whenComplete((res, ex) -> { @@ -196,6 +199,8 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora createGarbageCollectionTree(partitionId) ); + afterUpdateStructuresCallback.run(); + return nullCompletedFuture(); } diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java index 9369c26a845..c69997c5d1a 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java @@ -29,8 +29,10 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -77,6 +79,7 @@ import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.Constants; import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; @@ -218,7 +221,7 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora assertNotNull(metric); assertEquals(0L, metric.value()); - MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID); + PersistentPageMemoryMvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID); assertThat(metric.value(), allOf(greaterThan(0L), equalTo(totalAllocatedSizeInBytes(PARTITION_ID)))); addWriteCommitted(mvPartitionStorage); @@ -232,7 +235,7 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora assertNotNull(metric); assertEquals(0L, metric.value()); - MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID); + PersistentPageMemoryMvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID); assertThat(metric.value(), allOf(greaterThan(0L), equalTo(totalUsedSizeInBytes(PARTITION_ID)))); addWriteCommitted(mvPartitionStorage); @@ -280,18 +283,22 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora return pageSize() * (filePageStorePageCount(partitionId) - freeListEmptyPageCount(partitionId)); } - private void addWriteCommitted(MvPartitionStorage storage) { - var rowId = new RowId(PARTITION_ID); + private void addWriteCommitted(PersistentPageMemoryMvPartitionStorage... storages) { + assertThat(storages, not(emptyArray())); - BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1")); + for (PersistentPageMemoryMvPartitionStorage storage : storages) { + var rowId = new RowId(storage.partitionId()); - storage.runConsistently(locker -> { - locker.lock(rowId); + BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1")); - storage.addWriteCommitted(rowId, binaryRow, clock.now()); + storage.runConsistently(locker -> { + locker.lock(rowId); - return null; - }); + storage.addWriteCommitted(rowId, binaryRow, clock.now()); + + return null; + }); + } } private void addWriteCommitted(MvPartitionStorage storage, List<RowId> rowIds, List<BinaryRow> binaryRows) { @@ -397,7 +404,7 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora @Test void testSyncFreeListOnCheckpointAfterStartRebalance() { - MvPartitionStorage storage = getOrCreateMvPartition(PARTITION_ID); + PersistentPageMemoryMvPartitionStorage storage = getOrCreateMvPartition(PARTITION_ID); var meta = new MvPartitionMeta(1, 1, BYTE_EMPTY_ARRAY, null, BYTE_EMPTY_ARRAY); @@ -473,7 +480,7 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora @Test void testSuccessfulPartitionRestartAfterParallelUpdateLeaseAndCheckpoint() throws Exception { for (int i = 0; i < 100; i++) { - MvPartitionStorage mvPartition = getOrCreateMvPartition(PARTITION_ID); + PersistentPageMemoryMvPartitionStorage mvPartition = getOrCreateMvPartition(PARTITION_ID); addWriteCommitted(mvPartition); @@ -512,6 +519,31 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora } } + /** + * Checks that the partition meta is updated consistently at the start of rebalancing. In other words, it checks that updating the meta + * and recreating the structures are under the same checkpoint read lock. If this doesn't happen, then when writing the meta at the + * checkpoint, it may not be included in the dirty page list and may not be included in the delta file page index list. + */ + @Test + void testUpdatePartitionMetaAfterStartRebalance() { + int[] partitionIds = IntStream.range(0, 5) + .map(i -> PARTITION_ID + i) + .toArray(); + + PersistentPageMemoryMvPartitionStorage[] partitions = getOrCreateMvPartitions(partitionIds); + + for (int i = 0; i < 10_000; i++) { + addWriteCommitted(partitions); + + runRace( + () -> startRebalance(partitionIds), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + + abortRebalance(partitionIds); + } + } + private CompletableFuture<Void> forceCheckpointAsync() { return engine.checkpointManager().forceCheckpoint("test").futureFor(FINISHED); } @@ -561,4 +593,31 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora private int partitionGeneration(int partId) { return ((PersistentPageMemoryTableStorage) tableStorage).dataRegion().pageMemory().partGeneration(TABLE_ID, partId); } + + @Override + protected PersistentPageMemoryMvPartitionStorage getOrCreateMvPartition(int partitionId) { + return (PersistentPageMemoryMvPartitionStorage) super.getOrCreateMvPartition(partitionId); + } + + private PersistentPageMemoryMvPartitionStorage[] getOrCreateMvPartitions(int... partitionIds) { + return IntStream.of(partitionIds) + .mapToObj(this::getOrCreateMvPartition) + .toArray(PersistentPageMemoryMvPartitionStorage[]::new); + } + + private void startRebalance(int... partitionIds) { + List<CompletableFuture<Void>> startRebalanceFutures = IntStream.of(partitionIds) + .mapToObj(tableStorage::startRebalancePartition) + .collect(toList()); + + assertThat(CompletableFutures.allOf(startRebalanceFutures), willCompleteSuccessfully()); + } + + private void abortRebalance(int... partitionIds) { + List<CompletableFuture<Void>> abortRebalanceFutures = IntStream.of(partitionIds) + .mapToObj(tableStorage::abortRebalancePartition) + .collect(toList()); + + assertThat(CompletableFutures.allOf(abortRebalanceFutures), willCompleteSuccessfully()); + } }
