This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 16d400b0c5f IGNITE-27308 Improve the delta file merge pause strategy
(#7360)
16d400b0c5f is described below
commit 16d400b0c5f7fc98dd305533ed8d36caaaeeb191
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Jan 8 16:16:55 2026 +0300
IGNITE-27308 Improve the delta file merge pause strategy (#7360)
---
.../persistence/checkpoint/Checkpointer.java | 4 +-
.../persistence/compaction/CompactionRound.java | 66 ++++++
.../persistence/compaction/Compactor.java | 232 +++++++++++----------
.../compaction/DeltaFileForCompaction.java | 37 ++++
.../persistence/compaction/CompactorTest.java | 58 ++++--
5 files changed, 259 insertions(+), 138 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index a7a786f1d05..3efee1a7c41 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -352,7 +352,7 @@ public class Checkpointer extends IgniteWorker {
Checkpoint chp = null;
try {
- compactor.pause();
+ compactor.notifyCheckpointStart();
var tracker = new CheckpointMetricsTracker();
@@ -470,7 +470,7 @@ public class Checkpointer extends IgniteWorker {
} finally {
currentCheckpointProgressForThrottling = null;
- compactor.resume();
+ compactor.notifyCheckpointFinish();
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java
new file mode 100644
index 00000000000..9408e3b0182
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+
+/** Data class for compaction round. */
+class CompactionRound {
+ /** Compaction round ID. */
+ final UUID id = UUID.randomUUID();
+
+ /** Total number of partition files. */
+ final int partitionFileCount;
+
+ /** Total number of all partition delta files. */
+ final int totalDeltaFileCount;
+
+ /** Queue of delta files (one per partition) to be merged into the
partition file. */
+ final Queue<DeltaFileForCompaction> queue;
+
+ private CompactionRound(int partitionFileCount, int totalDeltaFileCount,
Queue<DeltaFileForCompaction> queue) {
+ this.partitionFileCount = partitionFileCount;
+ this.totalDeltaFileCount = totalDeltaFileCount;
+ this.queue = queue;
+ }
+
+ static CompactionRound create(FilePageStoreManager filePageStoreManager) {
+ var partitionFileCount = new int[]{0};
+ var totalDeltaFileCount = new int[]{0};
+
+ var queue = new ConcurrentLinkedQueue<DeltaFileForCompaction>();
+
+ filePageStoreManager.allPageStores().forEach(pageStore -> {
+ partitionFileCount[0]++;
+
+ totalDeltaFileCount[0] += pageStore.pageStore().deltaFileCount();
+
+ DeltaFilePageStoreIo deltaFileToCompaction =
pageStore.pageStore().getDeltaFileToCompaction();
+
+ if (deltaFileToCompaction != null) {
+ queue.add(new DeltaFileForCompaction(pageStore,
deltaFileToCompaction));
+ }
+ });
+
+ return new CompactionRound(partitionFileCount[0],
totalDeltaFileCount[0], queue);
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index cae78c256cb..675193e27f5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -20,16 +20,11 @@ package
org.apache.ignite.internal.pagememory.persistence.compaction;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toCollection;
import static
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -46,7 +41,6 @@ import
org.apache.ignite.internal.pagememory.persistence.WriteSpeedFormatter;
import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
-import
org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore;
import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -67,6 +61,10 @@ import org.jetbrains.annotations.Nullable;
* <li>Fsync of the partition file.</li>
* <li>Remove delta file from {@link FilePageStore} and file system.</li>
* </ul>
+ *
+ * <p>Optimization has been implemented to speed up checkpointing. When a
checkpoint starts, compaction is stopped to allow for the IO
+ * operations for it. However, this is only true as long as the total number
of delta files does not exceed 3 * partitions, to prevent
+ * errors due to a large number of open files.</p>
*/
public class Compactor extends IgniteWorker {
/** Logger. */
@@ -92,8 +90,11 @@ public class Compactor extends IgniteWorker {
private final PartitionDestructionLockManager
partitionDestructionLockManager;
- /** Guarded by {@link #mux}. */
- private boolean paused;
+ /** Flag indicating whether a checkpoint has started. Guarded by {@link
#mux}. */
+ private boolean isCheckpointStarted;
+
+ /** Current compaction round, {@code null} means the round has either not
started yet or has finished. */
+ private volatile @Nullable CompactionRound currentCompactionRound;
/**
* Creates new ignite worker with given parameters.
@@ -167,7 +168,7 @@ public class Compactor extends IgniteWorker {
void waitDeltaFiles() {
try {
synchronized (mux) {
- while ((!addedDeltaFiles || paused) && !isCancelled()) {
+ while (!addedDeltaFiles && !isCancelled()) {
blockingSectionBegin();
try {
@@ -206,106 +207,107 @@ public class Compactor extends IgniteWorker {
* pages, we must look for it from the oldest delta file.
*/
void doCompaction() {
- while (!isPaused()) {
- // Let's collect one delta file for each partition.
- Queue<DeltaFileForCompaction> queue =
filePageStoreManager.allPageStores()
- .map(groupPartitionFilePageStore -> {
- DeltaFilePageStoreIo deltaFileToCompaction =
groupPartitionFilePageStore.pageStore().getDeltaFileToCompaction();
-
- if (deltaFileToCompaction == null) {
- return null;
- }
-
- return new
DeltaFileForCompaction(groupPartitionFilePageStore, deltaFileToCompaction);
- })
- .filter(Objects::nonNull)
- .collect(toCollection(ConcurrentLinkedQueue::new));
+ while (true) {
+ CompactionRound compactionRound =
CompactionRound.create(filePageStoreManager);
- if (queue.isEmpty()) {
+ if (compactionRound.queue.isEmpty()) {
break;
}
- String compactionId = UUID.randomUUID().toString();
+ setCurrentCompactionRound(compactionRound);
- if (LOG.isInfoEnabled()) {
- LOG.info("Starting new compaction round [compactionId={},
files={}]", compactionId, queue.size());
- }
+ try {
+ if (shouldStopCompaction()) {
+ break;
+ }
- CompactionMetricsTracker tracker = new CompactionMetricsTracker();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Starting new compaction round [compactionId={},
files={}]",
+ compactionRound.id,
+ compactionRound.queue.size()
+ );
+ }
- updateHeartbeat();
+ CompactionMetricsTracker tracker = new
CompactionMetricsTracker();
- int threads = threadPoolExecutor == null ? 1 :
threadPoolExecutor.getMaximumPoolSize();
+ updateHeartbeat();
- CompletableFuture<?>[] futures = new CompletableFuture[threads];
+ int threads = threadPoolExecutor == null ? 1 :
threadPoolExecutor.getMaximumPoolSize();
- for (int i = 0; i < threads; i++) {
- CompletableFuture<?> future = futures[i] = new
CompletableFuture<>();
+ CompletableFuture<?>[] futures = new
CompletableFuture[threads];
- Runnable merger = () -> {
- DeltaFileForCompaction toMerge;
+ for (int i = 0; i < threads; i++) {
+ CompletableFuture<?> future = futures[i] = new
CompletableFuture<>();
- try {
- while (true) {
- toMerge = queue.poll();
+ Runnable merger = () -> {
+ DeltaFileForCompaction toMerge;
- if (toMerge == null) {
- break;
- }
+ try {
+ while (true) {
+ toMerge = compactionRound.queue.poll();
+
+ if (toMerge == null) {
+ break;
+ }
- GroupPartitionId groupPartitionId =
toMerge.groupPartitionFilePageStore.groupPartitionId();
+ GroupPartitionId groupPartitionId =
toMerge.groupPartitionFilePageStore.groupPartitionId();
- Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(groupPartitionId).readLock();
+ Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(groupPartitionId)
+ .readLock();
- partitionDestructionLock.lock();
+ partitionDestructionLock.lock();
- try {
- mergeDeltaFileToMainFile(
-
toMerge.groupPartitionFilePageStore.pageStore(),
- toMerge.deltaFilePageStoreIo,
- tracker
- );
- } finally {
- partitionDestructionLock.unlock();
+ try {
+ mergeDeltaFileToMainFile(
+
toMerge.groupPartitionFilePageStore.pageStore(),
+ toMerge.deltaFilePageStoreIo,
+ tracker
+ );
+ } finally {
+ partitionDestructionLock.unlock();
+ }
}
+ } catch (Throwable ex) {
+ future.completeExceptionally(ex);
}
- } catch (Throwable ex) {
- future.completeExceptionally(ex);
- }
- future.complete(null);
- };
+ future.complete(null);
+ };
- if (isCancelled()) {
- return;
- }
+ if (isCancelled()) {
+ return;
+ }
- if (threadPoolExecutor == null) {
- merger.run();
- } else {
- threadPoolExecutor.execute(merger);
+ if (threadPoolExecutor == null) {
+ merger.run();
+ } else {
+ threadPoolExecutor.execute(merger);
+ }
}
- }
- updateHeartbeat();
+ updateHeartbeat();
- // Wait and check for errors.
- CompletableFuture.allOf(futures).join();
+ // Wait and check for errors.
+ CompletableFuture.allOf(futures).join();
- tracker.onCompactionEnd();
+ tracker.onCompactionEnd();
- if (LOG.isInfoEnabled()) {
- long totalWriteBytes = (long) pageSize *
tracker.dataPagesWritten();
- long totalDurationInNanos = tracker.totalDuration(NANOSECONDS);
+ if (LOG.isInfoEnabled()) {
+ long totalWriteBytes = (long) pageSize *
tracker.dataPagesWritten();
+ long totalDurationInNanos =
tracker.totalDuration(NANOSECONDS);
- LOG.info(
- "Compaction round finished [compactionId={}, pages={},
skipped={}, duration={}ms, avgWriteSpeed={}MB/s]",
- compactionId,
- tracker.dataPagesWritten(),
- tracker.dataPagesSkipped(),
- tracker.totalDuration(MILLISECONDS),
- WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes,
totalDurationInNanos)
- );
+ LOG.info(
+ "Compaction round finished [compactionId={},
pages={}, skipped={}, duration={}ms, avgWriteSpeed={}MB/s]",
+ compactionRound.id,
+ tracker.dataPagesWritten(),
+ tracker.dataPagesSkipped(),
+ tracker.totalDuration(MILLISECONDS),
+
WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, totalDurationInNanos)
+ );
+ }
+ } finally {
+ resetCurrentCompactionRound();
}
}
}
@@ -357,8 +359,6 @@ public class Compactor extends IgniteWorker {
}
synchronized (mux) {
- paused = false;
-
// Do not interrupt runner thread.
isCancelled.set(true);
@@ -487,38 +487,21 @@ public class Compactor extends IgniteWorker {
}
/**
- * Delta file for compaction.
+ * Notifies about the start of a checkpoint. It is expected that this
method will not be called multiple times in parallel and
+ * subsequent calls will strictly be calls after {@link
#notifyCheckpointFinish}.
*/
- private static class DeltaFileForCompaction {
- private final GroupPartitionPageStore<FilePageStore>
groupPartitionFilePageStore;
-
- private final DeltaFilePageStoreIo deltaFilePageStoreIo;
-
- private DeltaFileForCompaction(
- GroupPartitionPageStore<FilePageStore>
groupPartitionFilePageStore,
- DeltaFilePageStoreIo deltaFilePageStoreIo
- ) {
- this.groupPartitionFilePageStore = groupPartitionFilePageStore;
- this.deltaFilePageStoreIo = deltaFilePageStoreIo;
- }
- }
-
- /**
- * Pauses the compactor until it is resumed or compactor is stopped. It is
expected that this method will not be called multiple times
- * in parallel and subsequent calls will strictly be calls after {@link
#resume}.
- */
- public void pause() {
+ public void notifyCheckpointStart() {
synchronized (mux) {
- assert !paused : "It is expected that a further pause will only
occur after resume";
+ assert !isCheckpointStarted : "Expected to be called after a
checkpoint is finished";
- paused = true;
+ isCheckpointStarted = true;
}
}
- /** Resumes the compactor if it was paused. It is expected that this
method will not be called multiple times in parallel. */
- public void resume() {
+ /** Notifies about the finish of a checkpoint. It is expected that this
method will not be called multiple times in parallel. */
+ public void notifyCheckpointFinish() {
synchronized (mux) {
- paused = false;
+ isCheckpointStarted = false;
// Force compaction as we could stop somewhere in the middle and
we need to continue compaction.
addedDeltaFiles = true;
@@ -527,14 +510,37 @@ public class Compactor extends IgniteWorker {
}
}
- /** Must be called before each IO operation to pause the current
compaction and to provide IO resources to other components. */
- private boolean isPaused() {
+ private boolean shouldStopCompaction(FilePageStore filePageStore) {
+ return filePageStore.isMarkedToDestroy() || shouldStopCompaction();
+ }
+
+ private boolean shouldStopCompaction() {
+ if (isCancelled()) {
+ return true;
+ }
+
synchronized (mux) {
- return paused;
+ // We only need to stop compaction if a checkpoint has occurred in
parallel and the total number of delta files is not yet high
+ // enough. The number 3 is based on observations of failed tests
and intuition; it may change in the future.
+ return isCheckpointStarted &&
currentCompactionRound().totalDeltaFileCount < (3 *
currentCompactionRound().partitionFileCount);
}
}
- private boolean shouldStopCompaction(FilePageStore filePageStore) {
- return isCancelled() || filePageStore.isMarkedToDestroy() ||
isPaused();
+ private void setCurrentCompactionRound(CompactionRound compactionRound) {
+ assert currentCompactionRound == null : "Previous round is expected to
be completed";
+
+ currentCompactionRound = compactionRound;
+ }
+
+ private void resetCurrentCompactionRound() {
+ currentCompactionRound = null;
+ }
+
+ private CompactionRound currentCompactionRound() {
+ CompactionRound compactionRound = currentCompactionRound;
+
+ assert compactionRound != null : "Compaction round has not yet
started";
+
+ return compactionRound;
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java
new file mode 100644
index 00000000000..ca8cc2bf5c9
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import
org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore;
+
+/** Delta file for compaction. */
+class DeltaFileForCompaction {
+ final GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore;
+
+ final DeltaFilePageStoreIo deltaFilePageStoreIo;
+
+ DeltaFileForCompaction(
+ GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore,
+ DeltaFilePageStoreIo deltaFilePageStoreIo
+ ) {
+ this.groupPartitionFilePageStore = groupPartitionFilePageStore;
+ this.deltaFilePageStoreIo = deltaFilePageStoreIo;
+ }
+}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index 0ab77fe9ae5..9d32a5a3622 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -47,6 +47,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.pagememory.io.PageIo;
@@ -220,13 +221,11 @@ public class CompactorTest extends BaseIgniteAbstractTest
{
when(filePageStore.getDeltaFileToCompaction()).then(answer ->
deltaFilePageStoreIoRef.get());
- FilePageStoreManager filePageStoreManager =
mock(FilePageStoreManager.class);
-
- GroupPageStoresMap<FilePageStore> groupPageStoresMap = new
GroupPageStoresMap<>(new LongOperationAsyncExecutor("test", log));
+ var groupPageStoresMap = new GroupPageStoresMap<FilePageStore>(new
LongOperationAsyncExecutor("test", log));
groupPageStoresMap.put(new GroupPartitionId(0, 0), filePageStore);
- when(filePageStoreManager.allPageStores()).then(answer ->
groupPageStoresMap.getAll());
+ FilePageStoreManager filePageStoreManager =
newFilePageStoreManager(groupPageStoresMap);
Compactor compactor = spy(newCompactor(filePageStoreManager));
@@ -303,18 +302,21 @@ public class CompactorTest extends BaseIgniteAbstractTest
{
}
@Test
- void testPauseResume() throws Exception {
- Compactor compactor = spy(newCompactor());
+ void testNotifyCheckpointStartAndFinish() throws Exception {
+ var groupPageStoresMap = new GroupPageStoresMap<FilePageStore>(new
LongOperationAsyncExecutor("test", log));
+
+ FilePageStoreManager filePageStoreManager =
newFilePageStoreManager(groupPageStoresMap);
+
+ Compactor compactor = spy(newCompactor(filePageStoreManager));
- compactor.pause();
+ compactor.notifyCheckpointStart();
DeltaFilePageStoreIo deltaFilePageStoreIo =
createDeltaFilePageStoreIo();
FilePageStore filePageStore =
createFilePageStore(deltaFilePageStoreIo);
- assertThat(
- runAsync(() ->
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new
CompactionMetricsTracker())),
- willCompleteSuccessfully()
- );
+ groupPageStoresMap.put(new GroupPartitionId(0, 0), filePageStore);
+
+ assertThat(runAsync(compactor::doCompaction),
willCompleteSuccessfully());
verify(filePageStore, never()).write(anyLong(), any());
verify(filePageStore, never()).sync();
@@ -324,12 +326,9 @@ public class CompactorTest extends BaseIgniteAbstractTest {
verify(deltaFilePageStoreIo, never()).markMergedToFilePageStore();
verify(deltaFilePageStoreIo, never()).stop(anyBoolean());
- compactor.resume();
+ compactor.notifyCheckpointFinish();
- assertThat(
- runAsync(() ->
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new
CompactionMetricsTracker())),
- willCompleteSuccessfully()
- );
+ assertThat(runAsync(compactor::doCompaction),
willCompleteSuccessfully());
verify(filePageStore).write(anyLong(), any());
verify(filePageStore).sync();
@@ -341,18 +340,15 @@ public class CompactorTest extends BaseIgniteAbstractTest
{
}
@Test
- void testTriggerCompactionAfterPause() {
+ void testTriggerCompactionAfterNotifyCheckpointStartAndFinish() {
Compactor compactor = spy(newCompactor());
- compactor.pause();
+ compactor.notifyCheckpointStart();
CompletableFuture<?> waitDeltaFilesFuture =
runAsync(compactor::waitDeltaFiles);
assertThat(waitDeltaFilesFuture, willTimeoutFast());
compactor.triggerCompaction();
- assertThat(waitDeltaFilesFuture, willTimeoutFast());
-
- compactor.resume();
assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
}
@@ -363,7 +359,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
CompletableFuture<?> waitDeltaFilesFuture =
runAsync(compactor::waitDeltaFiles);
assertThat(waitDeltaFilesFuture, willTimeoutFast());
- compactor.resume();
+ compactor.notifyCheckpointFinish();
assertThat(waitDeltaFilesFuture, willCompleteSuccessfully());
}
@@ -407,8 +403,24 @@ public class CompactorTest extends BaseIgniteAbstractTest {
private static FilePageStore createFilePageStore(DeltaFilePageStoreIo
deltaFilePageStoreIo) {
FilePageStore filePageStore = mock(FilePageStore.class);
-
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+ var removedDeltaFile = new AtomicBoolean();
+
+
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).then(invocation
-> {
+ removedDeltaFile.set(true);
+
+ return true;
+ });
+ when(filePageStore.getDeltaFileToCompaction()).then(invocation ->
removedDeltaFile.get() ? null : deltaFilePageStoreIo);
+ when(filePageStore.deltaFileCount()).thenReturn(1);
return filePageStore;
}
+
+ private static FilePageStoreManager
newFilePageStoreManager(GroupPageStoresMap<FilePageStore> map) {
+ FilePageStoreManager manager = mock(FilePageStoreManager.class);
+
+ when(manager.allPageStores()).then(invocation -> map.getAll());
+
+ return manager;
+ }
}