This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9108d151af5 IGNITE-26295 Integrate Raft Log GC into the new storage (#7880)
9108d151af5 is described below
commit 9108d151af544a5038c478143a83ee1b26975bdc
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 2 14:27:30 2026 +0300
IGNITE-26295 Integrate Raft Log GC into the new storage (#7880)
---
...le.java => BeforeIndexFileCreatedCallback.java} | 28 +--
.../raft/storage/segstore/FileProperties.java | 13 +-
.../raft/storage/segstore/IndexFileManager.java | 29 ++-
.../MostGarbageFirstCompactionStrategy.java | 72 +++++++
.../segstore/OldestFirstCompactionStrategy.java | 46 +++++
.../raft/storage/segstore/RaftLogCheckpointer.java | 17 +-
.../storage/segstore/RaftLogGarbageCollector.java | 169 ++++++++++++++-
.../storage/segstore/ReadModeIndexMemTable.java | 8 +-
...ble.java => SegmentFileCompactionStrategy.java} | 31 ++-
.../raft/storage/segstore/SegmentFileManager.java | 19 +-
.../segstore/AbstractCompactionStrategyTest.java | 90 ++++++++
.../storage/segstore/AbstractMemTableTest.java | 6 +-
.../MostGarbageFirstCompactionStrategyTest.java | 81 ++++++++
.../OldestFirstCompactionStrategyTest.java | 49 +++++
.../storage/segstore/RaftLogCheckpointerTest.java | 45 ++--
.../segstore/RaftLogGarbageCollectorTest.java | 56 +++--
.../storage/segstore/RaftLogGcSoftLimitTest.java | 226 +++++++++++++++++++++
17 files changed, 866 insertions(+), 119 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/BeforeIndexFileCreatedCallback.java
similarity index 54%
copy from modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
copy to modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/BeforeIndexFileCreatedCallback.java
index ed5f5100d37..2786869b1ba 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/BeforeIndexFileCreatedCallback.java
@@ -17,28 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Immutable version of an index memtable used by the {@link
RaftLogCheckpointer}.
- *
- * @see WriteModeIndexMemTable
- */
-interface ReadModeIndexMemTable {
- /**
- * Returns information about a segment file for the given group ID or
{@code null} if it is not present in this memtable.
- */
- @Nullable SegmentInfo segmentInfo(long groupId);
-
- /**
- * Returns an iterator over all {@code Group ID -> SegmentInfo} entries in
this memtable.
- */
- Iterator<Entry<Long, SegmentInfo>> iterator();
-
- /**
- * Returns the number of Raft Group IDs stored in this memtable.
- */
- int numGroups();
+/** Callback executed by the {@link RaftLogCheckpointer} before an index file
is created. */
+@FunctionalInterface
+interface BeforeIndexFileCreatedCallback {
+ void beforeIndexFileCreated(long indexFileSize); // indexFileSize: bytes the index file about to be written will occupy (see IndexFileManager#computeIndexFileSize).
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
index 0fc9647903b..1a1d1b65e2d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.tostring.S;
/**
* Represents properties common for some file types (namely Segment and Index
files) used by the log storage.
*/
-class FileProperties {
+class FileProperties implements Comparable<FileProperties> {
/** File ordinal. Incremented each time a new file is created. */
private final int ordinal;
@@ -54,6 +54,17 @@ class FileProperties {
return generation;
}
+ @Override
+ public int compareTo(FileProperties o) {
+ int cmp = Integer.compare(ordinal, o.ordinal);
+ // Order by ordinal (file creation order) first; generation only breaks ties between files with the same ordinal.
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return Integer.compare(generation, o.generation);
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index b893f7fe643..4b8453c69ad 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -51,6 +51,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* File manager responsible for persisting {@link ReadModeIndexMemTable}s to
index files.
@@ -215,10 +216,8 @@ class IndexFileManager {
try (var os = new
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
os.write(fileHeaderWithIndexMetas.header());
- Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
-
- while (it.hasNext()) {
- SegmentInfo segmentInfo = it.next().getValue();
+ for (Entry<Long, SegmentInfo> longSegmentInfoEntry :
indexMemTable) {
+ SegmentInfo segmentInfo = longSegmentInfoEntry.getValue();
// Segment Info may not contain payload in case of suffix
truncation, see "IndexMemTable#truncateSuffix".
if (segmentInfo.size() > 0) {
@@ -385,11 +384,7 @@ class IndexFileManager {
var metaSpecs = new ArrayList<IndexMetaSpec>(numGroups);
- Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
-
- while (it.hasNext()) {
- Entry<Long, SegmentInfo> entry = it.next();
-
+ for (Entry<Long, SegmentInfo> entry : indexMemTable) {
// Using the boxed value to avoid unnecessary autoboxing later.
Long groupId = entry.getKey();
@@ -504,6 +499,19 @@ class IndexFileManager {
return payloadBuffer.array();
}
+ /**
+ * Computes the size in bytes that the index file for the given {@code
indexMemTable} will occupy on disk.
+ */
+ static long computeIndexFileSize(ReadModeIndexMemTable indexMemTable) {
+ long total = headerSize(indexMemTable.numGroups());
+ // Header (common meta + one group meta per group), then segmentInfo.size() * Integer.BYTES of payload for every group.
+ for (Entry<Long, SegmentInfo> longSegmentInfoEntry : indexMemTable) {
+ total += payloadSize(longSegmentInfoEntry.getValue());
+ }
+ // NOTE(review): the GC adds this estimate but later subtracts the real Files.size() of the index file, so it must match the bytes saveNewIndexMemtable writes or logSizeBytes drifts.
+ return total;
+ }
+
private static int headerSize(int numGroups) {
return COMMON_META_SIZE + numGroups * GROUP_META_SIZE;
}
@@ -512,7 +520,8 @@ class IndexFileManager {
return segmentInfo.size() * Integer.BYTES;
}
- private static String indexFileName(FileProperties fileProperties) {
+ @VisibleForTesting
+ static String indexFileName(FileProperties fileProperties) {
return String.format(INDEX_FILE_NAME_FORMAT, fileProperties.ordinal(),
fileProperties.generation());
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java
new file mode 100644
index 00000000000..52d6ebefef8
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+/**
+ * Compaction strategy that prioritizes files with the most dead entries,
fully-deletable files first.
+ */
+class MostGarbageFirstCompactionStrategy implements
SegmentFileCompactionStrategy {
+ private final Path segmentFilesDir;
+
+ private final IndexFileManager indexFileManager;
+
+ MostGarbageFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager
indexFileManager) {
+ this.segmentFilesDir = segmentFilesDir;
+ this.indexFileManager = indexFileManager;
+ }
+ // Candidates: non-".tmp" files in segmentFilesDir whose index file exists, sorted by score(); the caller must close the returned stream.
+ @Override
+ public Stream<FileProperties> selectCandidates() throws IOException {
+ var scores = new Object2LongOpenHashMap<FileProperties>();
+ // Caches score() per file so describeSegmentFile is not re-run on every comparison; the map is unsynchronized, so this assumes a sequential sort.
+ Comparator<FileProperties> comparator =
+ Comparator.<FileProperties>comparingLong(props ->
scores.computeIfAbsent(props, this::score))
+ .thenComparing(Comparator.naturalOrder());
+ // Lowest score first (fully deletable files score -1); equal scores fall back to FileProperties natural order (ordinal, then generation).
+ //noinspection resource
+ return Files.list(segmentFilesDir)
+ .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+ .map(SegmentFile::fileProperties)
+ .filter(props ->
Files.exists(indexFileManager.indexFilePath(props)))
+ .sorted(comparator);
+ }
+ // Score = sum of (lastLogIndexExclusive - firstLogIndexInclusive) over the index metas still describing this file, or -1 if there are none.
+ private long score(FileProperties props) {
+ Long2ObjectMap<IndexFileMeta> description =
indexFileManager.describeSegmentFile(props.ordinal());
+
+ if (description.isEmpty()) {
+ return -1; // Fully deletable — highest priority.
+ }
+ // NOTE(review): ranks by fewest remaining entries (a count, not bytes); this matches "most garbage" only if files hold similar entry counts - confirm intent.
+ long liveCount = 0;
+
+ for (IndexFileMeta meta : description.values()) {
+ liveCount += meta.lastLogIndexExclusive() -
meta.firstLogIndexInclusive();
+ }
+
+ return liveCount;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategy.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategy.java
new file mode 100644
index 00000000000..1eeaeb7a4b1
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+
+/**
+ * Compaction strategy that yields fully-checkpointed segment files in write
order (oldest first).
+ */
+class OldestFirstCompactionStrategy implements SegmentFileCompactionStrategy {
+ private final Path segmentFilesDir;
+
+ private final IndexFileManager indexFileManager;
+
+ OldestFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager
indexFileManager) {
+ this.segmentFilesDir = segmentFilesDir;
+ this.indexFileManager = indexFileManager;
+ }
+ // Lists non-".tmp" files in segmentFilesDir, sorts them, then keeps only those whose index file exists; the caller must close the returned stream.
+ @Override
+ public Stream<FileProperties> selectCandidates() throws IOException {
+ return Files.list(segmentFilesDir)
+ .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+ .map(SegmentFile::fileProperties)
+ .sorted() // FileProperties natural order: ordinal, then generation.
+ .filter(props ->
Files.exists(indexFileManager.indexFilePath(props)));
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index 5913044052b..724e080e067 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -21,7 +21,7 @@ import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
-import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -62,14 +62,18 @@ class RaftLogCheckpointer {
private final FailureProcessor failureProcessor;
+ private final BeforeIndexFileCreatedCallback beforeIndexFileCreated;
+
RaftLogCheckpointer(
String nodeName,
IndexFileManager indexFileManager,
FailureProcessor failureProcessor,
- int maxQueueSize
+ int maxQueueSize,
+ BeforeIndexFileCreatedCallback beforeIndexFileCreated
) {
this.indexFileManager = indexFileManager;
this.failureProcessor = failureProcessor;
+ this.beforeIndexFileCreated = beforeIndexFileCreated;
queue = new CheckpointQueue(maxQueueSize);
checkpointThread = new IgniteThread(nodeName, "segstore-checkpoint",
new CheckpointTask());
@@ -91,7 +95,7 @@ class RaftLogCheckpointer {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IgniteInternalException(COMMON_ERR, "Interrupted while
waiting for the checkpoint thread to finish.", e);
+ throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while
waiting for the checkpoint thread to finish.", e);
}
}
@@ -101,7 +105,7 @@ class RaftLogCheckpointer {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IgniteInternalException(COMMON_ERR, "Interrupted while
adding an entry to the checkpoint queue.", e);
+ throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while
adding an entry to the checkpoint queue.", e);
}
}
@@ -155,6 +159,11 @@ class RaftLogCheckpointer {
segmentFile.sync();
+ long indexFileSize =
IndexFileManager.computeIndexFileSize(entry.memTable());
+
+ // Notify about the upcoming log size increase.
+
beforeIndexFileCreated.beforeIndexFileCreated(indexFileSize);
+
indexFileManager.saveNewIndexMemtable(entry.memTable());
queue.removeHead();
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
index a1b5e9cb5a1..3b09ad808e6 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static java.lang.Math.toIntExact;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
@@ -29,20 +30,28 @@ import static org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TR
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.jetbrains.annotations.VisibleForTesting;
@@ -53,6 +62,12 @@ import org.jetbrains.annotations.VisibleForTesting;
* of segment files. This process reclaims disk space occupied by log entries
that have been truncated via {@link LogStorage#truncatePrefix}
* or {@link LogStorage#truncateSuffix} operations.
*
+ * <h2>Size tracking</h2>
+ * The GC tracks the total size of all log storage files (segment files and
index files) via an {@link AtomicLong}. The counter is
+ * incremented via {@link #onLogStorageSizeIncreased(long)} before a new
segment file is allocated or before an index file is written to
+ * disk, and decremented inside {@link #runCompaction} after files are
deleted. The GC thread wakes up on each of these events and compacts
+ * files until the total size drops below the configured {@link
#softLimitBytes soft limit}.
+ *
* <h2>Compaction Process</h2>
* When a segment file is selected for compaction, the GC:
* <ol>
@@ -71,11 +86,66 @@ class RaftLogGarbageCollector {
private final IndexFileManager indexFileManager;
+ private final long softLimitBytes;
+
+ private final SegmentFileCompactionStrategy strategy;
+
+ private final FailureProcessor failureProcessor;
+
private final AtomicLong logSizeBytes = new AtomicLong();
- RaftLogGarbageCollector(Path segmentFilesDir, IndexFileManager
indexFileManager) {
+ private final Thread gcThread;
+
+ RaftLogGarbageCollector(
+ String nodeName,
+ Path segmentFilesDir,
+ IndexFileManager indexFileManager,
+ long softLimitBytes,
+ SegmentFileCompactionStrategy strategy,
+ FailureProcessor failureProcessor
+ ) {
this.segmentFilesDir = segmentFilesDir;
this.indexFileManager = indexFileManager;
+ this.softLimitBytes = softLimitBytes;
+ this.strategy = strategy;
+ this.failureProcessor = failureProcessor;
+
+ gcThread = new IgniteThread(nodeName, "segstore-gc", new GcTask());
+ }
+
+ void start() throws IOException {
+ initLogSizeFromDisk();
+ // Seed logSizeBytes from disk before the GC thread runs, since GcTask compares it against softLimitBytes.
+ gcThread.start();
+ }
+
+ void stop() {
+ gcThread.interrupt();
+ // The interrupt wakes GcTask from park() and ends its loop; interrupted channel I/O may fail with ClosedByInterruptException, which GcTask also treats as a stop signal.
+ try {
+ gcThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while
waiting for the GC thread to finish.", e);
+ }
+ }
+
+ /**
+ * Notifies the GC that the log storage size is about to increase by
{@code addedBytes} and wakes the GC thread if the soft limit is now
+ * exceeded. Must be called before the corresponding file is written to
disk.
+ */
+ void onLogStorageSizeIncreased(long addedBytes) {
+ long logSize = logSizeBytes.addAndGet(addedBytes);
+ // Callers may be on different threads (checkpointer callback, segment rollover); the AtomicLong update keeps the counter consistent.
+ if (logSize >= softLimitBytes) {
+ LOG.info(
+ "Log size is above the soft limit, waking up GC thread
[current log size = {} bytes, soft limit = {} bytes].",
+ logSize, softLimitBytes
+ );
+ // NOTE(review): this INFO line fires on every increase while above the limit, which may be noisy if GC cannot catch up - consider DEBUG or throttling.
+ LockSupport.unpark(gcThread); // If GcTask is not parked yet, the permit makes its next park() return immediately, so the wake-up is not lost.
+ }
}
void cleanupLeftoverFiles() throws IOException {
@@ -137,29 +207,43 @@ class RaftLogGarbageCollector {
}
@VisibleForTesting
- void runCompaction(SegmentFile segmentFile) throws IOException {
- LOG.info("Compacting segment file [path = {}].", segmentFile.path());
+ long logSizeBytes() {
+ return logSizeBytes.get();
+ }
+
+ @VisibleForTesting
+ void runCompaction(FileProperties segmentFileProperties) throws
IOException {
+ Path segmentFilePath =
segmentFilesDir.resolve(SegmentFile.fileName(segmentFileProperties));
+
+ LOG.info("Compacting segment file [path = {}].", segmentFilePath);
+ // TODO: Skip non-compactible files, see
https://issues.apache.org/jira/browse/IGNITE-28417.
Long2ObjectMap<IndexFileMeta> segmentFileDescription
- =
indexFileManager.describeSegmentFile(segmentFile.fileProperties().ordinal());
+ =
indexFileManager.describeSegmentFile(segmentFileProperties.ordinal());
boolean canRemoveSegmentFile = segmentFileDescription.isEmpty();
- Path indexFilePath =
indexFileManager.indexFilePath(segmentFile.fileProperties());
+ Path indexFilePath =
indexFileManager.indexFilePath(segmentFileProperties);
long logSizeDelta;
if (canRemoveSegmentFile) {
- indexFileManager.onIndexFileRemoved(segmentFile.fileProperties());
+ indexFileManager.onIndexFileRemoved(segmentFileProperties);
- logSizeDelta = Files.size(segmentFile.path()) +
Files.size(indexFilePath);
+ logSizeDelta = Files.size(segmentFilePath) +
Files.size(indexFilePath);
} else {
- logSizeDelta = compactSegmentFile(segmentFile, indexFilePath,
segmentFileDescription);
+ SegmentFile segmentFile =
SegmentFile.openExisting(segmentFilePath, false);
+
+ try {
+ logSizeDelta = compactSegmentFile(segmentFile, indexFilePath,
segmentFileDescription);
+ } finally {
+ segmentFile.close();
+ }
}
// Remove the previous generation of the segment file and its index.
This is safe to do, because we rely on the file system
// guarantees that other threads reading from the segment file will
still be able to do that even if the file is deleted.
- Files.delete(segmentFile.path());
+ Files.delete(segmentFilePath);
Files.delete(indexFilePath);
long newLogSize = logSizeBytes.addAndGet(-logSizeDelta);
@@ -168,12 +252,12 @@ class RaftLogGarbageCollector {
if (canRemoveSegmentFile) {
LOG.info(
"Segment file removed (all entries are truncated)
[path = {}, log size freed = {} bytes, new log size = {} bytes].",
- segmentFile.path(), logSizeDelta, newLogSize
+ segmentFilePath, logSizeDelta, newLogSize
);
} else {
LOG.info(
"Segment file compacted [path = {}, log size freed =
{} bytes, new log size = {} bytes].",
- segmentFile.path(), logSizeDelta, newLogSize
+ segmentFilePath, logSizeDelta, newLogSize
);
}
}
@@ -286,6 +370,69 @@ class RaftLogGarbageCollector {
return index >= indexFileMeta.firstLogIndexInclusive() && index <
indexFileMeta.lastLogIndexExclusive();
}
+ private void initLogSizeFromDisk() throws IOException {
+ Path indexFilesDir = indexFileManager.indexFilesDir();
+ // Sums Files.size() of every entry in both directories; nothing is filtered, so any leftover ".tmp" files would be counted too.
+ try (Stream<Path> files = Stream.concat(Files.list(segmentFilesDir),
Files.list(indexFilesDir))) { // NOTE(review): if the second Files.list throws, the first stream is opened but never closed - consider nested try-with-resources.
+ long logSizeOnDisk = files
+ .mapToLong(path -> {
+ try {
+ return Files.size(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e); // NOTE(review): escapes initLogSizeFromDisk() unchecked despite its `throws IOException`; consider unwrapping.
+ }
+ })
+ .sum();
+
+ logSizeBytes.addAndGet(logSizeOnDisk);
+ }
+ }
+
+ private class GcTask implements Runnable {
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ runGcCycle();
+ } catch (ClosedByInterruptException e) {
+ return; // Interrupted during file I/O (e.g. by stop()): exit the thread.
+ } catch (IOException e) {
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, e));
+ }
+ // NOTE(review): only IOException is reported; a RuntimeException from runGcCycle (e.g. UncheckedIOException) is not passed to failureProcessor and terminates this thread.
+ LockSupport.park(); // Sleep until onLogStorageSizeIncreased() unparks this thread, stop() interrupts it, or a spurious wake-up occurs.
+ }
+ }
+ // Compacts candidates in strategy order until logSizeBytes drops below softLimitBytes or the candidates run out.
+ private void runGcCycle() throws IOException {
+ if (logSizeBytes.get() < softLimitBytes) {
+ return;
+ }
+
+ LOG.info("Starting Log Storage GC cycle.");
+
+ try (Stream<FileProperties> candidates =
strategy.selectCandidates()) {
+ Iterator<FileProperties> it = candidates.iterator();
+
+ do {
+ if (!it.hasNext()) {
+ LOG.warn(
+ "Log size is above the soft limit but there
are no files to compact "
+ + "[currentLogSize = {} bytes,
softLimit = {} bytes].",
+ logSizeBytes.get(), softLimitBytes
+ );
+
+ return;
+ }
+ // runCompaction() subtracts the freed bytes from logSizeBytes, which the loop condition re-reads.
+ runCompaction(it.next());
+ } while (logSizeBytes.get() >= softLimitBytes);
+ }
+
+ LOG.info("Log Storage GC cycle complete.");
+ }
+ }
+
private class TmpSegmentFile implements AutoCloseable {
private final String fileName;
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
index ed5f5100d37..743f9921710 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import java.util.Iterator;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
@@ -26,17 +25,12 @@ import org.jetbrains.annotations.Nullable;
*
* @see WriteModeIndexMemTable
*/
-interface ReadModeIndexMemTable {
+interface ReadModeIndexMemTable extends Iterable<Entry<Long, SegmentInfo>> {
/**
* Returns information about a segment file for the given group ID or
{@code null} if it is not present in this memtable.
*/
@Nullable SegmentInfo segmentInfo(long groupId);
- /**
- * Returns an iterator over all {@code Group ID -> SegmentInfo} entries in
this memtable.
- */
- Iterator<Entry<Long, SegmentInfo>> iterator();
-
/**
* Returns the number of Raft Group IDs stored in this memtable.
*/
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileCompactionStrategy.java
similarity index 51%
copy from modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
copy to modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileCompactionStrategy.java
index ed5f5100d37..815bba14f7f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileCompactionStrategy.java
@@ -17,28 +17,23 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import org.jetbrains.annotations.Nullable;
+import java.io.IOException;
+import java.util.stream.Stream;
/**
- * Immutable version of an index memtable used by the {@link
RaftLogCheckpointer}.
+ * Strategy for selecting which segment files to compact during garbage
collection.
*
- * @see WriteModeIndexMemTable
+ * <p>Each implementation is fully responsible for discovering and ordering
compaction candidates. This includes
+ * determining which files are eligible (e.g. skipping files that have not yet
been checkpointed) and ranking them
+ * by priority.
+ *
+ * <p>The GC consumes candidates from the stream in order, compacting each
one, and stops as soon as the log size
+ * drops below the soft limit or the stream is exhausted.
*/
-interface ReadModeIndexMemTable {
- /**
- * Returns information about a segment file for the given group ID or
{@code null} if it is not present in this memtable.
- */
- @Nullable SegmentInfo segmentInfo(long groupId);
-
- /**
- * Returns an iterator over all {@code Group ID -> SegmentInfo} entries in
this memtable.
- */
- Iterator<Entry<Long, SegmentInfo>> iterator();
-
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+interface SegmentFileCompactionStrategy {
/**
- * Returns the number of Raft Group IDs stored in this memtable.
+ * Returns an ordered stream of segment file candidates for compaction.
The stream must be closed by the caller.
*/
- int numGroups();
+ Stream<FileProperties> selectCandidates() throws IOException;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index a0b9d440d62..9c3a0c1a62b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -176,14 +176,22 @@ class SegmentFileManager implements ManuallyCloseable {
indexFileManager = new IndexFileManager(baseDir);
+ garbageCollector = new RaftLogGarbageCollector(
+ nodeName,
+ segmentFilesDir,
+ indexFileManager,
+ logStorageView.softLogSizeLimitBytes(),
+ new MostGarbageFirstCompactionStrategy(segmentFilesDir,
indexFileManager),
+ failureProcessor
+ );
+
checkpointer = new RaftLogCheckpointer(
nodeName,
indexFileManager,
failureProcessor,
- logStorageView.maxCheckpointQueueSize()
+ logStorageView.maxCheckpointQueueSize(),
+ garbageCollector::onLogStorageSizeIncreased
);
-
- garbageCollector = new RaftLogGarbageCollector(segmentFilesDir,
indexFileManager);
}
void start() throws IOException {
@@ -232,6 +240,8 @@ class SegmentFileManager implements ManuallyCloseable {
// Index File Manager must be started strictly before the checkpointer.
indexFileManager.start();
+ garbageCollector.start();
+
checkpointer.start();
}
@@ -512,6 +522,8 @@ class SegmentFileManager implements ManuallyCloseable {
throw new IgniteInternalException(NODE_STOPPING_ERR);
}
+ garbageCollector.onLogStorageSizeIncreased(segmentFileSize);
+
currentSegmentFile.set(allocateNewSegmentFile(++curSegmentFileOrdinal));
rolloverLock.notifyAll();
@@ -538,6 +550,7 @@ class SegmentFileManager implements ManuallyCloseable {
}
checkpointer.stop();
+ garbageCollector.stop();
}
private static void writeHeader(SegmentFile segmentFile) {
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractCompactionStrategyTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractCompactionStrategyTest.java
new file mode 100644
index 00000000000..1f7d60341e5
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractCompactionStrategyTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+abstract class AbstractCompactionStrategyTest extends IgniteAbstractTest { // Shared fixture: real segment/index dirs under workDir plus a mocked IndexFileManager.
+ private Path segmentFilesDir; // workDir/segment, created in setUp().
+
+ private Path indexFilesDir; // workDir/index, created in setUp(); indexFilePath() resolves here.
+
+ @Mock
+ IndexFileManager indexFileManager; // Only indexFilePath() is stubbed here; subclasses may stub more.
+
+ private SegmentFileCompactionStrategy strategy; // Strategy under test, built by createStrategy().
+
+ @BeforeEach
+ void setUp() throws IOException {
+ segmentFilesDir = Files.createDirectories(workDir.resolve("segment"));
+ indexFilesDir = Files.createDirectories(workDir.resolve("index"));
+
+ when(indexFileManager.indexFilePath(any())).thenAnswer(inv ->
indexFilesDir.resolve(indexFileName(inv.getArgument(0))));
+
+ strategy = createStrategy(segmentFilesDir, indexFileManager);
+ }
+
+ @Test
+ void selectsOnlySegmentFilesWithExistingIndexAndSkipsTmp() throws
IOException {
+ FileProperties missingIndex = new FileProperties(0); // Segment file without an index file.
+ FileProperties presentIndex = new FileProperties(1); // Segment file with an index file.
+
+ createSegmentFile(missingIndex);
+ createSegmentFile(presentIndex);
+ // A ".tmp" segment file must be skipped. NOTE(review): ordinal 2 has no index file either, so this may not isolate .tmp filtering; consider also creating its index file.
+ Files.createFile(segmentFilesDir.resolve(SegmentFile.fileName(new
FileProperties(2)) + ".tmp"));
+
+ createIndexFile(presentIndex);
+
+ assertThat(selectedCandidates(), contains(presentIndex)); // Only the indexed, non-temporary segment file qualifies.
+ }
+
+ abstract SegmentFileCompactionStrategy createStrategy(Path
segmentFilesDir, IndexFileManager indexFileManager); // Returns the strategy under test.
+
+ List<FileProperties> selectedCandidates() throws IOException { // Collects selectCandidates() into a list, closing the stream.
+ try (Stream<FileProperties> candidates = strategy.selectCandidates()) {
+ return candidates.collect(Collectors.toList());
+ }
+ }
+
+ void createSegmentFile(FileProperties fileProperties) throws IOException { // Creates an empty segment file named after fileProperties.
+
Files.createFile(segmentFilesDir.resolve(SegmentFile.fileName(fileProperties)));
+ }
+
+ void createIndexFile(FileProperties fileProperties) throws IOException { // Creates an empty index file named after fileProperties.
+ Files.createFile(indexFilesDir.resolve(indexFileName(fileProperties)));
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
index 0dffd7c8f29..c2445ce59b4 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
@@ -26,8 +26,6 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import java.util.Iterator;
-import java.util.Map.Entry;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
@@ -80,9 +78,7 @@ abstract class AbstractMemTableTest<T extends
WriteModeIndexMemTable & ReadModeI
memTable.appendSegmentFileOffset(1, 0, 3);
memTable.appendSegmentFileOffset(1, 1, 4);
- Iterator<Entry<Long, SegmentInfo>> it = memTable.iterator();
-
- it.forEachRemaining(entry -> {
+ memTable.forEach(entry -> {
long groupId = entry.getKey();
SegmentInfo segmentInfo = entry.getValue();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategyTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategyTest.java
new file mode 100644
index 00000000000..d98b2962ca3
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.when;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
+
+class MostGarbageFirstCompactionStrategyTest extends
AbstractCompactionStrategyTest {
+ @Override
+ protected SegmentFileCompactionStrategy createStrategy(Path
segmentFilesDir, IndexFileManager indexFileManager) {
+ return new MostGarbageFirstCompactionStrategy(segmentFilesDir,
indexFileManager);
+ }
+
+ @Test
+ void prioritizesFullyDeletableThenLeastLiveEntries() throws IOException { // Per the assertion below: no live entries first, then fewer live entries; equal counts fall back to ordinal, then generation.
+
when(indexFileManager.describeSegmentFile(anyInt())).thenReturn(Long2ObjectMaps.emptyMap()); // Fallback stub; the per-ordinal stubs below override it.
+
+ FileProperties fullyDeletable = new FileProperties(3); // No live entries (empty descriptor).
+ FileProperties smallLiveSet = new FileProperties(2); // 1 live entry: [10, 11).
+ FileProperties largerLiveSet = new FileProperties(1); // 3 live entries: [20, 23) - same count as the tie pair below.
+ FileProperties tieLowGeneration = new FileProperties(5, 0); // 3 live entries: [30, 33).
+ FileProperties tieHighGeneration = new FileProperties(5, 1); // Same ordinal, hence the same stubbed descriptor.
+
+ createSegmentFile(fullyDeletable);
+ createSegmentFile(smallLiveSet);
+ createSegmentFile(largerLiveSet);
+ createSegmentFile(tieLowGeneration);
+ createSegmentFile(tieHighGeneration);
+
+ createIndexFile(fullyDeletable);
+ createIndexFile(smallLiveSet);
+ createIndexFile(largerLiveSet);
+ createIndexFile(tieLowGeneration);
+ createIndexFile(tieHighGeneration);
+
+
when(indexFileManager.describeSegmentFile(fullyDeletable.ordinal())).thenReturn(new
Long2ObjectOpenHashMap<>());
+
when(indexFileManager.describeSegmentFile(smallLiveSet.ordinal())).thenReturn(descriptor(meta(10,
11)));
+
when(indexFileManager.describeSegmentFile(largerLiveSet.ordinal())).thenReturn(descriptor(meta(20,
23)));
+
when(indexFileManager.describeSegmentFile(tieLowGeneration.ordinal())).thenReturn(descriptor(meta(30,
33)));
+
+ assertThat(selectedCandidates(), contains(fullyDeletable,
smallLiveSet, largerLiveSet, tieLowGeneration, tieHighGeneration));
+ }
+
+ private static Long2ObjectMap<IndexFileMeta> descriptor(IndexFileMeta...
metas) { // Keys are 0..metas.length - 1 (presumably group IDs - confirm).
+ var map = new Long2ObjectOpenHashMap<IndexFileMeta>();
+
+ for (int i = 0; i < metas.length; i++) {
+ map.put(i, metas[i]);
+ }
+
+ return map;
+ }
+
+ private static IndexFileMeta meta(long firstLogIndexInclusive, long
lastLogIndexExclusive) { // Only the log index range varies; the other arguments are fixed.
+ return new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, 0, new FileProperties(0));
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategyTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategyTest.java
new file mode 100644
index 00000000000..dc5abb4e7b8
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
+
+class OldestFirstCompactionStrategyTest extends AbstractCompactionStrategyTest
{
+ @Override
+ protected SegmentFileCompactionStrategy createStrategy(Path
segmentFilesDir, IndexFileManager indexFileManager) {
+ return new OldestFirstCompactionStrategy(segmentFilesDir,
indexFileManager);
+ }
+
+ @Test
+ void ordersCandidatesByNaturalOrder() throws IOException { // Expected: by ordinal first, then by generation.
+ FileProperties oldest = new FileProperties(1, 0);
+ FileProperties newestOrdinal = new FileProperties(3, 0);
+ FileProperties sameOrdinalNewerGeneration = new FileProperties(1, 1);
+
+ createSegmentFile(newestOrdinal); // Files are created out of the expected order.
+ createSegmentFile(sameOrdinalNewerGeneration);
+ createSegmentFile(oldest);
+
+ createIndexFile(newestOrdinal);
+ createIndexFile(sameOrdinalNewerGeneration);
+ createIndexFile(oldest);
+
+ assertThat(selectedCandidates(), contains(oldest,
sameOrdinalNewerGeneration, newestOrdinal));
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index f172899123e..24ae9a1ec56 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@@ -61,7 +60,7 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
@BeforeEach
void setUp() {
- checkpointer = new RaftLogCheckpointer(NODE_NAME, indexFileManager,
new NoOpFailureManager(), MAX_QUEUE_SIZE);
+ checkpointer = new RaftLogCheckpointer(NODE_NAME, indexFileManager,
new NoOpFailureManager(), MAX_QUEUE_SIZE, size -> {});
checkpointer.start();
}
@@ -74,7 +73,9 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
}
@Test
- void testOnRollover(@Mock SegmentFile segmentFile, @Mock StripedMemTable
memTable) throws IOException {
+ void testOnRollover(@Mock SegmentFile segmentFile) throws IOException {
+ var memTable = new SingleThreadMemTable();
+
checkpointer.onRollover(segmentFile, memTable);
verify(segmentFile, timeout(500)).sync();
@@ -84,7 +85,6 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
@Test
void testBlockOnRollover(
@Mock SegmentFile segmentFile,
- @Mock StripedMemTable memTable,
@InjectExecutorService(threadCount = 1) ExecutorService executor
) {
var blockFuture = new CompletableFuture<Void>();
@@ -92,6 +92,8 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
try {
doAnswer(invocation ->
blockFuture.join()).when(segmentFile).sync();
+ var memTable = new SingleThreadMemTable();
+
for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
checkpointer.onRollover(segmentFile, memTable);
}
@@ -125,15 +127,11 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
when(mockFile.buffer()).thenReturn(buffer);
- StripedMemTable mockMemTable = mock(StripedMemTable.class);
-
- var segmentInfo = new SegmentInfo(i);
-
- segmentInfo.addOffset(i, 1);
+ var memTable = new SingleThreadMemTable();
-
lenient().when(mockMemTable.segmentInfo(i)).thenReturn(segmentInfo);
+ memTable.appendSegmentFileOffset(i, i, 1);
- checkpointer.onRollover(mockFile, mockMemTable);
+ checkpointer.onRollover(mockFile, memTable);
}
for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) {
@@ -161,7 +159,7 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
}
@Test
- void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock
SegmentFile mockFile, @Mock StripedMemTable mockMemTable) {
+ void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock
SegmentFile mockFile) {
var blockFuture = new CompletableFuture<Void>();
try {
@@ -174,15 +172,13 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
long groupId = 2;
long logIndex = 5;
- var segmentInfo = new SegmentInfo(1);
+ var memTable = new SingleThreadMemTable();
for (int i = 1; i <= 10; i++) {
- segmentInfo.addOffset(i, i);
+ memTable.appendSegmentFileOffset(groupId, i, i);
}
- when(mockMemTable.segmentInfo(groupId)).thenReturn(segmentInfo);
-
- checkpointer.onRollover(mockFile, mockMemTable);
+ checkpointer.onRollover(mockFile, memTable);
EntrySearchResult res =
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
@@ -194,7 +190,7 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
}
@Test
- void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock
SegmentFile mockFile, @Mock StripedMemTable mockMemTable) {
+ void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock
SegmentFile mockFile) {
var blockFuture = new CompletableFuture<Void>();
try {
@@ -202,14 +198,15 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
long groupId = 2;
- SegmentInfo mockSegmentInfo = mock(SegmentInfo.class);
+ var memTable = new SingleThreadMemTable();
+
+ for (long i = 1; i <= 20; i++) {
+ memTable.appendSegmentFileOffset(groupId, i, 1);
+ }
-
when(mockMemTable.segmentInfo(groupId)).thenReturn(mockSegmentInfo);
- when(mockSegmentInfo.lastLogIndexExclusive()).thenReturn(20L);
- // Emulate prefix truncation from index 10.
- when(mockSegmentInfo.firstIndexKept()).thenReturn(10L);
+ memTable.truncatePrefix(groupId, 10);
- checkpointer.onRollover(mockFile, mockMemTable);
+ checkpointer.onRollover(mockFile, memTable);
EntrySearchResult res =
checkpointer.findSegmentPayloadInQueue(groupId, 5);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
index 733ff153ba2..6cf9db8d3ef 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -129,7 +130,7 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
FileProperties fileProperties =
SegmentFile.fileProperties(segmentFilePath);
- runCompaction(segmentFilePath);
+ garbageCollector.runCompaction(fileProperties);
assertThat(segmentFilePath, not(exists()));
assertThat(indexFileManager.indexFilePath(fileProperties),
not(exists()));
@@ -165,7 +166,7 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
FileProperties originalFileProperties =
SegmentFile.fileProperties(firstSegmentFile);
- runCompaction(firstSegmentFile);
+ garbageCollector.runCompaction(originalFileProperties);
// Segment file should be replaced by a new one with increased
generation.
assertThat(firstSegmentFile, not(exists()));
@@ -287,7 +288,7 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
long sizeBeforeCompaction = Files.size(curSegmentFilePath);
- runCompaction(curSegmentFilePath);
+ garbageCollector.runCompaction(fileProperties);
FileProperties newFileProperties = new
FileProperties(fileProperties.ordinal(), fileProperties.generation() + 1);
@@ -590,8 +591,8 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
}
/**
- * Reproducer for the stale-deque-entry corruption bug: after fully
deleting a middle segment file, its {@link IndexFileMeta} remains
- * in the same deque block as the preceding file's meta. When the
preceding file is subsequently partially compacted (its live log range
+ * Reproducer for the stale-deque-entry corruption bug: after fully
deleting a middle segment file, its {@link IndexFileMeta} remains in
+ * the same deque block as the preceding file's meta. When the preceding
file is subsequently partially compacted (its live log range
* shrinks), {@link IndexFileMetaArray#onIndexCompacted} asserts that the
new meta's {@code lastLogIndexExclusive} equals the next
* entry's {@code firstLogIndexInclusive}.
*/
@@ -643,6 +644,30 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
});
}
+ @Test
+ void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception { // logSizeBytes() must match the on-disk size both before and after a restart.
+ // Fill multiple segment files to create both segment and index files.
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ // Wait for at least one checkpoint so index files exist.
+ await().until(this::indexFiles, hasSize(greaterThanOrEqualTo(1)));
+ // NOTE(review): later checkpoints may still be in flight (they report sizes via onLogStorageSizeIncreased), so this snapshot can race; consider awaiting logSizeBytes() == disk size.
+ long expectedSize = totalLogSizeFromDisk(fileManager);
+
+ assertThat(garbageCollector.logSizeBytes(), is(expectedSize));
+
+ restartSegmentFileManager();
+
+ assertThat(garbageCollector.logSizeBytes(), is(expectedSize));
+ }
+
+ private void runCompaction(Path segmentFilePath) throws IOException { // Compacts the segment file located at segmentFilePath.
+
garbageCollector.runCompaction(SegmentFile.fileProperties(segmentFilePath));
+ }
+
private List<Path> segmentFiles() throws IOException {
try (Stream<Path> files = Files.list(fileManager.segmentFilesDir())) {
return files
@@ -720,15 +745,22 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
);
fileManager.start();
- }
- private void runCompaction(Path segmentFilePath) throws IOException {
- SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
false);
+ indexFileManager = fileManager.indexFileManager();
+ garbageCollector = fileManager.garbageCollector();
+ }
- try {
- garbageCollector.runCompaction(segmentFile);
- } finally {
- segmentFile.close();
+ private static long totalLogSizeFromDisk(SegmentFileManager manager)
throws IOException {
+ try (Stream<Path> files =
Stream.concat(Files.list(manager.segmentFilesDir()),
Files.list(manager.indexFilesDir()))) {
+ return files
+ .mapToLong(p -> {
+ try {
+ return Files.size(p);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ })
+ .sum();
}
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java
new file mode 100644
index 00000000000..c6552a0386a
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link RaftLogGarbageCollector} soft-limit behavior: automatic GC once the log outgrows {@code softLogSizeLimitBytes}, and consistent reads while writes and GC run concurrently.
+ */
+@ExtendWith(ConfigurationExtension.class)
+class RaftLogGcSoftLimitTest extends IgniteAbstractTest {
+ private static final int FILE_SIZE = 200; // Segment file size in bytes (injected as mock.segmentFileSizeBytes).
+
+ private static final long SMALL_SOFT_LIMIT = 2 * FILE_SIZE; // Two segment files' worth (injected as mock.softLogSizeLimitBytes).
+
+ private static final long GROUP_ID_1 = 1000;
+
+ private static final long GROUP_ID_2 = 2000;
+
+ private static final int STRIPES = 10;
+
+ private static final String NODE_NAME = "test";
+
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
+ @InjectConfiguration(
+ value = "mock.segmentFileSizeBytes=" + FILE_SIZE + ",
mock.softLogSizeLimitBytes=" + SMALL_SOFT_LIMIT,
+ validate = false
+ )
+ private LogStorageConfiguration storageConfiguration;
+
+ private SegmentFileManager fileManager;
+
+ private RaftLogGarbageCollector garbageCollector; // Obtained from fileManager in setUp().
+
+ @BeforeEach
+ void setUp() throws IOException {
+ fileManager = new SegmentFileManager(
+ NODE_NAME,
+ workDir,
+ STRIPES,
+ new NoOpFailureManager(),
+ raftConfiguration,
+ storageConfiguration
+ );
+
+ fileManager.start();
+
+ garbageCollector = fileManager.garbageCollector();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (fileManager != null) {
+ fileManager.close();
+ }
+ }
+
+ @Test
+ void testGcTriggeredWhenSoftLimitExceeded() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, i, batches.get(i));
+ }
+
+ await().until(() -> indexFiles(fileManager), hasSize(4)); // Wait until 4 non-temporary index files (checkpointed segments) exist.
+
+ assertThat(totalLogSizeFromDisk(fileManager),
is(greaterThanOrEqualTo(SMALL_SOFT_LIMIT)));
+
+ // Truncate all GROUP_ID_1 entries so every old file becomes fully
deletable.
+ fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
+
+ // Write one GROUP_ID_2 batch to force a rollover so the truncation
record ends up in a
+ // completed (checkpointable) file.
+ appendBytes(GROUP_ID_2, batches.size() + 1, createRandomData(FILE_SIZE
/ 4, 1).get(0));
+
+ await().until(() -> garbageCollector.logSizeBytes(),
is(lessThanOrEqualTo(SMALL_SOFT_LIMIT)));
+ // NOTE(review): GC or checkpointing may still be touching files here; if this flakes, await this equality instead of asserting it once.
+ assertThat(garbageCollector.logSizeBytes(),
is(totalLogSizeFromDisk(fileManager)));
+ }
+
+ @RepeatedTest(10)
+ void testConcurrentWritesAndAutoGc() throws Exception {
+ int numBatches = 20;
+ List<byte[]> batches = createRandomData(FILE_SIZE / 8, numBatches);
+
+ // Write initial data from both groups.
+ for (int i = 0; i < numBatches; i++) {
+ appendBytes(GROUP_ID_1, i, batches.get(i));
+ appendBytes(GROUP_ID_2, i, batches.get(i));
+ }
+
+ // Wait for checkpoints so the GC has candidates to compact.
+ await().until(() -> indexFiles(fileManager),
hasSize(greaterThanOrEqualTo(1)));
+
+ var writerDone = new AtomicBoolean(false); // Lets readers stop once the writer finishes or fails.
+
+ RunnableX writerTask = () -> {
+ try {
+ for (int i = numBatches; i < numBatches * 3; i++) {
+ appendBytes(GROUP_ID_1, i, batches.get(i % numBatches));
+ appendBytes(GROUP_ID_2, i, batches.get(i % numBatches));
+ fileManager.truncatePrefix(GROUP_ID_1, i - 1); // Keeps turning older GROUP_ID_1 entries into garbage.
+ }
+ } finally {
+ writerDone.set(true);
+ }
+ };
+
+ RunnableX readerTask = () -> {
+ while (!writerDone.get()) {
+ for (int i = 0; i < numBatches; i++) {
+ int index = i;
+ fileManager.getEntry(GROUP_ID_2, i, bs -> { // GROUP_ID_2 is never truncated here, so its initial batches must stay readable.
+ assertThat(bs, is(batches.get(index)));
+
+ return null;
+ });
+ }
+ }
+ };
+
+ runRace(writerTask, readerTask, readerTask); // NOTE(review): nothing asserts that GC actually ran; consider checking logSizeBytes() afterwards.
+ }
+
+ private static List<Path> indexFiles(SegmentFileManager manager) throws
IOException {
+ try (Stream<Path> files = Files.list(manager.indexFilesDir())) {
+ return files
+ .filter(p -> !p.getFileName().toString().endsWith(".tmp")) // Ignore temporary files.
+ .sorted()
+ .collect(Collectors.toList());
+ }
+ }
+
+ private static long totalLogSizeFromDisk(SegmentFileManager manager)
throws IOException {
+ try (Stream<Path> files =
Stream.concat(Files.list(manager.segmentFilesDir()),
Files.list(manager.indexFilesDir()))) { // NOTE(review): if the second Files.list() throws, the first stream is never closed.
+ return files // NOTE(review): unlike indexFiles(), .tmp files are counted here.
+ .mapToLong(p -> {
+ try {
+ return Files.size(p);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ })
+ .sum();
+ }
+ }
+
+ private static List<byte[]> createRandomData(int batchLength, int
numBatches) {
+ return IntStream.range(0, numBatches)
+ .mapToObj(i -> randomBytes(ThreadLocalRandom.current(),
batchLength))
+ .collect(Collectors.toList());
+ }
+
+ private void appendBytes(long groupId, long index, byte[] serializedEntry)
throws IOException {
+ var entry = new LogEntry();
+ entry.setId(new LogId(index, 0));
+
+ fileManager.appendEntry(groupId, entry, new LogEntryEncoder() { // Encoder that emits serializedEntry verbatim.
+ @Override
+ public byte[] encode(LogEntry log) {
+ return serializedEntry;
+ }
+
+ @Override
+ public void encode(ByteBuffer buffer, LogEntry log) {
+ buffer.put(serializedEntry);
+ }
+
+ @Override
+ public int size(LogEntry logEntry) {
+ return serializedEntry.length;
+ }
+ });
+ }
+}