This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26722 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9c1ea01e096c2e9b2cfa5f8509ae03791ed4a346 Author: Kirill Tkalenko <[email protected]> AuthorDate: Thu Oct 16 14:37:28 2025 +0300 IGNITE-26722 wip --- .../ignite/internal/util/IgniteUtilsTest.java | 8 +++ .../persistence/checkpoint/Checkpointer.java | 6 ++ .../persistence/compaction/Compactor.java | 56 +++++++++++++++++++ .../persistence/compaction/CompactorTest.java | 65 +++++++++++++++++----- 4 files changed, 120 insertions(+), 15 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java index 0aa80378713..e409dc05803 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java @@ -50,6 +50,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.util.worker.IgniteWorker; import org.junit.jupiter.api.Test; @@ -217,4 +218,11 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest { ByteBuffer smallDirectBuffer = bigDirectBuffer.position(1).limit(4).slice(); assertArrayEquals(new byte[] {1, 2, 3}, byteBufferToByteArray(smallDirectBuffer)); } + + @Test + void test() { + AtomicReference<Integer> reference = new AtomicReference<>(); + + assertTrue(reference.compareAndSet(null, null)); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index cd7dfe3afc5..ce78cc6fb48 100644 --- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -346,6 +346,8 @@ public class Checkpointer extends IgniteWorker { Checkpoint chp = null; try { + compactor.pause(); + var tracker = new CheckpointMetricsTracker(); tracker.onCheckpointStart(); @@ -410,6 +412,8 @@ public class Checkpointer extends IgniteWorker { chp.progress.reason() ); } + + compactor.resume(); } currentCheckpointProgress.setPagesWriteTimeMillis( @@ -546,6 +550,8 @@ public class Checkpointer extends IgniteWorker { tracker.onFsyncEnd(); + compactor.resume(); + compactor.triggerCompaction(); if (shutdownNow.getAsBoolean()) { diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index 773d7901d08..362af3971bc 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -30,9 +30,11 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import org.apache.ignite.internal.failure.FailureContext; import org.apache.ignite.internal.failure.FailureManager; @@ -92,6 +94,9 @@ public class Compactor extends IgniteWorker { private final PartitionDestructionLockManager partitionDestructionLockManager; + /** Latch for 
pausing and resuming the compaction, {@code null} if there was no pause. */ + private final AtomicReference<CountDownLatch> pauseLatchRef = new AtomicReference<>(); + /** * Creates new ignite worker with given parameters. * @@ -355,6 +360,8 @@ public class Compactor extends IgniteWorker { // Do not interrupt runner thread. isCancelled.set(true); + resume(); + synchronized (mux) { mux.notifyAll(); } @@ -396,6 +403,8 @@ public class Compactor extends IgniteWorker { long pageOffset = deltaFilePageStore.pageOffset(pageIndex); + pauseCompactionIfNeeded(); + // pageIndex instead of pageId, only for debugging in case of errors // since we do not know the pageId until we read it from the pageOffset. boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, buffer.rewind(), false); @@ -417,6 +426,8 @@ public class Compactor extends IgniteWorker { return; } + pauseCompactionIfNeeded(); + filePageStore.write(pageId, buffer.rewind()); tracker.onDataPageWritten(); @@ -433,6 +444,8 @@ public class Compactor extends IgniteWorker { return; } + pauseCompactionIfNeeded(); + filePageStore.sync(); // Removing the delta file page store from a file page store. @@ -448,6 +461,8 @@ public class Compactor extends IgniteWorker { deltaFilePageStore.markMergedToFilePageStore(); + pauseCompactionIfNeeded(); + deltaFilePageStore.stop(true); boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); @@ -485,4 +500,45 @@ public class Compactor extends IgniteWorker { this.deltaFilePageStoreIo = deltaFilePageStoreIo; } } + + /** + * Pauses the compactor until it is resumed or the compactor is stopped. It is expected that this method will not be called multiple times + * in parallel and subsequent calls will strictly be calls after {@link #resume}. 
+ */ + public void pause() { + boolean casResult = pauseLatchRef.compareAndSet(null, new CountDownLatch(1)); + + assert casResult : "It is expected that there will be no parallel pause, resume or previous one has ended: " + pauseLatchRef.get(); + } + + /** Resumes the compactor if it was paused. It is expected that this method will not be called multiple times in parallel. */ + public void resume() { + CountDownLatch latch = pauseLatchRef.get(); + + boolean casResult = pauseLatchRef.compareAndSet(latch, null); + + assert casResult : "It is expected that there will be no parallel pause or resume"; + + if (latch != null) { + latch.countDown(); + } + } + + private void pauseCompactionIfNeeded() { + CountDownLatch latch = pauseLatchRef.get(); + + if (latch != null) { + blockingSectionBegin(); + + try { + latch.await(); + } catch (InterruptedException e) { + LOG.debug("Compactor pause was interrupted", e); + + Thread.currentThread().interrupt(); + } finally { + blockingSectionEnd(); + } + } + } } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java index 1af89b6e7c8..dba5b13fef1 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -20,7 +20,10 @@ package org.apache.ignite.internal.pagememory.persistence.compaction; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast; +import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -91,21 +94,8 @@ public class CompactorTest extends BaseIgniteAbstractTest { void testMergeDeltaFileToMainFile() throws Throwable { Compactor compactor = newCompactor(); - FilePageStore filePageStore = mock(FilePageStore.class); - DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); - - when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); - - when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0}); - - when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean())) - .then(answer -> { - ByteBuffer buffer = answer.getArgument(2); - - PageIo.setPageId(bufferAddress(buffer), 1); - - return true; - }); + DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(); + FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo); compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker()); @@ -209,6 +199,26 @@ public class CompactorTest extends BaseIgniteAbstractTest { waitDeltaFilesFuture.get(100, MILLISECONDS); } + @Test + void testPauseResume() throws Exception { + Compactor compactor = spy(newCompactor()); + + compactor.pause(); + + DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(); + FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo); + + CompletableFuture<?> mergeDeltaFileToMainFileFuture = runAsync( + () -> compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker()) + ); + + 
assertThat(mergeDeltaFileToMainFileFuture, willTimeoutFast()); + + compactor.resume(); + + assertThat(mergeDeltaFileToMainFileFuture, willCompleteSuccessfully()); + } + private Compactor newCompactor() { return newCompactor(mock(FilePageStoreManager.class)); } @@ -224,4 +234,29 @@ public class CompactorTest extends BaseIgniteAbstractTest { new PartitionDestructionLockManager() ); } + + private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws Exception { + DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); + + when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0}); + + when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean())) + .then(answer -> { + ByteBuffer buffer = answer.getArgument(2); + + PageIo.setPageId(bufferAddress(buffer), 1); + + return true; + }); + + return deltaFilePageStoreIo; + } + + private static FilePageStore createFilePageStore(DeltaFilePageStoreIo deltaFilePageStoreIo) { + FilePageStore filePageStore = mock(FilePageStore.class); + + when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); + + return filePageStore; + } }
