This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 686b7e4b741 IGNITE-25671 move physical partition destruction to the
end of checkpoint (#6033)
686b7e4b741 is described below
commit 686b7e4b7419b86c5324539db05716922a3283a3
Author: Phillippko <[email protected]>
AuthorDate: Tue Jun 24 14:03:34 2025 +0400
IGNITE-25671 move physical partition destruction to the end of checkpoint
(#6033)
---
.../ignite/internal/rebalance/ItRebalanceTest.java | 8 +-
.../persistence/PartitionProcessingCounter.java | 85 ----------------------
.../persistence/PartitionProcessingCounterMap.java | 84 ---------------------
.../persistence/checkpoint/CheckpointManager.java | 15 ----
.../persistence/checkpoint/CheckpointPages.java | 50 -------------
.../checkpoint/CheckpointPagesWriter.java | 72 ++++++------------
.../checkpoint/CheckpointProgressImpl.java | 71 ------------------
.../persistence/checkpoint/Checkpointer.java | 47 ++----------
.../persistence/compaction/Compactor.java | 39 ++--------
.../replacement/DelayedDirtyPageWrite.java | 5 --
.../PartitionProcessingCounterMapTest.java | 79 --------------------
.../PartitionProcessingCounterTest.java | 54 --------------
.../checkpoint/CheckpointProgressImplTest.java | 48 ------------
.../persistence/checkpoint/CheckpointerTest.java | 44 -----------
.../storage/AbstractMvTableStorageTest.java | 2 +-
.../pagememory/AbstractPageMemoryTableStorage.java | 1 +
.../pagememory/PersistentPageMemoryDataRegion.java | 2 +-
.../PersistentPageMemoryTableStorage.java | 49 +++++++++++--
...PersistentPageMemoryMvPartitionStorageTest.java | 61 ++++++++++++++++
19 files changed, 148 insertions(+), 668 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 4f45d0ace58..cb740c51906 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -61,6 +61,8 @@ import org.junit.jupiter.api.Test;
* Test suite for the rebalance.
*/
public class ItRebalanceTest extends ClusterPerTestIntegrationTest {
+ public static final String ZONE_NAME = "TEST_ZONE";
+ public static final String TABLE_NAME = "TEST_TABLE";
private final HybridClock clock = new HybridClockImpl();
@Override
@@ -86,11 +88,11 @@ public class ItRebalanceTest extends
ClusterPerTestIntegrationTest {
*/
@Test
void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
- createZone("TEST_ZONE", 1, 3);
+ createZone(ZONE_NAME, 1, 3);
// Creates table with 1 partition and 3 replicas.
- createTestTable("TEST_TABLE", "TEST_ZONE");
+ createTestTable(TABLE_NAME, ZONE_NAME);
- TableViewInternal table =
unwrapTableViewInternal(cluster.node(0).tables().table("TEST_TABLE"));
+ TableViewInternal table =
unwrapTableViewInternal(cluster.node(0).tables().table(TABLE_NAME));
waitForStableAssignmentsInMetastore(Set.of(
nodeName(0),
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java
deleted file mode 100644
index 00dac86fbe2..00000000000
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Helper class for tracking the completion of partition processing.
- *
- * <p>At the start of partition processing, you need to call {@link
#incrementPartitionProcessingCounter()}, at the end
- * {@link #decrementPartitionProcessingCounter()}. When all partition
processing is completed, the {@link #future()} will be completed.
- *
- * <p>If the {@link #future()} is completed, then subsequent calls to {@link
#incrementPartitionProcessingCounter()} and
- * {@link #decrementPartitionProcessingCounter()} on this instance do not make
sense, since we will not be able to wait for the completion
- * of the partition processing (we will not be able to reset the current
future), in order not to get into such a situation, we need to
- * delete the current instance at the completion of the current future, and
this may require external synchronization.
- */
-class PartitionProcessingCounter {
- private static final VarHandle COUNTER;
-
- static {
- try {
- COUNTER =
MethodHandles.lookup().findVarHandle(PartitionProcessingCounter.class,
"counter", int.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
- /** Partition processing counter must be greater than or equal to zero. */
- @SuppressWarnings("unused")
- private volatile int counter;
-
- /** Future that will be completed when the {@link #counter} is zero. */
- private final CompletableFuture<Void> future = new CompletableFuture<>();
-
- /**
- * Atomically increments the partition processing counter.
- */
- void incrementPartitionProcessingCounter() {
- assert !future.isDone();
-
- int updatedValue = (int) COUNTER.getAndAdd(this, 1) + 1;
-
- assert updatedValue > 0 : updatedValue;
- }
-
- /**
- * Atomically decrements the partition processing counter.
- */
- void decrementPartitionProcessingCounter() {
- assert !future.isDone();
-
- int updatedValue = (int) COUNTER.getAndAdd(this, -1) - 1;
-
- assert updatedValue >= 0 : updatedValue;
-
- if (updatedValue == 0) {
- future.complete(null);
- }
- }
-
- /**
- * Returns a future that will be completed when all partition processing
has finished.
- */
- CompletableFuture<Void> future() {
- return future;
- }
-}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
deleted file mode 100644
index d0369054794..00000000000
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Helper class for thread-safe work with {@link PartitionProcessingCounter}
for any partition of any group.
- */
-public class PartitionProcessingCounterMap {
- private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter>
processedPartitions = new ConcurrentHashMap<>();
-
- /**
- * Atomically increments the partition processing counter.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- public void incrementPartitionProcessingCounter(GroupPartitionId
groupPartitionId) {
- processedPartitions.compute(groupPartitionId, (id,
partitionProcessingCounter) -> {
- if (partitionProcessingCounter == null) {
- PartitionProcessingCounter counter = new
PartitionProcessingCounter();
-
- counter.incrementPartitionProcessingCounter();
-
- return counter;
- }
-
- partitionProcessingCounter.incrementPartitionProcessingCounter();
-
- return partitionProcessingCounter;
- });
- }
-
- /**
- * Atomically decrements the partition processing counter.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- public void decrementPartitionProcessingCounter(GroupPartitionId
groupPartitionId) {
- processedPartitions.compute(groupPartitionId, (id,
partitionProcessingCounter) -> {
- assert partitionProcessingCounter != null : id;
- assert !partitionProcessingCounter.future().isDone() : id;
-
- partitionProcessingCounter.decrementPartitionProcessingCounter();
-
- return partitionProcessingCounter.future().isDone() ? null :
partitionProcessingCounter;
- });
- }
-
- /**
- * Returns the future if the partition according to the given parameters
is currently being processed, for example, dirty pages are
- * being written or fsync is being done, {@code null} if the partition is
not currently being processed.
- *
- * <p>Future will be added on {@link
#incrementPartitionProcessingCounter(GroupPartitionId)} call and completed on
- * {@link #incrementPartitionProcessingCounter(GroupPartitionId)} call
(equal to the number of
- * {@link #decrementPartitionProcessingCounter(GroupPartitionId)} calls).
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- @Nullable
- public CompletableFuture<Void>
getProcessedPartitionFuture(GroupPartitionId groupPartitionId) {
- PartitionProcessingCounter partitionProcessingCounter =
processedPartitions.get(groupPartitionId);
-
- return partitionProcessingCounter == null ? null :
partitionProcessingCounter.future();
- }
-}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 102cde1dccf..824f6fc303c 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -365,19 +365,4 @@ public class CheckpointManager {
public void triggerCompaction() {
compactor.triggerCompaction();
}
-
- /**
- * Callback on destruction of the partition of the corresponding group.
- *
- * <p>Prepares the checkpointer and compactor for partition destruction.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @return Future that will complete when the callback completes.
- */
- public CompletableFuture<Void> onPartitionDestruction(GroupPartitionId
groupPartitionId) {
- return CompletableFuture.allOf(
- checkpointer.prepareToDestroyPartition(groupPartitionId),
- compactor.prepareToDestroyPartition(groupPartitionId)
- );
- }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
index f6897a03b82..7e6c3f0ff69 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -26,11 +26,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
-import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.jetbrains.annotations.Nullable;
/**
@@ -150,51 +147,4 @@ public class CheckpointPages {
public void unblockFsyncOnPageReplacement(FullPageId pageId, @Nullable
Throwable error) {
checkpointProgress.unblockFsyncOnPageReplacement(pageId, error);
}
-
- /**
- * Blocks physical destruction of partition.
- *
- * <p>When the intention to destroy partition appears, {@link
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
- * {@link PersistentPageMemory#invalidate(int, int)} invoked at the
beginning. And if there is a block, it waits for unblocking.
- * Then it destroys the partition, {@link
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
- *
- * <p>It is recommended to use where physical destruction of the partition
may have an impact, for example when writing dirty pages and
- * executing a fsync.</p>
- *
- * <p>To make sure that we can physically do something with the partition
during a block, we will need to use approximately the
- * following code:</p>
- * <pre><code>
- * checkpointProgress.blockPartitionDestruction(partitionId);
- *
- * try {
- * FilePageStore pageStore =
FilePageStoreManager#getStore(partitionId);
- *
- * if (pageStore == null || pageStore.isMarkedToDestroy()) {
- * return;
- * }
- *
- * someAction(pageStore);
- * } finally {
- * checkpointProgress.unblockPartitionDestruction(partitionId);
- * }
- * </code></pre>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #unblockPartitionDestruction(GroupPartitionId)
- */
- public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
- checkpointProgress.blockPartitionDestruction(groupPartitionId);
- }
-
- /**
- * Unblocks physical destruction of partition.
- *
- * <p>As soon as the last thread makes an unlock, the physical destruction
of the partition can immediately begin.</p>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #blockPartitionDestruction(GroupPartitionId)
- */
- public void unblockPartitionDestruction(GroupPartitionId groupPartitionId)
{
- checkpointProgress.unblockPartitionDestruction(groupPartitionId);
- }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index c010d1f1580..b2965b99e37 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -192,27 +192,21 @@ public class CheckpointPagesWriter implements Runnable {
) throws IgniteInternalCheckedException {
CheckpointDirtyPagesView checkpointDirtyPagesView =
checkpointDirtyPagesView(pageMemory, partitionId);
- checkpointProgress.blockPartitionDestruction(partitionId);
-
- try {
- if (shouldWriteMetaPage(partitionId)) {
- writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
- }
-
- for (int i = 0; i < checkpointDirtyPagesView.size() &&
!shutdownNow.getAsBoolean(); i++) {
- updateHeartbeat.run();
+ if (shouldWriteMetaPage(partitionId)) {
+ writePartitionMeta(pageMemory, partitionId, tmpWriteBuf.rewind());
+ }
- FullPageId pageId = checkpointDirtyPagesView.get(i);
+ for (int i = 0; i < checkpointDirtyPagesView.size() &&
!shutdownNow.getAsBoolean(); i++) {
+ updateHeartbeat.run();
- if (pageId.pageIdx() == 0) {
- // Skip meta-pages, they are written by
"writePartitionMeta".
- continue;
- }
+ FullPageId pageId = checkpointDirtyPagesView.get(i);
- writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter);
+ if (pageId.pageIdx() == 0) {
+ // Skip meta-pages, they are written by "writePartitionMeta".
+ continue;
}
- } finally {
- checkpointProgress.unblockPartitionDestruction(partitionId);
+
+ writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter);
}
}
@@ -248,30 +242,18 @@ public class CheckpointPagesWriter implements Runnable {
GroupPartitionId partitionId = null;
- try {
- for (FullPageId pageId : entry.getValue()) {
- if (shutdownNow.getAsBoolean()) {
- return Map.of();
- }
-
- updateHeartbeat.run();
-
- if (partitionIdChanged(partitionId, pageId)) {
- if (partitionId != null) {
-
checkpointProgress.unblockPartitionDestruction(partitionId);
- }
-
- partitionId = GroupPartitionId.convert(pageId);
+ for (FullPageId pageId : entry.getValue()) {
+ if (shutdownNow.getAsBoolean()) {
+ return Map.of();
+ }
-
checkpointProgress.blockPartitionDestruction(partitionId);
- }
+ updateHeartbeat.run();
- writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter);
- }
- } finally {
- if (partitionId != null) {
-
checkpointProgress.unblockPartitionDestruction(partitionId);
+ if (partitionIdChanged(partitionId, pageId)) {
+ partitionId = GroupPartitionId.convert(pageId);
}
+
+ writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter);
}
}
@@ -314,17 +296,11 @@ public class CheckpointPagesWriter implements Runnable {
GroupPartitionId partitionId =
GroupPartitionId.convert(cpPageId);
- checkpointProgress.blockPartitionDestruction(partitionId);
-
- try {
- if (shouldWriteMetaPage(partitionId)) {
- writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
- }
-
- pageMemory.checkpointWritePage(cpPageId,
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
- } finally {
-
checkpointProgress.unblockPartitionDestruction(partitionId);
+ if (shouldWriteMetaPage(partitionId)) {
+ writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
}
+
+ pageMemory.checkpointWritePage(cpPageId,
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
}
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index 9d1ece6e634..5188faf14e9 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -29,11 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
-import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.jetbrains.annotations.Nullable;
/**
@@ -75,9 +70,6 @@ class CheckpointProgressImpl implements CheckpointProgress {
/** Sorted dirty pages to be written on the checkpoint. */
private volatile @Nullable CheckpointDirtyPages pageToWrite;
- /** Partitions currently being processed, for example, writing dirty pages
or doing fsync. */
- private final PartitionProcessingCounterMap processedPartitionMap = new
PartitionProcessingCounterMap();
-
/** Assistant for synchronizing page replacement and fsync phase. */
private final CheckpointPageReplacement checkpointPageReplacement = new
CheckpointPageReplacement();
@@ -305,69 +297,6 @@ class CheckpointProgressImpl implements CheckpointProgress
{
this.pageToWrite = pageToWrite;
}
- /**
- * Blocks physical destruction of partition.
- *
- * <p>When the intention to destroy partition appears, {@link
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
- * {@link PersistentPageMemory#invalidate(int, int)} invoked at the
beginning. And if there is a block, it waits for unblocking.
- * Then it destroys the partition, {@link
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
- *
- * <p>It is recommended to use where physical destruction of the partition
may have an impact, for example when writing dirty pages and
- * executing a fsync.</p>
- *
- * <p>To make sure that we can physically do something with the partition
during a block, we will need to use approximately the
- * following code:</p>
- * <pre><code>
- * checkpointProgress.blockPartitionDestruction(partitionId);
- *
- * try {
- * FilePageStore pageStore =
FilePageStoreManager#getStore(partitionId);
- *
- * if (pageStore == null || pageStore.isMarkedToDestroy()) {
- * return;
- * }
- *
- * someAction(pageStore);
- * } finally {
- * checkpointProgress.unblockPartitionDestruction(partitionId);
- * }
- * </code></pre>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #unblockPartitionDestruction(GroupPartitionId)
- * @see #getUnblockPartitionDestructionFuture(GroupPartitionId)
- */
- public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
-
processedPartitionMap.incrementPartitionProcessingCounter(groupPartitionId);
- }
-
- /**
- * Unblocks physical destruction of partition.
- *
- * <p>As soon as the last thread makes an unlock, the physical destruction
of the partition can immediately begin.</p>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #blockPartitionDestruction(GroupPartitionId)
- * @see #getUnblockPartitionDestructionFuture(GroupPartitionId)
- */
- public void unblockPartitionDestruction(GroupPartitionId groupPartitionId)
{
-
processedPartitionMap.decrementPartitionProcessingCounter(groupPartitionId);
- }
-
- /**
- * Returns the future if the partition according to the given parameters
is currently being blocked, for example, dirty pages are
- * being written or fsync is being done, {@code null} if the partition is
not currently being blocked.
- *
- * <p>Future will be added on {@link
#blockPartitionDestruction(GroupPartitionId)} call and completed on
- * {@link #unblockPartitionDestruction(GroupPartitionId)} call (equal to
the number of
- * {@link #unblockPartitionDestruction(GroupPartitionId)} calls).
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- public @Nullable CompletableFuture<Void>
getUnblockPartitionDestructionFuture(GroupPartitionId groupPartitionId) {
- return
processedPartitionMap.getProcessedPartitionFuture(groupPartitionId);
- }
-
/**
* Block the start of the fsync phase at a checkpoint before replacing the
page.
*
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 4eb09698e4c..e4097c84fa1 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -28,7 +28,6 @@ import static
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMI
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SNAPSHOT_TAKEN;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -523,7 +522,7 @@ public class Checkpointer extends IgniteWorker {
tracker.onFsyncStart();
- syncUpdatedPageStores(updatedPartitions, currentCheckpointProgress);
+ syncUpdatedPageStores(updatedPartitions);
tracker.onFsyncEnd();
@@ -538,10 +537,7 @@ public class Checkpointer extends IgniteWorker {
return true;
}
- private void syncUpdatedPageStores(
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
- CheckpointProgressImpl currentCheckpointProgress
- ) throws IgniteInternalCheckedException {
+ private void syncUpdatedPageStores(ConcurrentMap<GroupPartitionId,
LongAdder> updatedPartitions) throws IgniteInternalCheckedException {
ThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
if (pageWritePool == null) {
@@ -550,7 +546,7 @@ public class Checkpointer extends IgniteWorker {
return;
}
- fsyncDeltaFile(currentCheckpointProgress, entry.getKey(),
entry.getValue());
+ fsyncDeltaFile(entry.getKey(), entry.getValue());
}
} else {
int checkpointThreads = pageWritePool.getMaximumPoolSize();
@@ -575,7 +571,7 @@ public class Checkpointer extends IgniteWorker {
break;
}
- fsyncDeltaFile(currentCheckpointProgress,
entry.getKey(), entry.getValue());
+ fsyncDeltaFile(entry.getKey(), entry.getValue());
entry = queue.poll();
}
@@ -598,7 +594,6 @@ public class Checkpointer extends IgniteWorker {
}
private void fsyncDeltaFile(
- CheckpointProgressImpl currentCheckpointProgress,
GroupPartitionId partitionId,
LongAdder pagesWritten
) throws IgniteInternalCheckedException {
@@ -608,15 +603,9 @@ public class Checkpointer extends IgniteWorker {
return;
}
- currentCheckpointProgress.blockPartitionDestruction(partitionId);
-
- try {
- fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore,
pagesWritten);
+ fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore, pagesWritten);
- renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
- } finally {
- currentCheckpointProgress.unblockPartitionDestruction(partitionId);
- }
+ renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
}
/**
@@ -905,30 +894,6 @@ public class Checkpointer extends IgniteWorker {
afterReleaseWriteLockCheckpointProgress = currentCheckpointProgress;
}
- /**
- * Prepares the checkpointer to destroy a partition.
- *
- * <p>If the checkpoint is in progress, then wait until it finishes
processing the partition that we are going to destroy, in order to
- * prevent the situation when we want to destroy the partition file along
with its delta files, and at this time the checkpoint performs
- * I/O operations on them.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @return Future that will end when the checkpoint is ready to destroy
the partition.
- */
- CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId
groupPartitionId) {
- CheckpointProgressImpl currentCheckpointProgress =
this.currentCheckpointProgress;
-
- // If the checkpoint starts after this line, then the data region will
already know that we want to destroy the partition, and when
- // reading the page for writing to the delta file, we will receive an
"outdated" page that we will not write to disk.
- if (currentCheckpointProgress == null ||
!currentCheckpointProgress.inProgress()) {
- return nullCompletedFuture();
- }
-
- CompletableFuture<Void> processedPartitionFuture =
currentCheckpointProgress.getUnblockPartitionDestructionFuture(groupPartitionId);
-
- return processedPartitionFuture == null ? nullCompletedFuture() :
processedPartitionFuture;
- }
-
private void replicatorLogSync(CheckpointMetricsTracker tracker) throws
IgniteInternalCheckedException {
try {
tracker.onReplicatorLogSyncStart();
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index 007ddd17e72..33cf21e92ae 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -21,7 +21,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toCollection;
import static
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -40,8 +39,6 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.io.PageIo;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
import org.apache.ignite.internal.pagememory.persistence.WriteSpeedFormatter;
import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
@@ -84,9 +81,6 @@ public class Compactor extends IgniteWorker {
/** Thread local with buffers for the compaction threads. */
private static final ThreadLocal<ByteBuffer> THREAD_BUF = new
ThreadLocal<>();
- /** Partitions for which delta files are currently compacted. */
- private final PartitionProcessingCounterMap
partitionCompactionInProgressMap = new PartitionProcessingCounterMap();
-
/** Page size in bytes. */
private final int pageSize;
@@ -250,19 +244,11 @@ public class Compactor extends IgniteWorker {
break;
}
- GroupPartitionId groupPartitionId =
toMerge.groupPartitionFilePageStore.groupPartitionId();
-
-
partitionCompactionInProgressMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- try {
- mergeDeltaFileToMainFile(
-
toMerge.groupPartitionFilePageStore.pageStore(),
- toMerge.deltaFilePageStoreIo,
- tracker
- );
- } finally {
-
partitionCompactionInProgressMap.decrementPartitionProcessingCounter(groupPartitionId);
- }
+ mergeDeltaFileToMainFile(
+
toMerge.groupPartitionFilePageStore.pageStore(),
+ toMerge.deltaFilePageStoreIo,
+ tracker
+ );
}
} catch (Throwable ex) {
future.completeExceptionally(ex);
@@ -453,21 +439,6 @@ public class Compactor extends IgniteWorker {
assert removed : filePageStore.filePath();
}
- /**
- * Prepares the compactor to destroy a partition.
- *
- * <p>If the partition compaction is in progress, then we will wait until
it is completed so that there are no errors when we want to
- * destroy the partition file and its delta file, and at this time its
compaction occurs.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @return Future at the complete of which we can delete the partition
file and its delta files.
- */
- public CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId
groupPartitionId) {
- CompletableFuture<Void> partitionProcessingFuture =
partitionCompactionInProgressMap.getProcessedPartitionFuture(groupPartitionId);
-
- return partitionProcessingFuture == null ? nullCompletedFuture() :
partitionProcessingFuture;
- }
-
private static ByteBuffer getThreadLocalBuffer(int pageSize) {
ByteBuffer buffer = THREAD_BUF.get();
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
index 84536da0106..01c04205b50 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.util.GridUnsafe.copyMemory;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
@@ -134,8 +133,6 @@ public class DelayedDirtyPageWrite {
Throwable errorOnWrite = null;
-
checkpointPages.blockPartitionDestruction(GroupPartitionId.convert(fullPageId));
-
try {
flushDirtyPage.write(pageMemory, fullPageId,
byteBufThreadLoc.get());
} catch (Throwable t) {
@@ -143,8 +140,6 @@ public class DelayedDirtyPageWrite {
throw t;
} finally {
-
checkpointPages.unblockPartitionDestruction(GroupPartitionId.convert(fullPageId));
-
checkpointPages.unblockFsyncOnPageReplacement(fullPageId,
errorOnWrite);
tracker.unlock(fullPageId);
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
deleted file mode 100644
index af5bf41158a..00000000000
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.CompletableFuture;
-import org.junit.jupiter.api.Test;
-
-/**
- * For {@link PartitionProcessingCounterMap} testing.
- */
-public class PartitionProcessingCounterMapTest {
- @Test
- void test() {
- PartitionProcessingCounterMap processingCounterMap = new
PartitionProcessingCounterMap();
-
- GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-
-
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture0 =
processingCounterMap.getProcessedPartitionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture0);
- assertFalse(processedPartitionFuture0.isDone());
-
-
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
-
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
-
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture0.isDone());
-
- // Let's check the reprocessing of the partition.
-
-
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture1 =
processingCounterMap.getProcessedPartitionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture1);
- assertFalse(processedPartitionFuture1.isDone());
- assertNotSame(processedPartitionFuture0, processedPartitionFuture1);
-
-
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture1.isDone());
- }
-}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterTest.java
deleted file mode 100644
index 08d921b103c..00000000000
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.CompletableFuture;
-import org.junit.jupiter.api.Test;
-
-/**
- * For {@link PartitionProcessingCounter} testing.
- */
-public class PartitionProcessingCounterTest {
- @Test
- void test() {
- PartitionProcessingCounter counter = new PartitionProcessingCounter();
-
- CompletableFuture<Void> future = counter.future();
-
- assertFalse(future.isDone());
-
- counter.incrementPartitionProcessingCounter();
-
- assertFalse(future.isDone());
-
- counter.incrementPartitionProcessingCounter();
-
- assertFalse(future.isDone());
-
- counter.decrementPartitionProcessingCounter();
-
- assertFalse(future.isDone());
-
- counter.decrementPartitionProcessingCounter();
-
- assertTrue(future.isDone());
- }
-}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
index c1c96cd8d3c..8454c71e511 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
@@ -30,7 +30,6 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -39,7 +38,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.junit.jupiter.api.Test;
/**
@@ -367,50 +365,4 @@ public class CheckpointProgressImplTest {
assertNull(progressImpl.pagesToWrite());
}
-
- @Test
- void testProcessedPartition() {
- CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
-
- GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-
- progressImpl.blockPartitionDestruction(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture0 =
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture0);
- assertFalse(processedPartitionFuture0.isDone());
-
- progressImpl.blockPartitionDestruction(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
- progressImpl.unblockPartitionDestruction(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
- progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture0.isDone());
-
- // Let's check the reprocessing of the partition.
-
- progressImpl.blockPartitionDestruction(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture1 =
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture1);
- assertFalse(processedPartitionFuture1.isDone());
- assertNotSame(processedPartitionFuture0, processedPartitionFuture1);
-
- progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture1.isDone());
- }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 6a1cbca84ce..010b43b6ac4 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -20,11 +20,9 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static java.lang.System.nanoTime;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FACTORY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
@@ -467,48 +465,6 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
);
}
- @Test
- void testPrepareToDestroyPartition() throws Exception {
- Checkpointer checkpointer = new Checkpointer(
- "test",
- null,
- mock(FailureManager.class),
- mock(CheckpointWorkflow.class),
- mock(CheckpointPagesWriterFactory.class),
- mock(FilePageStoreManager.class),
- mock(Compactor.class),
- PAGE_SIZE,
- checkpointConfig,
- mock(LogSyncer.class)
- );
-
- GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
- // Everything should be fine as there is no current running checkpoint.
- checkpointer.prepareToDestroyPartition(groupPartitionId).get(1,
SECONDS);
-
- CheckpointProgressImpl checkpointProgress = (CheckpointProgressImpl)
checkpointer.scheduledProgress();
-
- checkpointer.startCheckpointProgress();
-
- checkpointer.prepareToDestroyPartition(groupPartitionId).get(1,
SECONDS);
-
- checkpointProgress.transitTo(LOCK_RELEASED);
- assertTrue(checkpointProgress.inProgress());
-
- // Everything should be fine so on a "working" checkpoint we don't
process the partition anyhow.
- checkpointer.prepareToDestroyPartition(groupPartitionId).get(1,
SECONDS);
-
- // Let's emulate that we are processing a partition and check that
everything will be fine after processing is completed.
- checkpointProgress.blockPartitionDestruction(groupPartitionId);
-
- CompletableFuture<?> onPartitionDestructionFuture =
checkpointer.prepareToDestroyPartition(groupPartitionId);
-
- checkpointProgress.unblockPartitionDestruction(groupPartitionId);
-
- onPartitionDestructionFuture.get(1, SECONDS);
- }
-
private static CheckpointDirtyPages dirtyPages(PersistentPageMemory
pageMemory, FullPageId... pageIds) {
return new
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory,
pageIds)));
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 960a81b9680..35673f92e0a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -626,7 +626,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
return null;
});
- tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
+ assertThat(tableStorage.destroyPartition(PARTITION_ID),
willCompleteSuccessfully());
MvPartitionStorage newMvPartitionStorage =
getOrCreateMvPartition(PARTITION_ID);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 1da4f4af9dd..697b312c6dc 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -289,6 +289,7 @@ public abstract class AbstractPageMemoryTableStorage<T
extends AbstractPageMemor
}
}
+ // TODO IGNITE-25739 Optimise rebalance for persistent page storages to
avoid waiting for checkpoint after storage destruction.
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
return busy(() -> mvPartitionStorages.startRebalance(partitionId,
mvPartitionStorage -> {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index e2c581d8d1f..b64289bdc74 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -64,7 +64,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link DataRegion} for persistent case.
*/
-class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPageMemory> {
+public class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPageMemory> {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PersistentPageMemoryDataRegion.class);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index dd052d1bffc..907f5d2ee07 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -33,7 +33,10 @@ import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.freelist.FreeListImpl;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointListener;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.reuse.ReuseList;
@@ -51,6 +54,10 @@ import org.jetbrains.annotations.Nullable;
* Implementation of {@link AbstractPageMemoryTableStorage} for persistent
case.
*/
public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableStorage<PersistentPageMemoryMvPartitionStorage> {
+ // TODO IGNITE-25738 Check if 1 second is a good value.
+ /** After partition invalidation checkpoint will be scheduled using this
delay to allow batching. */
+ public static final int CHECKPOINT_ON_DESTRUCTION_DELAY_MILLIS = 1000;
+
/** Storage engine instance. */
private final PersistentPageMemoryStorageEngine engine;
@@ -353,13 +360,45 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
}
private CompletableFuture<Void>
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
-
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
+ FilePageStore store =
dataRegion.filePageStoreManager().getStore(groupPartitionId);
+
+ assert store != null : groupPartitionId;
+
+ store.markToDestroy();
+
+ var prepareDestroyFuture = new CompletableFuture<>();
+
+ CheckpointManager checkpointManager = dataRegion.checkpointManager();
+
+ var listener = new CheckpointListener() {
+ @Override
+ public void afterCheckpointEnd(CheckpointProgress progress) {
+ checkpointManager.removeCheckpointListener(this);
+
+ try {
+
dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(),
groupPartitionId.getPartitionId());
+
+
dataRegion.partitionMetaManager().removeMeta(groupPartitionId);
+
+ prepareDestroyFuture.complete(null);
+ } catch (Exception e) {
+ prepareDestroyFuture.completeExceptionally(
+ new StorageException("Couldn't invalidate
partition for destruction: " + groupPartitionId, e)
+ );
+ }
+ }
+ };
+
+ checkpointManager.addCheckpointListener(listener, dataRegion);
- dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(),
groupPartitionId.getPartitionId());
+ CheckpointProgress checkpoint =
dataRegion.checkpointManager().scheduleCheckpoint(
+ CHECKPOINT_ON_DESTRUCTION_DELAY_MILLIS,
+ "Partition destruction"
+ );
- return
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
- .thenAccept(unused ->
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
- .thenCompose(unused ->
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
+ return checkpoint.futureFor(CheckpointState.FINISHED)
+ .thenCompose(v -> prepareDestroyFuture)
+ .thenCompose(v ->
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
}
private GroupPartitionId createGroupPartitionId(int partitionId) {
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 5632d90f6c6..6fa1a3dc6e4 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -21,14 +21,19 @@ import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.isRow;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -36,12 +41,19 @@ import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataRegion;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -203,4 +215,53 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
assertThat(readConfig, is(equalTo(configWhichFitsInOnePage)));
}
+
+ @Test
+ void testDeltaFileCompactionAfterClearPartition() throws
InterruptedException {
+ addWriteCommitted(new RowId(PARTITION_ID), binaryRow, clock.now());
+
+ assertThat(table.clearPartition(PARTITION_ID),
willCompleteSuccessfully());
+
+ waitForDeltaFileCompaction((PersistentPageMemoryTableStorage) table);
+ }
+
+ @Test
+ void testDeltaFileCompactionAfterPartitionRebalanced() throws
InterruptedException {
+ addWriteCommitted(new RowId(PARTITION_ID), binaryRow, clock.now());
+
+ var leaseInfo = new LeaseInfo(333, new UUID(1, 2), "primary");
+
+ var partitionMeta = new MvPartitionMeta(1, 2, BYTE_EMPTY_ARRAY,
leaseInfo, BYTE_EMPTY_ARRAY);
+
+ CompletableFuture<Void> rebalance =
table.startRebalancePartition(PARTITION_ID)
+ .thenCompose(v -> table.finishRebalancePartition(PARTITION_ID,
partitionMeta));
+
+ assertThat(rebalance, willCompleteSuccessfully());
+
+ waitForDeltaFileCompaction((PersistentPageMemoryTableStorage) table);
+ }
+
+ @Test
+ void testDeltaFileCompactionAfterPartitionRebalanceAborted() throws
InterruptedException {
+ addWriteCommitted(new RowId(PARTITION_ID), binaryRow, clock.now());
+
+ CompletableFuture<Void> abortRebalance =
table.startRebalancePartition(PARTITION_ID)
+ .thenCompose(v -> table.abortRebalancePartition(PARTITION_ID));
+
+ assertThat(abortRebalance, willCompleteSuccessfully());
+
+ waitForDeltaFileCompaction((PersistentPageMemoryTableStorage) table);
+ }
+
+ private void waitForDeltaFileCompaction(PersistentPageMemoryTableStorage
tableStorage) throws InterruptedException {
+ PersistentPageMemoryDataRegion dataRegion = tableStorage.dataRegion();
+
+ CheckpointProgress checkpointProgress =
engine.checkpointManager().forceCheckpoint("Test compaction");
+ assertThat(checkpointProgress.futureFor(FINISHED),
willCompleteSuccessfully());
+
+ FilePageStore fileStore =
dataRegion.filePageStoreManager().getStore(new
GroupPartitionId(tableStorage.getTableId(), PARTITION_ID));
+
+ assertTrue(waitForCondition(() -> fileStore.deltaFileCount() == 0,
1000));
+ }
+
}