This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new aed3e9efc8e IGNITE-26722 Slowdown compactor when running checkpoint for aipersist (#6799)
aed3e9efc8e is described below

commit aed3e9efc8e8ff8e25089925ee2ab9df67bfb8f7
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Nov 14 12:08:05 2025 +0300

    IGNITE-26722 Slowdown compactor when running checkpoint for aipersist (#6799)
---
 .../persistence/checkpoint/Checkpointer.java       |   4 +
 .../persistence/compaction/Compactor.java          |  66 ++++++++----
 .../persistence/compaction/CompactorTest.java      | 111 ++++++++++++++++++---
 3 files changed, 145 insertions(+), 36 deletions(-)

diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 65265f4222f..6f48c084e58 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -347,6 +347,8 @@ public class Checkpointer extends IgniteWorker {
         Checkpoint chp = null;
 
         try {
+            compactor.pause();
+
             var tracker = new CheckpointMetricsTracker();
 
             tracker.onCheckpointStart();
@@ -455,6 +457,8 @@ public class Checkpointer extends IgniteWorker {
             throw e;
         } finally {
             currentCheckpointProgressForThrottling = null;
+
+            compactor.resume();
         }
     }
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index 01f5a4cace5..a060f17e44b 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -92,6 +92,9 @@ public class Compactor extends IgniteWorker {
 
     private final PartitionDestructionLockManager partitionDestructionLockManager;
 
+    /** Guarded by {@link #mux}. */
+    private boolean paused;
+
     /**
      * Creates new ignite worker with given parameters.
      *
@@ -164,7 +167,7 @@ public class Compactor extends IgniteWorker {
     void waitDeltaFiles() {
         try {
             synchronized (mux) {
-                while (!addedDeltaFiles && !isCancelled()) {
+                while ((!addedDeltaFiles || paused) && !isCancelled()) {
                     blockingSectionBegin();
 
                     try {
@@ -353,6 +356,8 @@ public class Compactor extends IgniteWorker {
         }
 
         synchronized (mux) {
+            paused = false;
+
             // Do not interrupt runner thread.
             isCancelled.set(true);
 
@@ -386,11 +391,7 @@ public class Compactor extends IgniteWorker {
         for (long pageIndex : deltaFilePageStore.pageIndexes()) {
             updateHeartbeat();
 
-            if (isCancelled()) {
-                return;
-            }
-
-            if (filePageStore.isMarkedToDestroy()) {
+            if (shouldStopCompaction(filePageStore)) {
                 return;
             }
 
@@ -409,11 +410,7 @@ public class Compactor extends IgniteWorker {
 
             updateHeartbeat();
 
-            if (isCancelled()) {
-                return;
-            }
-
-            if (filePageStore.isMarkedToDestroy()) {
+            if (shouldStopCompaction(filePageStore)) {
                 return;
             }
 
@@ -425,11 +422,7 @@ public class Compactor extends IgniteWorker {
         // Fsync the file page store.
         updateHeartbeat();
 
-        if (isCancelled()) {
-            return;
-        }
-
-        if (filePageStore.isMarkedToDestroy()) {
+        if (shouldStopCompaction(filePageStore)) {
             return;
         }
 
@@ -438,11 +431,7 @@ public class Compactor extends IgniteWorker {
         // Removing the delta file page store from a file page store.
         updateHeartbeat();
 
-        if (isCancelled()) {
-            return;
-        }
-
-        if (filePageStore.isMarkedToDestroy()) {
+        if (shouldStopCompaction(filePageStore)) {
             return;
         }
 
@@ -485,4 +474,39 @@ public class Compactor extends IgniteWorker {
             this.deltaFilePageStoreIo = deltaFilePageStoreIo;
         }
     }
+
+    /**
+     * Pauses the compactor until it is resumed or compactor is stopped. It is expected that this method will not be called multiple times
+     * in parallel and subsequent calls will strictly be calls after {@link #resume}.
+     */
+    public void pause() {
+        synchronized (mux) {
+            assert !paused : "It is expected that a further pause will only occur after resume";
+
+            paused = true;
+        }
+    }
+
+    /** Resumes the compactor if it was paused. It is expected that this method will not be called multiple times in parallel. */
+    public void resume() {
+        synchronized (mux) {
+            paused = false;
+
+            // Force compaction as we could stop somewhere in the middle and we need to continue compaction.
+            addedDeltaFiles = true;
+
+            mux.notifyAll();
+        }
+    }
+
+    /** Must be called before each IO operation to pause the current compaction and to provide IO resources to other components. */
+    private boolean isPaused() {
+        synchronized (mux) {
+            return paused;
+        }
+    }
+
+    private boolean shouldStopCompaction(FilePageStore filePageStore) {
+        return isCancelled() || filePageStore.isMarkedToDestroy() || isPaused();
+    }
 }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index 1af89b6e7c8..a234f47d9b8 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.pagememory.persistence.compaction;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -33,6 +36,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -91,21 +95,8 @@ public class CompactorTest extends BaseIgniteAbstractTest {
     void testMergeDeltaFileToMainFile() throws Throwable {
         Compactor compactor = newCompactor();
 
-        FilePageStore filePageStore = mock(FilePageStore.class);
-        DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
-
-        when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
-
-        when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
-
-        when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
-                .then(answer -> {
-                    ByteBuffer buffer = answer.getArgument(2);
-
-                    PageIo.setPageId(bufferAddress(buffer), 1);
-
-                    return true;
-                });
+        DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo();
+        FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
 
         compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker());
 
@@ -209,6 +200,71 @@ public class CompactorTest extends BaseIgniteAbstractTest {
         waitDeltaFilesFuture.get(100, MILLISECONDS);
     }
 
+    @Test
+    void testPauseResume() throws Exception {
+        Compactor compactor = spy(newCompactor());
+
+        compactor.pause();
+
+        DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo();
+        FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
+
+        assertThat(
+                runAsync(() -> compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker())),
+                willCompleteSuccessfully()
+        );
+
+        verify(filePageStore, never()).write(anyLong(), any());
+        verify(filePageStore, never()).sync();
+        verify(filePageStore, never()).removeDeltaFile(any());
+
+        verify(deltaFilePageStoreIo, never()).readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(), anyBoolean());
+        verify(deltaFilePageStoreIo, never()).markMergedToFilePageStore();
+        verify(deltaFilePageStoreIo, never()).stop(anyBoolean());
+
+        compactor.resume();
+
+        assertThat(
+                runAsync(() -> compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker())),
+                willCompleteSuccessfully()
+        );
+
+        verify(filePageStore).write(anyLong(), any());
+        verify(filePageStore).sync();
+        verify(filePageStore).removeDeltaFile(any());
+
+        verify(deltaFilePageStoreIo).readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(), anyBoolean());
+        verify(deltaFilePageStoreIo).markMergedToFilePageStore();
+        verify(deltaFilePageStoreIo).stop(anyBoolean());
+    }
+
+    @Test
+    void testTriggerCompactionAfterPause() {
+        Compactor compactor = spy(newCompactor());
+
+        compactor.pause();
+
+        CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles);
+        assertThat(waitDeltaFilesFuture, willTimeoutFast());
+
+        compactor.triggerCompaction();
+        assertThat(waitDeltaFilesFuture, willTimeoutFast());
+
+        compactor.resume();
+        assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testWaitDeltaFilesAfterResume() {
+        Compactor compactor = spy(newCompactor());
+
+        CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles);
+        assertThat(waitDeltaFilesFuture, willTimeoutFast());
+
+        compactor.resume();
+        assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
+    }
+
     private Compactor newCompactor() {
         return newCompactor(mock(FilePageStoreManager.class));
     }
@@ -224,4 +280,29 @@ public class CompactorTest extends BaseIgniteAbstractTest {
                 new PartitionDestructionLockManager()
         );
     }
+
+    private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws Exception {
+        DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
+
+        when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
+
+        when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
+                .then(answer -> {
+                    ByteBuffer buffer = answer.getArgument(2);
+
+                    PageIo.setPageId(bufferAddress(buffer), 1);
+
+                    return true;
+                });
+
+        return deltaFilePageStoreIo;
+    }
+
+    private static FilePageStore createFilePageStore(DeltaFilePageStoreIo deltaFilePageStoreIo) {
+        FilePageStore filePageStore = mock(FilePageStore.class);
+
+        when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+
+        return filePageStore;
+    }
 }

Reply via email to