This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26315 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 4cd2b0895e195bb7a519a4d0d04c35258cb47fbd Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Wed Aug 27 17:45:58 2025 +0300 IGNITE-26315 wip --- .../persistence/PartitionMetaManager.java | 2 +- .../persistence/checkpoint/CheckpointManager.java | 6 ++-- .../persistence/checkpoint/CheckpointPages.java | 7 ++++ .../checkpoint/CheckpointPagesWriter.java | 27 ++++++++++----- .../checkpoint/CheckpointProgressImpl.java | 14 ++++++++ .../persistence/checkpoint/Checkpointer.java | 21 +++++++----- .../persistence/compaction/Compactor.java | 1 + .../replacement/DelayedDirtyPageWrite.java | 7 ++-- .../pagememory/AbstractPageMemoryTableStorage.java | 40 ++++++++++++++++++---- .../PersistentPageMemoryTableStorage.java | 32 ++++++++++++++--- 10 files changed, 121 insertions(+), 36 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java index 3db2224454a..5cdb733af75 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java @@ -65,7 +65,7 @@ public class PartitionMetaManager { } /** - * Returns the partition's meta information. + * Returns the partition's meta information, {@code null} if not yet added or was removed when the partition was destroyed. * * @param groupPartitionId Partition of the group. */ diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index fe973622617..86e092442b6 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -403,10 +403,8 @@ public class CheckpointManager { * @param groupPartitionId Pair of group ID with partition ID. * @return Future that will complete when the callback completes. */ + // TODO: IGNITE-26315 Вот тут скорее всего сделать по другому или вроде того public CompletableFuture<Void> onPartitionDestruction(GroupPartitionId groupPartitionId) { - return CompletableFuture.allOf( - checkpointer.prepareToDestroyPartition(groupPartitionId), - compactor.prepareToDestroyPartition(groupPartitionId) - ); + return compactor.prepareToDestroyPartition(groupPartitionId); } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java index 01c2e69ad9e..2c84d7f3a35 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.Lock; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; @@ -195,4 +196,10 @@ public class CheckpointPages { public void unblockPartitionDestruction(GroupPartitionId groupPartitionId) { checkpointProgress.unblockPartitionDestruction(groupPartitionId); } + + /** No doc yet. */ + // TODO: IGNITE-26315 Исправить документацию и удалить не используемые методы + public Lock partitionDesctructionLock(GroupPartitionId groupPartitionId) { + return checkpointProgress.partitionDesctructionLock(groupPartitionId); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java index e0746d46c18..490ebe5b9cd 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java @@ -35,6 +35,7 @@ import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; import java.util.function.BooleanSupplier; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -203,7 +204,9 @@ public class CheckpointPagesWriter implements Runnable { ) throws IgniteInternalCheckedException { CheckpointDirtyPagesView checkpointDirtyPagesView = checkpointDirtyPagesView(pageMemory, partitionId); - checkpointProgress.blockPartitionDestruction(partitionId); + Lock partitionDesctructionLock = checkpointProgress.partitionDesctructionLock(partitionId); + + partitionDesctructionLock.lock(); try { addUpdatePartitionCounterIfAbsent(partitionId); @@ -224,7 +227,7 @@ public class CheckpointPagesWriter implements Runnable { writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter, true); } } finally { - checkpointProgress.unblockPartitionDestruction(partitionId); + partitionDesctructionLock.unlock(); } } @@ -267,6 +270,8 @@ public class CheckpointPagesWriter implements Runnable { GroupPartitionId partitionId = null; + Lock partitionDesctructionLock = null; + try { for (DirtyFullPageId pageId : entry.getValue()) { if (shutdownNow.getAsBoolean()) { @@ -276,20 +281,22 @@ public class CheckpointPagesWriter implements Runnable { updateHeartbeat.run(); if (partitionIdChanged(partitionId, pageId)) { - if (partitionId != null) { - checkpointProgress.unblockPartitionDestruction(partitionId); + if (partitionDesctructionLock != null) { + partitionDesctructionLock.unlock(); } partitionId = GroupPartitionId.convert(pageId); - checkpointProgress.blockPartitionDestruction(partitionId); + partitionDesctructionLock = checkpointProgress.partitionDesctructionLock(partitionId); + + partitionDesctructionLock.lock(); } writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter, useTryWriteLockOnPage); } } finally { - if (partitionId != null) { - checkpointProgress.unblockPartitionDestruction(partitionId); + if (partitionDesctructionLock != null) { + partitionDesctructionLock.unlock(); } } } @@ -333,14 +340,16 @@ public class CheckpointPagesWriter implements Runnable { GroupPartitionId partitionId = GroupPartitionId.convert(cpPageId); - checkpointProgress.blockPartitionDestruction(partitionId); + Lock partitionDesctructionLock = checkpointProgress.partitionDesctructionLock(partitionId); + + partitionDesctructionLock.lock(); try { addUpdatePartitionCounterIfAbsent(partitionId); pageMemory.checkpointWritePage(cpPageId, tmpWriteBuf.rewind(), pageStoreWriter, tracker, true); } finally { - checkpointProgress.unblockPartitionDestruction(partitionId); + partitionDesctructionLock.unlock(); } } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java index ea77ba217fe..cd04b634bf9 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java @@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap; @@ -78,6 +80,8 @@ public class CheckpointProgressImpl implements CheckpointProgress { /** Partitions currently being processed, for example, writing dirty pages or doing fsync. */ private final PartitionProcessingCounterMap processedPartitionMap = new PartitionProcessingCounterMap(); + private final Map<GroupPartitionId, Lock> destructionLockByPartitionId = new ConcurrentHashMap<>(); + /** Assistant for synchronizing page replacement and fsync phase. */ private final CheckpointPageReplacement checkpointPageReplacement = new CheckpointPageReplacement(); @@ -237,6 +241,10 @@ public class CheckpointProgressImpl implements CheckpointProgress { doFinishFuturesWhichLessOrEqualTo(newState); } + + if (newState == FINISHED) { + destructionLockByPartitionId.clear(); + } } /** @@ -421,4 +429,10 @@ public class CheckpointProgressImpl implements CheckpointProgress { CompletableFuture<Void> getUnblockFsyncOnPageReplacementFuture() { return checkpointPageReplacement.stopBlocking(); } + + /** No doc yet. */ + // TODO: IGNITE-26315 Добавить документацию и удалить не нужные методы + public Lock partitionDesctructionLock(GroupPartitionId groupPartitionId) { + return destructionLockByPartitionId.computeIfAbsent(groupPartitionId, unused -> new ReentrantLock()); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index 463866fd26d..4134faa98f4 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -46,6 +46,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; import java.util.function.BooleanSupplier; import org.apache.ignite.internal.components.LogSyncer; import org.apache.ignite.internal.components.LongJvmPauseDetector; @@ -612,27 +613,29 @@ public class Checkpointer extends IgniteWorker { return; } - currentCheckpointProgress.blockPartitionDestruction(partitionId); + Lock partitionDesctructionLock = currentCheckpointProgress.partitionDesctructionLock(partitionId); - try { - fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore); - - fsyncFilePageStoreOnCheckpointThread(filePageStore); + partitionDesctructionLock.lock(); - renameDeltaFileOnCheckpointThread(filePageStore, partitionId); - - // TODO: IGNITE-26315 Deal with partition deletion blocking on checkpoint + try { PartitionMeta meta = partitionMetaManager.getMeta(partitionId); + // If this happens, then the partition is destroyed. if (meta == null) { return; } + fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore); + + fsyncFilePageStoreOnCheckpointThread(filePageStore); + + renameDeltaFileOnCheckpointThread(filePageStore, partitionId); + filePageStore.checkpointedPageCount(meta.metaSnapshot(currentCheckpointProgress.id()).pageCount()); currentCheckpointProgress.syncedPagesCounter().addAndGet(pagesWritten.intValue()); } finally { - currentCheckpointProgress.unblockPartitionDestruction(partitionId); + partitionDesctructionLock.unlock(); } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index e708f55bad6..6c5f3080888 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -460,6 +460,7 @@ public class Compactor extends IgniteWorker { * @param groupPartitionId Pair of group ID with partition ID. * @return Future at the complete of which we can delete the partition file and its delta files. */ + // TODO: IGNITE-26315 От этого тоже нужно будет скорее всего избавиться public CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId groupPartitionId) { CompletableFuture<Void> partitionProcessingFuture = partitionCompactionInProgressMap.getProcessedPartitionFuture(groupPartitionId); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java index 243ace83dbf..6357c2d590d 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java @@ -21,6 +21,7 @@ import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; import static org.apache.ignite.internal.util.GridUnsafe.copyMemory; import java.nio.ByteBuffer; +import java.util.concurrent.locks.Lock; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; @@ -134,7 +135,9 @@ public class DelayedDirtyPageWrite { Throwable errorOnWrite = null; - checkpointPages.blockPartitionDestruction(GroupPartitionId.convert(fullPageId)); + Lock partitionDesctructionLock = checkpointPages.partitionDesctructionLock(GroupPartitionId.convert(fullPageId)); + + partitionDesctructionLock.lock(); try { flushDirtyPage.write(pageMemory, fullPageId, byteBufThreadLoc.get()); @@ -143,7 +146,7 @@ public class DelayedDirtyPageWrite { throw t; } finally { - checkpointPages.unblockPartitionDestruction(GroupPartitionId.convert(fullPageId)); + partitionDesctructionLock.unlock(); checkpointPages.unblockFsyncOnPageReplacement(fullPageId, errorOnWrite); diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java index 1da4f4af9dd..25219a008dd 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java @@ -26,12 +26,15 @@ import static org.apache.ignite.internal.storage.util.StorageUtils.transitionToC import static org.apache.ignite.internal.storage.util.StorageUtils.transitionToDestroyedState; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.ignite.internal.lang.IgniteStringFormatter; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.pagememory.DataRegion; import org.apache.ignite.internal.pagememory.PageMemory; import org.apache.ignite.internal.pagememory.freelist.FreeList; @@ -289,20 +292,43 @@ public abstract class AbstractPageMemoryTableStorage<T extends AbstractPageMemor } } + private static final IgniteLogger LOG = Loggers.forClass(AbstractPageMemoryTableStorage.class); + + // TODO: IGNITE-26315 Вернуть как было @Override public CompletableFuture<Void> startRebalancePartition(int partitionId) { - return busy(() -> mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> { - mvPartitionStorage.startRebalance(); + long startNanos = System.nanoTime(); + + return busy(() -> { + LOG.info(">>>>> in busy: {}", Duration.ofNanos(System.nanoTime() - startNanos)); + + return mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> { + LOG.info(">>>>> in startRebalance: {}", Duration.ofNanos(System.nanoTime() - startNanos)); + + mvPartitionStorage.startRebalance(); + + LOG.info(">>>>> after mvPartitionStorage.startRebalance: {}", Duration.ofNanos(System.nanoTime() - startNanos)); + + return clearStorageAndUpdateDataStructures(mvPartitionStorage) + .thenAccept(unused -> { + LOG.info( + ">>>>> after clearStorageAndUpdateDataStructures: {}", + Duration.ofNanos(System.nanoTime() - startNanos) + ); - return clearStorageAndUpdateDataStructures(mvPartitionStorage) - .thenAccept(unused -> mvPartitionStorage.runConsistently(locker -> { mvPartitionStorage.lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); return null; - }) - ); - })); + }); + + LOG.info( + ">>>>> after mvPartitionStorage.runConsistently: {}", + Duration.ofNanos(System.nanoTime() - startNanos) + ); + }); + }); + }); } @Override diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java index 12d08279c3e..e2a462b3c98 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; import java.util.function.Supplier; import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; @@ -37,6 +38,7 @@ import org.apache.ignite.internal.pagememory.freelist.FreeListImpl; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgressImpl; import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.reuse.ReuseList; @@ -364,11 +366,33 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) { dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy(); - dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + CheckpointProgress currentCheckpointProgress = dataRegion.checkpointManager().currentCheckpointProgress(); - return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) - .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId)) - .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); + assert currentCheckpointProgress == null || currentCheckpointProgress instanceof CheckpointProgressImpl : + "currentCheckpointProgress=" + currentCheckpointProgress + ", groupPartitionId=" + groupPartitionId; + + Lock partitionDesctructionLock = null; + + if (currentCheckpointProgress != null) { + partitionDesctructionLock = ((CheckpointProgressImpl) currentCheckpointProgress).partitionDesctructionLock(groupPartitionId); + } + + if (partitionDesctructionLock != null) { + partitionDesctructionLock.lock(); + } + + try { + dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + + dataRegion.partitionMetaManager().removeMeta(groupPartitionId); + + return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) + .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); + } finally { + if (partitionDesctructionLock != null) { + partitionDesctructionLock.unlock(); + } + } } private GroupPartitionId createGroupPartitionId(int partitionId) {