This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new aed3e9efc8e IGNITE-26722 Slowdown compactor when running checkpoint
for aipersist (#6799)
aed3e9efc8e is described below
commit aed3e9efc8e8ff8e25089925ee2ab9df67bfb8f7
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Nov 14 12:08:05 2025 +0300
IGNITE-26722 Slowdown compactor when running checkpoint for aipersist
(#6799)
---
.../persistence/checkpoint/Checkpointer.java | 4 +
.../persistence/compaction/Compactor.java | 66 ++++++++----
.../persistence/compaction/CompactorTest.java | 111 ++++++++++++++++++---
3 files changed, 145 insertions(+), 36 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 65265f4222f..6f48c084e58 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -347,6 +347,8 @@ public class Checkpointer extends IgniteWorker {
Checkpoint chp = null;
try {
+ compactor.pause();
+
var tracker = new CheckpointMetricsTracker();
tracker.onCheckpointStart();
@@ -455,6 +457,8 @@ public class Checkpointer extends IgniteWorker {
throw e;
} finally {
currentCheckpointProgressForThrottling = null;
+
+ compactor.resume();
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index 01f5a4cace5..a060f17e44b 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -92,6 +92,9 @@ public class Compactor extends IgniteWorker {
private final PartitionDestructionLockManager
partitionDestructionLockManager;
+ /** Guarded by {@link #mux}. */
+ private boolean paused;
+
/**
* Creates new ignite worker with given parameters.
*
@@ -164,7 +167,7 @@ public class Compactor extends IgniteWorker {
void waitDeltaFiles() {
try {
synchronized (mux) {
- while (!addedDeltaFiles && !isCancelled()) {
+ while ((!addedDeltaFiles || paused) && !isCancelled()) {
blockingSectionBegin();
try {
@@ -353,6 +356,8 @@ public class Compactor extends IgniteWorker {
}
synchronized (mux) {
+ paused = false;
+
// Do not interrupt runner thread.
isCancelled.set(true);
@@ -386,11 +391,7 @@ public class Compactor extends IgniteWorker {
for (long pageIndex : deltaFilePageStore.pageIndexes()) {
updateHeartbeat();
- if (isCancelled()) {
- return;
- }
-
- if (filePageStore.isMarkedToDestroy()) {
+ if (shouldStopCompaction(filePageStore)) {
return;
}
@@ -409,11 +410,7 @@ public class Compactor extends IgniteWorker {
updateHeartbeat();
- if (isCancelled()) {
- return;
- }
-
- if (filePageStore.isMarkedToDestroy()) {
+ if (shouldStopCompaction(filePageStore)) {
return;
}
@@ -425,11 +422,7 @@ public class Compactor extends IgniteWorker {
// Fsync the file page store.
updateHeartbeat();
- if (isCancelled()) {
- return;
- }
-
- if (filePageStore.isMarkedToDestroy()) {
+ if (shouldStopCompaction(filePageStore)) {
return;
}
@@ -438,11 +431,7 @@ public class Compactor extends IgniteWorker {
// Removing the delta file page store from a file page store.
updateHeartbeat();
- if (isCancelled()) {
- return;
- }
-
- if (filePageStore.isMarkedToDestroy()) {
+ if (shouldStopCompaction(filePageStore)) {
return;
}
@@ -485,4 +474,39 @@ public class Compactor extends IgniteWorker {
this.deltaFilePageStoreIo = deltaFilePageStoreIo;
}
}
+
+ /**
+ * Pauses the compactor until it is resumed or compactor is stopped. It is expected that this method will not be called multiple times
+ * in parallel and subsequent calls will strictly be calls after {@link #resume}.
+ */
+ public void pause() {
+ synchronized (mux) {
+ assert !paused : "It is expected that a further pause will only
occur after resume";
+
+ paused = true;
+ }
+ }
+
+ /** Resumes the compactor if it was paused. It is expected that this method will not be called multiple times in parallel. */
+ public void resume() {
+ synchronized (mux) {
+ paused = false;
+
+ // Force compaction as we could stop somewhere in the middle and we need to continue compaction.
+ addedDeltaFiles = true;
+
+ mux.notifyAll();
+ }
+ }
+
+ /** Must be called before each IO operation to pause the current compaction and to provide IO resources to other components. */
+ private boolean isPaused() {
+ synchronized (mux) {
+ return paused;
+ }
+ }
+
+ private boolean shouldStopCompaction(FilePageStore filePageStore) {
+ return isCancelled() || filePageStore.isMarkedToDestroy() ||
isPaused();
+ }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index 1af89b6e7c8..a234f47d9b8 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -20,7 +20,10 @@ package
org.apache.ignite.internal.pagememory.persistence.compaction;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -33,6 +36,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -91,21 +95,8 @@ public class CompactorTest extends BaseIgniteAbstractTest {
void testMergeDeltaFileToMainFile() throws Throwable {
Compactor compactor = newCompactor();
- FilePageStore filePageStore = mock(FilePageStore.class);
- DeltaFilePageStoreIo deltaFilePageStoreIo =
mock(DeltaFilePageStoreIo.class);
-
-
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
-
- when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
-
-
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(),
anyLong(), any(ByteBuffer.class), anyBoolean()))
- .then(answer -> {
- ByteBuffer buffer = answer.getArgument(2);
-
- PageIo.setPageId(bufferAddress(buffer), 1);
-
- return true;
- });
+ DeltaFilePageStoreIo deltaFilePageStoreIo =
createDeltaFilePageStoreIo();
+ FilePageStore filePageStore =
createFilePageStore(deltaFilePageStoreIo);
compactor.mergeDeltaFileToMainFile(filePageStore,
deltaFilePageStoreIo, new CompactionMetricsTracker());
@@ -209,6 +200,71 @@ public class CompactorTest extends BaseIgniteAbstractTest {
waitDeltaFilesFuture.get(100, MILLISECONDS);
}
+ @Test
+ void testPauseResume() throws Exception {
+ Compactor compactor = spy(newCompactor());
+
+ compactor.pause();
+
+ DeltaFilePageStoreIo deltaFilePageStoreIo =
createDeltaFilePageStoreIo();
+ FilePageStore filePageStore =
createFilePageStore(deltaFilePageStoreIo);
+
+ assertThat(
+ runAsync(() ->
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new
CompactionMetricsTracker())),
+ willCompleteSuccessfully()
+ );
+
+ verify(filePageStore, never()).write(anyLong(), any());
+ verify(filePageStore, never()).sync();
+ verify(filePageStore, never()).removeDeltaFile(any());
+
+ verify(deltaFilePageStoreIo,
never()).readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(),
anyBoolean());
+ verify(deltaFilePageStoreIo, never()).markMergedToFilePageStore();
+ verify(deltaFilePageStoreIo, never()).stop(anyBoolean());
+
+ compactor.resume();
+
+ assertThat(
+ runAsync(() ->
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new
CompactionMetricsTracker())),
+ willCompleteSuccessfully()
+ );
+
+ verify(filePageStore).write(anyLong(), any());
+ verify(filePageStore).sync();
+ verify(filePageStore).removeDeltaFile(any());
+
+
verify(deltaFilePageStoreIo).readWithMergedToFilePageStoreCheck(anyLong(),
anyLong(), any(), anyBoolean());
+ verify(deltaFilePageStoreIo).markMergedToFilePageStore();
+ verify(deltaFilePageStoreIo).stop(anyBoolean());
+ }
+
+ @Test
+ void testTriggerCompactionAfterPause() {
+ Compactor compactor = spy(newCompactor());
+
+ compactor.pause();
+
+ CompletableFuture<?> waitDeltaFilesFuture =
runAsync(compactor::waitDeltaFiles);
+ assertThat(waitDeltaFilesFuture, willTimeoutFast());
+
+ compactor.triggerCompaction();
+ assertThat(waitDeltaFilesFuture, willTimeoutFast());
+
+ compactor.resume();
+ assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void testWaitDeltaFilesAfterResume() {
+ Compactor compactor = spy(newCompactor());
+
+ CompletableFuture<?> waitDeltaFilesFuture =
runAsync(compactor::waitDeltaFiles);
+ assertThat(waitDeltaFilesFuture, willTimeoutFast());
+
+ compactor.resume();
+ assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
+ }
+
private Compactor newCompactor() {
return newCompactor(mock(FilePageStoreManager.class));
}
@@ -224,4 +280,29 @@ public class CompactorTest extends BaseIgniteAbstractTest {
new PartitionDestructionLockManager()
);
}
+
+ private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws
Exception {
+ DeltaFilePageStoreIo deltaFilePageStoreIo =
mock(DeltaFilePageStoreIo.class);
+
+ when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
+
+
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(),
anyLong(), any(ByteBuffer.class), anyBoolean()))
+ .then(answer -> {
+ ByteBuffer buffer = answer.getArgument(2);
+
+ PageIo.setPageId(bufferAddress(buffer), 1);
+
+ return true;
+ });
+
+ return deltaFilePageStoreIo;
+ }
+
+ private static FilePageStore createFilePageStore(DeltaFilePageStoreIo
deltaFilePageStoreIo) {
+ FilePageStore filePageStore = mock(FilePageStore.class);
+
+
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+
+ return filePageStore;
+ }
}