This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 16d400b0c5f IGNITE-27308 Improve the delta file merge pause strategy (#7360)
16d400b0c5f is described below

commit 16d400b0c5f7fc98dd305533ed8d36caaaeeb191
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Jan 8 16:16:55 2026 +0300

    IGNITE-27308 Improve the delta file merge pause strategy (#7360)
---
 .../persistence/checkpoint/Checkpointer.java       |   4 +-
 .../persistence/compaction/CompactionRound.java    |  66 ++++++
 .../persistence/compaction/Compactor.java          | 232 +++++++++++----------
 .../compaction/DeltaFileForCompaction.java         |  37 ++++
 .../persistence/compaction/CompactorTest.java      |  58 ++++--
 5 files changed, 259 insertions(+), 138 deletions(-)

diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index a7a786f1d05..3efee1a7c41 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -352,7 +352,7 @@ public class Checkpointer extends IgniteWorker {
         Checkpoint chp = null;
 
         try {
-            compactor.pause();
+            compactor.notifyCheckpointStart();
 
             var tracker = new CheckpointMetricsTracker();
 
@@ -470,7 +470,7 @@ public class Checkpointer extends IgniteWorker {
         } finally {
             currentCheckpointProgressForThrottling = null;
 
-            compactor.resume();
+            compactor.notifyCheckpointFinish();
         }
     }
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java
new file mode 100644
index 00000000000..9408e3b0182
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+
+/** Data class for compaction round. */
+class CompactionRound {
+    /** Compaction round ID. */
+    final UUID id = UUID.randomUUID();
+
+    /** Total number of partition files. */
+    final int partitionFileCount;
+
+    /** Total number of all partition delta files. */
+    final int totalDeltaFileCount;
+
+    /** Queue of delta files (one per partition) to be merged into the 
partition file. */
+    final Queue<DeltaFileForCompaction> queue;
+
+    private CompactionRound(int partitionFileCount, int totalDeltaFileCount, 
Queue<DeltaFileForCompaction> queue) {
+        this.partitionFileCount = partitionFileCount;
+        this.totalDeltaFileCount = totalDeltaFileCount;
+        this.queue = queue;
+    }
+
+    static CompactionRound create(FilePageStoreManager filePageStoreManager) {
+        var partitionFileCount = new int[]{0};
+        var totalDeltaFileCount = new int[]{0};
+
+        var queue = new ConcurrentLinkedQueue<DeltaFileForCompaction>();
+
+        filePageStoreManager.allPageStores().forEach(pageStore -> {
+            partitionFileCount[0]++;
+
+            totalDeltaFileCount[0] += pageStore.pageStore().deltaFileCount();
+
+            DeltaFilePageStoreIo deltaFileToCompaction = 
pageStore.pageStore().getDeltaFileToCompaction();
+
+            if (deltaFileToCompaction != null) {
+                queue.add(new DeltaFileForCompaction(pageStore, 
deltaFileToCompaction));
+            }
+        });
+
+        return new CompactionRound(partitionFileCount[0], 
totalDeltaFileCount[0], queue);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index cae78c256cb..675193e27f5 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -20,16 +20,11 @@ package org.apache.ignite.internal.pagememory.persistence.compaction;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toCollection;
 import static 
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -46,7 +41,6 @@ import org.apache.ignite.internal.pagememory.persistence.WriteSpeedFormatter;
 import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
-import 
org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore;
 import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -67,6 +61,10 @@ import org.jetbrains.annotations.Nullable;
  *  <li>Fsync of the partition file.</li>
  *  <li>Remove delta file from {@link FilePageStore} and file system.</li>
  * </ul>
+ *
+ * <p>Optimization has been implemented to speed up checkpointing. When a 
checkpoint starts, compaction is stopped to allow for the IO
+ * operations for it. However, this is only true as long as the total number 
of delta files does not exceed 3 * partitions, to prevent
+ * errors due to a large number of open files.</p>
  */
 public class Compactor extends IgniteWorker {
     /** Logger. */
@@ -92,8 +90,11 @@ public class Compactor extends IgniteWorker {
 
     private final PartitionDestructionLockManager 
partitionDestructionLockManager;
 
-    /** Guarded by {@link #mux}. */
-    private boolean paused;
+    /** Flag indicating whether a checkpoint has started. Guarded by {@link 
#mux}. */
+    private boolean isCheckpointStarted;
+
+    /** Current compaction round, {@code null} means the round has either not 
started yet or has finished. */
+    private volatile @Nullable CompactionRound currentCompactionRound;
 
     /**
      * Creates new ignite worker with given parameters.
@@ -167,7 +168,7 @@ public class Compactor extends IgniteWorker {
     void waitDeltaFiles() {
         try {
             synchronized (mux) {
-                while ((!addedDeltaFiles || paused) && !isCancelled()) {
+                while (!addedDeltaFiles && !isCancelled()) {
                     blockingSectionBegin();
 
                     try {
@@ -206,106 +207,107 @@ public class Compactor extends IgniteWorker {
      * pages, we must look for it from the oldest delta file.
      */
     void doCompaction() {
-        while (!isPaused()) {
-            // Let's collect one delta file for each partition.
-            Queue<DeltaFileForCompaction> queue = 
filePageStoreManager.allPageStores()
-                    .map(groupPartitionFilePageStore -> {
-                        DeltaFilePageStoreIo deltaFileToCompaction = 
groupPartitionFilePageStore.pageStore().getDeltaFileToCompaction();
-
-                        if (deltaFileToCompaction == null) {
-                            return null;
-                        }
-
-                        return new 
DeltaFileForCompaction(groupPartitionFilePageStore, deltaFileToCompaction);
-                    })
-                    .filter(Objects::nonNull)
-                    .collect(toCollection(ConcurrentLinkedQueue::new));
+        while (true) {
+            CompactionRound compactionRound = 
CompactionRound.create(filePageStoreManager);
 
-            if (queue.isEmpty()) {
+            if (compactionRound.queue.isEmpty()) {
                 break;
             }
 
-            String compactionId = UUID.randomUUID().toString();
+            setCurrentCompactionRound(compactionRound);
 
-            if (LOG.isInfoEnabled()) {
-                LOG.info("Starting new compaction round [compactionId={}, 
files={}]", compactionId, queue.size());
-            }
+            try {
+                if (shouldStopCompaction()) {
+                    break;
+                }
 
-            CompactionMetricsTracker tracker = new CompactionMetricsTracker();
+                if (LOG.isInfoEnabled()) {
+                    LOG.info(
+                            "Starting new compaction round [compactionId={}, 
files={}]",
+                            compactionRound.id,
+                            compactionRound.queue.size()
+                    );
+                }
 
-            updateHeartbeat();
+                CompactionMetricsTracker tracker = new 
CompactionMetricsTracker();
 
-            int threads = threadPoolExecutor == null ? 1 : 
threadPoolExecutor.getMaximumPoolSize();
+                updateHeartbeat();
 
-            CompletableFuture<?>[] futures = new CompletableFuture[threads];
+                int threads = threadPoolExecutor == null ? 1 : 
threadPoolExecutor.getMaximumPoolSize();
 
-            for (int i = 0; i < threads; i++) {
-                CompletableFuture<?> future = futures[i] = new 
CompletableFuture<>();
+                CompletableFuture<?>[] futures = new 
CompletableFuture[threads];
 
-                Runnable merger = () -> {
-                    DeltaFileForCompaction toMerge;
+                for (int i = 0; i < threads; i++) {
+                    CompletableFuture<?> future = futures[i] = new 
CompletableFuture<>();
 
-                    try {
-                        while (true) {
-                            toMerge = queue.poll();
+                    Runnable merger = () -> {
+                        DeltaFileForCompaction toMerge;
 
-                            if (toMerge == null) {
-                                break;
-                            }
+                        try {
+                            while (true) {
+                                toMerge = compactionRound.queue.poll();
+
+                                if (toMerge == null) {
+                                    break;
+                                }
 
-                            GroupPartitionId groupPartitionId = 
toMerge.groupPartitionFilePageStore.groupPartitionId();
+                                GroupPartitionId groupPartitionId = 
toMerge.groupPartitionFilePageStore.groupPartitionId();
 
-                            Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(groupPartitionId).readLock();
+                                Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(groupPartitionId)
+                                        .readLock();
 
-                            partitionDestructionLock.lock();
+                                partitionDestructionLock.lock();
 
-                            try {
-                                mergeDeltaFileToMainFile(
-                                        
toMerge.groupPartitionFilePageStore.pageStore(),
-                                        toMerge.deltaFilePageStoreIo,
-                                        tracker
-                                );
-                            } finally {
-                                partitionDestructionLock.unlock();
+                                try {
+                                    mergeDeltaFileToMainFile(
+                                            
toMerge.groupPartitionFilePageStore.pageStore(),
+                                            toMerge.deltaFilePageStoreIo,
+                                            tracker
+                                    );
+                                } finally {
+                                    partitionDestructionLock.unlock();
+                                }
                             }
+                        } catch (Throwable ex) {
+                            future.completeExceptionally(ex);
                         }
-                    } catch (Throwable ex) {
-                        future.completeExceptionally(ex);
-                    }
 
-                    future.complete(null);
-                };
+                        future.complete(null);
+                    };
 
-                if (isCancelled()) {
-                    return;
-                }
+                    if (isCancelled()) {
+                        return;
+                    }
 
-                if (threadPoolExecutor == null) {
-                    merger.run();
-                } else {
-                    threadPoolExecutor.execute(merger);
+                    if (threadPoolExecutor == null) {
+                        merger.run();
+                    } else {
+                        threadPoolExecutor.execute(merger);
+                    }
                 }
-            }
 
-            updateHeartbeat();
+                updateHeartbeat();
 
-            // Wait and check for errors.
-            CompletableFuture.allOf(futures).join();
+                // Wait and check for errors.
+                CompletableFuture.allOf(futures).join();
 
-            tracker.onCompactionEnd();
+                tracker.onCompactionEnd();
 
-            if (LOG.isInfoEnabled()) {
-                long totalWriteBytes = (long) pageSize * 
tracker.dataPagesWritten();
-                long totalDurationInNanos = tracker.totalDuration(NANOSECONDS);
+                if (LOG.isInfoEnabled()) {
+                    long totalWriteBytes = (long) pageSize * 
tracker.dataPagesWritten();
+                    long totalDurationInNanos = 
tracker.totalDuration(NANOSECONDS);
 
-                LOG.info(
-                        "Compaction round finished [compactionId={}, pages={}, 
skipped={}, duration={}ms, avgWriteSpeed={}MB/s]",
-                        compactionId,
-                        tracker.dataPagesWritten(),
-                        tracker.dataPagesSkipped(),
-                        tracker.totalDuration(MILLISECONDS),
-                        WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, 
totalDurationInNanos)
-                );
+                    LOG.info(
+                            "Compaction round finished [compactionId={}, 
pages={}, skipped={}, duration={}ms, avgWriteSpeed={}MB/s]",
+                            compactionRound.id,
+                            tracker.dataPagesWritten(),
+                            tracker.dataPagesSkipped(),
+                            tracker.totalDuration(MILLISECONDS),
+                            
WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, totalDurationInNanos)
+                    );
+                }
+            } finally {
+                resetCurrentCompactionRound();
             }
         }
     }
@@ -357,8 +359,6 @@ public class Compactor extends IgniteWorker {
         }
 
         synchronized (mux) {
-            paused = false;
-
             // Do not interrupt runner thread.
             isCancelled.set(true);
 
@@ -487,38 +487,21 @@ public class Compactor extends IgniteWorker {
     }
 
     /**
-     * Delta file for compaction.
+     * Notifies about the start of a checkpoint. It is expected that this 
method will not be called multiple times in parallel and
+     * subsequent calls will strictly be calls after {@link 
#notifyCheckpointFinish}.
      */
-    private static class DeltaFileForCompaction {
-        private final GroupPartitionPageStore<FilePageStore> 
groupPartitionFilePageStore;
-
-        private final DeltaFilePageStoreIo deltaFilePageStoreIo;
-
-        private DeltaFileForCompaction(
-                GroupPartitionPageStore<FilePageStore> 
groupPartitionFilePageStore,
-                DeltaFilePageStoreIo deltaFilePageStoreIo
-        ) {
-            this.groupPartitionFilePageStore = groupPartitionFilePageStore;
-            this.deltaFilePageStoreIo = deltaFilePageStoreIo;
-        }
-    }
-
-    /**
-     * Pauses the compactor until it is resumed or compactor is stopped. It is 
expected that this method will not be called multiple times
-     * in parallel and subsequent calls will strictly be calls after {@link 
#resume}.
-     */
-    public void pause() {
+    public void notifyCheckpointStart() {
         synchronized (mux) {
-            assert !paused : "It is expected that a further pause will only 
occur after resume";
+            assert !isCheckpointStarted : "Expected to be called after a 
checkpoint is finished";
 
-            paused = true;
+            isCheckpointStarted = true;
         }
     }
 
-    /** Resumes the compactor if it was paused. It is expected that this 
method will not be called multiple times in parallel. */
-    public void resume() {
+    /** Notifies about the finish of a checkpoint. It is expected that this 
method will not be called multiple times in parallel. */
+    public void notifyCheckpointFinish() {
         synchronized (mux) {
-            paused = false;
+            isCheckpointStarted = false;
 
             // Force compaction as we could stop somewhere in the middle and 
we need to continue compaction.
             addedDeltaFiles = true;
@@ -527,14 +510,37 @@ public class Compactor extends IgniteWorker {
         }
     }
 
-    /** Must be called before each IO operation to pause the current 
compaction and to provide IO resources to other components. */
-    private boolean isPaused() {
+    private boolean shouldStopCompaction(FilePageStore filePageStore) {
+        return filePageStore.isMarkedToDestroy() || shouldStopCompaction();
+    }
+
+    private boolean shouldStopCompaction() {
+        if (isCancelled()) {
+            return true;
+        }
+
         synchronized (mux) {
-            return paused;
+            // We only need to stop compaction if a checkpoint has occurred in 
parallel and the total number of delta files is not yet high
+            // enough. The number 3 is based on observations of failed tests 
and intuition; it may change in the future.
+            return isCheckpointStarted && 
currentCompactionRound().totalDeltaFileCount < (3 * 
currentCompactionRound().partitionFileCount);
         }
     }
 
-    private boolean shouldStopCompaction(FilePageStore filePageStore) {
-        return isCancelled() || filePageStore.isMarkedToDestroy() || 
isPaused();
+    private void setCurrentCompactionRound(CompactionRound compactionRound) {
+        assert currentCompactionRound == null : "Previous round is expected to 
be completed";
+
+        currentCompactionRound = compactionRound;
+    }
+
+    private void resetCurrentCompactionRound() {
+        currentCompactionRound = null;
+    }
+
+    private CompactionRound currentCompactionRound() {
+        CompactionRound compactionRound = currentCompactionRound;
+
+        assert compactionRound != null : "Compaction round has not yet 
started";
+
+        return compactionRound;
     }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java
new file mode 100644
index 00000000000..ca8cc2bf5c9
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore;
+
+/** Delta file for compaction. */
+class DeltaFileForCompaction {
+    final GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore;
+
+    final DeltaFilePageStoreIo deltaFilePageStoreIo;
+
+    DeltaFileForCompaction(
+            GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore,
+            DeltaFilePageStoreIo deltaFilePageStoreIo
+    ) {
+        this.groupPartitionFilePageStore = groupPartitionFilePageStore;
+        this.deltaFilePageStoreIo = deltaFilePageStoreIo;
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index 0ab77fe9ae5..9d32a5a3622 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -47,6 +47,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.pagememory.io.PageIo;
@@ -220,13 +221,11 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
         when(filePageStore.getDeltaFileToCompaction()).then(answer -> 
deltaFilePageStoreIoRef.get());
 
-        FilePageStoreManager filePageStoreManager = 
mock(FilePageStoreManager.class);
-
-        GroupPageStoresMap<FilePageStore> groupPageStoresMap = new 
GroupPageStoresMap<>(new LongOperationAsyncExecutor("test", log));
+        var groupPageStoresMap = new GroupPageStoresMap<FilePageStore>(new 
LongOperationAsyncExecutor("test", log));
 
         groupPageStoresMap.put(new GroupPartitionId(0, 0), filePageStore);
 
-        when(filePageStoreManager.allPageStores()).then(answer -> 
groupPageStoresMap.getAll());
+        FilePageStoreManager filePageStoreManager = 
newFilePageStoreManager(groupPageStoresMap);
 
         Compactor compactor = spy(newCompactor(filePageStoreManager));
 
@@ -303,18 +302,21 @@ public class CompactorTest extends BaseIgniteAbstractTest {
     }
 
     @Test
-    void testPauseResume() throws Exception {
-        Compactor compactor = spy(newCompactor());
+    void testNotifyCheckpointStartAndFinish() throws Exception {
+        var groupPageStoresMap = new GroupPageStoresMap<FilePageStore>(new 
LongOperationAsyncExecutor("test", log));
+
+        FilePageStoreManager filePageStoreManager = 
newFilePageStoreManager(groupPageStoresMap);
+
+        Compactor compactor = spy(newCompactor(filePageStoreManager));
 
-        compactor.pause();
+        compactor.notifyCheckpointStart();
 
         DeltaFilePageStoreIo deltaFilePageStoreIo = 
createDeltaFilePageStoreIo();
         FilePageStore filePageStore = 
createFilePageStore(deltaFilePageStoreIo);
 
-        assertThat(
-                runAsync(() -> 
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new 
CompactionMetricsTracker())),
-                willCompleteSuccessfully()
-        );
+        groupPageStoresMap.put(new GroupPartitionId(0, 0), filePageStore);
+
+        assertThat(runAsync(compactor::doCompaction), 
willCompleteSuccessfully());
 
         verify(filePageStore, never()).write(anyLong(), any());
         verify(filePageStore, never()).sync();
@@ -324,12 +326,9 @@ public class CompactorTest extends BaseIgniteAbstractTest {
         verify(deltaFilePageStoreIo, never()).markMergedToFilePageStore();
         verify(deltaFilePageStoreIo, never()).stop(anyBoolean());
 
-        compactor.resume();
+        compactor.notifyCheckpointFinish();
 
-        assertThat(
-                runAsync(() -> 
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new 
CompactionMetricsTracker())),
-                willCompleteSuccessfully()
-        );
+        assertThat(runAsync(compactor::doCompaction), 
willCompleteSuccessfully());
 
         verify(filePageStore).write(anyLong(), any());
         verify(filePageStore).sync();
@@ -341,18 +340,15 @@ public class CompactorTest extends BaseIgniteAbstractTest {
     }
 
     @Test
-    void testTriggerCompactionAfterPause() {
+    void testTriggerCompactionAfterNotifyCheckpointStartAndFinish() {
         Compactor compactor = spy(newCompactor());
 
-        compactor.pause();
+        compactor.notifyCheckpointStart();
 
         CompletableFuture<?> waitDeltaFilesFuture = 
runAsync(compactor::waitDeltaFiles);
         assertThat(waitDeltaFilesFuture, willTimeoutFast());
 
         compactor.triggerCompaction();
-        assertThat(waitDeltaFilesFuture, willTimeoutFast());
-
-        compactor.resume();
         assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
     }
 
@@ -363,7 +359,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
         CompletableFuture<?> waitDeltaFilesFuture = 
runAsync(compactor::waitDeltaFiles);
         assertThat(waitDeltaFilesFuture, willTimeoutFast());
 
-        compactor.resume();
+        compactor.notifyCheckpointFinish();
         assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
     }
 
@@ -407,8 +403,24 @@ public class CompactorTest extends BaseIgniteAbstractTest {
     private static FilePageStore createFilePageStore(DeltaFilePageStoreIo 
deltaFilePageStoreIo) {
         FilePageStore filePageStore = mock(FilePageStore.class);
 
-        
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+        var removedDeltaFile = new AtomicBoolean();
+
+        
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).then(invocation 
-> {
+            removedDeltaFile.set(true);
+
+            return true;
+        });
+        when(filePageStore.getDeltaFileToCompaction()).then(invocation -> 
removedDeltaFile.get() ? null : deltaFilePageStoreIo);
+        when(filePageStore.deltaFileCount()).thenReturn(1);
 
         return filePageStore;
     }
+
+    private static FilePageStoreManager 
newFilePageStoreManager(GroupPageStoresMap<FilePageStore> map) {
+        FilePageStoreManager manager = mock(FilePageStoreManager.class);
+
+        when(manager.allPageStores()).then(invocation -> map.getAll());
+
+        return manager;
+    }
 }

Reply via email to