This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26722
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 9c1ea01e096c2e9b2cfa5f8509ae03791ed4a346
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Oct 16 14:37:28 2025 +0300

    IGNITE-26722 wip
---
 .../ignite/internal/util/IgniteUtilsTest.java      |  8 +++
 .../persistence/checkpoint/Checkpointer.java       |  6 ++
 .../persistence/compaction/Compactor.java          | 56 +++++++++++++++++++
 .../persistence/compaction/CompactorTest.java      | 65 +++++++++++++++++-----
 4 files changed, 120 insertions(+), 15 deletions(-)

diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 0aa80378713..e409dc05803 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.worker.IgniteWorker;
 import org.junit.jupiter.api.Test;
@@ -217,4 +218,11 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
         ByteBuffer smallDirectBuffer = 
bigDirectBuffer.position(1).limit(4).slice();
         assertArrayEquals(new byte[] {1, 2, 3}, 
byteBufferToByteArray(smallDirectBuffer));
     }
+
+    @Test
+    void test() {
+        AtomicReference<Integer> reference = new AtomicReference<>();
+
+        assertTrue(reference.compareAndSet(null, null));
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index cd7dfe3afc5..ce78cc6fb48 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -346,6 +346,8 @@ public class Checkpointer extends IgniteWorker {
         Checkpoint chp = null;
 
         try {
+            compactor.pause();
+
             var tracker = new CheckpointMetricsTracker();
 
             tracker.onCheckpointStart();
@@ -410,6 +412,8 @@ public class Checkpointer extends IgniteWorker {
                             chp.progress.reason()
                     );
                 }
+
+                compactor.resume();
             }
 
             currentCheckpointProgress.setPagesWriteTimeMillis(
@@ -546,6 +550,8 @@ public class Checkpointer extends IgniteWorker {
 
         tracker.onFsyncEnd();
 
+        compactor.resume();
+
         compactor.triggerCompaction();
 
         if (shutdownNow.getAsBoolean()) {
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index 773d7901d08..362af3971bc 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -30,9 +30,11 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureManager;
@@ -92,6 +94,9 @@ public class Compactor extends IgniteWorker {
 
     private final PartitionDestructionLockManager 
partitionDestructionLockManager;
 
+    /** Latch for pausing and resuming the compaction, {@code null} if there 
was no pause. */
+    private final AtomicReference<CountDownLatch> pauseLatchRef = new 
AtomicReference<>();
+
     /**
      * Creates new ignite worker with given parameters.
      *
@@ -355,6 +360,8 @@ public class Compactor extends IgniteWorker {
         // Do not interrupt runner thread.
         isCancelled.set(true);
 
+        resume();
+
         synchronized (mux) {
             mux.notifyAll();
         }
@@ -396,6 +403,8 @@ public class Compactor extends IgniteWorker {
 
             long pageOffset = deltaFilePageStore.pageOffset(pageIndex);
 
+            pauseCompactionIfNeeded();
+
             // pageIndex instead of pageId, only for debugging in case of 
errors
             // since we do not know the pageId until we read it from the 
pageOffset.
             boolean read = 
deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, 
buffer.rewind(), false);
@@ -417,6 +426,8 @@ public class Compactor extends IgniteWorker {
                 return;
             }
 
+            pauseCompactionIfNeeded();
+
             filePageStore.write(pageId, buffer.rewind());
 
             tracker.onDataPageWritten();
@@ -433,6 +444,8 @@ public class Compactor extends IgniteWorker {
             return;
         }
 
+        pauseCompactionIfNeeded();
+
         filePageStore.sync();
 
         // Removing the delta file page store from a file page store.
@@ -448,6 +461,8 @@ public class Compactor extends IgniteWorker {
 
         deltaFilePageStore.markMergedToFilePageStore();
 
+        pauseCompactionIfNeeded();
+
         deltaFilePageStore.stop(true);
 
         boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore);
@@ -485,4 +500,45 @@ public class Compactor extends IgniteWorker {
             this.deltaFilePageStoreIo = deltaFilePageStoreIo;
         }
     }
+
+    /**
+     * Pauses the compactor until it is resumed or compactor or stopped. It is 
expected that this method will not be called multiple times
+     * in parallel and subsequent calls will strictly be calls after {@link 
#resume}.
+     */
+    public void pause() {
+        boolean casResult = pauseLatchRef.compareAndSet(null, new 
CountDownLatch(1));
+
+        assert casResult : "It is expected that there will be no parallel 
pause, resume or previous one has ended: " + pauseLatchRef.get();
+    }
+
+    /** Resumes the compactor if it was paused. It is expected that this 
method will not be called multiple times in parallel. */
+    public void resume() {
+        CountDownLatch latch = pauseLatchRef.get();
+
+        boolean casResult = pauseLatchRef.compareAndSet(latch, null);
+
+        assert casResult : "It is expected that there will be no parallel 
pause or resume";
+
+        if (latch != null) {
+            latch.countDown();
+        }
+    }
+
+    private void pauseCompactionIfNeeded() {
+        CountDownLatch latch = pauseLatchRef.get();
+
+        if (latch != null) {
+            blockingSectionBegin();
+
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                LOG.debug("Compactor pause was interrupted", e);
+
+                Thread.currentThread().interrupt();
+            } finally {
+                blockingSectionEnd();
+            }
+        }
+    }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index 1af89b6e7c8..dba5b13fef1 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -20,7 +20,10 @@ package 
org.apache.ignite.internal.pagememory.persistence.compaction;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -91,21 +94,8 @@ public class CompactorTest extends BaseIgniteAbstractTest {
     void testMergeDeltaFileToMainFile() throws Throwable {
         Compactor compactor = newCompactor();
 
-        FilePageStore filePageStore = mock(FilePageStore.class);
-        DeltaFilePageStoreIo deltaFilePageStoreIo = 
mock(DeltaFilePageStoreIo.class);
-
-        
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
-
-        when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
-
-        
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), 
anyLong(), any(ByteBuffer.class), anyBoolean()))
-                .then(answer -> {
-                    ByteBuffer buffer = answer.getArgument(2);
-
-                    PageIo.setPageId(bufferAddress(buffer), 1);
-
-                    return true;
-                });
+        DeltaFilePageStoreIo deltaFilePageStoreIo = 
createDeltaFilePageStoreIo();
+        FilePageStore filePageStore = 
createFilePageStore(deltaFilePageStoreIo);
 
         compactor.mergeDeltaFileToMainFile(filePageStore, 
deltaFilePageStoreIo, new CompactionMetricsTracker());
 
@@ -209,6 +199,26 @@ public class CompactorTest extends BaseIgniteAbstractTest {
         waitDeltaFilesFuture.get(100, MILLISECONDS);
     }
 
+    @Test
+    void testPauseResume() throws Exception {
+        Compactor compactor = spy(newCompactor());
+
+        compactor.pause();
+
+        DeltaFilePageStoreIo deltaFilePageStoreIo = 
createDeltaFilePageStoreIo();
+        FilePageStore filePageStore = 
createFilePageStore(deltaFilePageStoreIo);
+
+        CompletableFuture<?> mergeDeltaFileToMainFileFuture = runAsync(
+                () -> compactor.mergeDeltaFileToMainFile(filePageStore, 
deltaFilePageStoreIo, new CompactionMetricsTracker())
+        );
+
+        assertThat(mergeDeltaFileToMainFileFuture, willTimeoutFast());
+
+        compactor.resume();
+
+        assertThat(mergeDeltaFileToMainFileFuture, willCompleteSuccessfully());
+    }
+
     private Compactor newCompactor() {
         return newCompactor(mock(FilePageStoreManager.class));
     }
@@ -224,4 +234,29 @@ public class CompactorTest extends BaseIgniteAbstractTest {
                 new PartitionDestructionLockManager()
         );
     }
+
+    private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws 
Exception {
+        DeltaFilePageStoreIo deltaFilePageStoreIo = 
mock(DeltaFilePageStoreIo.class);
+
+        when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
+
+        
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), 
anyLong(), any(ByteBuffer.class), anyBoolean()))
+                .then(answer -> {
+                    ByteBuffer buffer = answer.getArgument(2);
+
+                    PageIo.setPageId(bufferAddress(buffer), 1);
+
+                    return true;
+                });
+
+        return deltaFilePageStoreIo;
+    }
+
+    private static FilePageStore createFilePageStore(DeltaFilePageStoreIo 
deltaFilePageStoreIo) {
+        FilePageStore filePageStore = mock(FilePageStore.class);
+
+        
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+
+        return filePageStore;
+    }
 }

Reply via email to