This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9108d151af5 IGNITE-26295 Integrate Raft Log GC into the new storage 
(#7880)
9108d151af5 is described below

commit 9108d151af544a5038c478143a83ee1b26975bdc
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 2 14:27:30 2026 +0300

    IGNITE-26295 Integrate Raft Log GC into the new storage (#7880)
---
 ...le.java => BeforeIndexFileCreatedCallback.java} |  28 +--
 .../raft/storage/segstore/FileProperties.java      |  13 +-
 .../raft/storage/segstore/IndexFileManager.java    |  29 ++-
 .../MostGarbageFirstCompactionStrategy.java        |  72 +++++++
 .../segstore/OldestFirstCompactionStrategy.java    |  46 +++++
 .../raft/storage/segstore/RaftLogCheckpointer.java |  17 +-
 .../storage/segstore/RaftLogGarbageCollector.java  | 169 ++++++++++++++-
 .../storage/segstore/ReadModeIndexMemTable.java    |   8 +-
 ...ble.java => SegmentFileCompactionStrategy.java} |  31 ++-
 .../raft/storage/segstore/SegmentFileManager.java  |  19 +-
 .../segstore/AbstractCompactionStrategyTest.java   |  90 ++++++++
 .../storage/segstore/AbstractMemTableTest.java     |   6 +-
 .../MostGarbageFirstCompactionStrategyTest.java    |  81 ++++++++
 .../OldestFirstCompactionStrategyTest.java         |  49 +++++
 .../storage/segstore/RaftLogCheckpointerTest.java  |  45 ++--
 .../segstore/RaftLogGarbageCollectorTest.java      |  56 +++--
 .../storage/segstore/RaftLogGcSoftLimitTest.java   | 226 +++++++++++++++++++++
 17 files changed, 866 insertions(+), 119 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/BeforeIndexFileCreatedCallback.java
similarity index 54%
copy from 
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
copy to 
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/BeforeIndexFileCreatedCallback.java
index ed5f5100d37..2786869b1ba 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/BeforeIndexFileCreatedCallback.java
@@ -17,28 +17,8 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Immutable version of an index memtable used by the {@link 
RaftLogCheckpointer}.
- *
- * @see WriteModeIndexMemTable
- */
-interface ReadModeIndexMemTable {
-    /**
-     * Returns information about a segment file for the given group ID or 
{@code null} if it is not present in this memtable.
-     */
-    @Nullable SegmentInfo segmentInfo(long groupId);
-
-    /**
-     * Returns an iterator over all {@code Group ID -> SegmentInfo} entries in 
this memtable.
-     */
-    Iterator<Entry<Long, SegmentInfo>> iterator();
-
-    /**
-     * Returns the number of Raft Group IDs stored in this memtable.
-     */
-    int numGroups();
+/** Callback executed by the {@link RaftLogCheckpointer} before an index file 
is created. */
+@FunctionalInterface
+interface BeforeIndexFileCreatedCallback {
+    void beforeIndexFileCreated(long indexFileSize);
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
index 0fc9647903b..1a1d1b65e2d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.tostring.S;
 /**
  * Represents properties common for some file types (namely Segment and Index 
files) used by the log storage.
  */
-class FileProperties {
+class FileProperties implements Comparable<FileProperties> {
     /** File ordinal. Incremented each time a new file is created. */
     private final int ordinal;
 
@@ -54,6 +54,17 @@ class FileProperties {
         return generation;
     }
 
+    @Override
+    public int compareTo(FileProperties o) {
+        int cmp = Integer.compare(ordinal, o.ordinal);
+
+        if (cmp != 0) {
+            return cmp;
+        }
+
+        return Integer.compare(generation, o.generation);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index b893f7fe643..4b8453c69ad 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -51,6 +51,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * File manager responsible for persisting {@link ReadModeIndexMemTable}s to 
index files.
@@ -215,10 +216,8 @@ class IndexFileManager {
         try (var os = new 
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
             os.write(fileHeaderWithIndexMetas.header());
 
-            Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
-
-            while (it.hasNext()) {
-                SegmentInfo segmentInfo = it.next().getValue();
+            for (Entry<Long, SegmentInfo> longSegmentInfoEntry : 
indexMemTable) {
+                SegmentInfo segmentInfo = longSegmentInfoEntry.getValue();
 
                 // Segment Info may not contain payload in case of suffix 
truncation, see "IndexMemTable#truncateSuffix".
                 if (segmentInfo.size() > 0) {
@@ -385,11 +384,7 @@ class IndexFileManager {
 
         var metaSpecs = new ArrayList<IndexMetaSpec>(numGroups);
 
-        Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
-
-        while (it.hasNext()) {
-            Entry<Long, SegmentInfo> entry = it.next();
-
+        for (Entry<Long, SegmentInfo> entry : indexMemTable) {
             // Using the boxed value to avoid unnecessary autoboxing later.
             Long groupId = entry.getKey();
 
@@ -504,6 +499,19 @@ class IndexFileManager {
         return payloadBuffer.array();
     }
 
+    /**
+     * Computes the size in bytes that the index file for the given {@code 
indexMemTable} will occupy on disk.
+     */
+    static long computeIndexFileSize(ReadModeIndexMemTable indexMemTable) {
+        long total = headerSize(indexMemTable.numGroups());
+
+        for (Entry<Long, SegmentInfo> longSegmentInfoEntry : indexMemTable) {
+            total += payloadSize(longSegmentInfoEntry.getValue());
+        }
+
+        return total;
+    }
+
     private static int headerSize(int numGroups) {
         return COMMON_META_SIZE + numGroups * GROUP_META_SIZE;
     }
@@ -512,7 +520,8 @@ class IndexFileManager {
         return segmentInfo.size() * Integer.BYTES;
     }
 
-    private static String indexFileName(FileProperties fileProperties) {
+    @VisibleForTesting
+    static String indexFileName(FileProperties fileProperties) {
         return String.format(INDEX_FILE_NAME_FORMAT, fileProperties.ordinal(), 
fileProperties.generation());
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java
new file mode 100644
index 00000000000..52d6ebefef8
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+/**
+ * Compaction strategy that prioritizes files with the most dead entries, 
fully-deletable files first.
+ */
+class MostGarbageFirstCompactionStrategy implements 
SegmentFileCompactionStrategy {
+    private final Path segmentFilesDir;
+
+    private final IndexFileManager indexFileManager;
+
+    MostGarbageFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager 
indexFileManager) {
+        this.segmentFilesDir = segmentFilesDir;
+        this.indexFileManager = indexFileManager;
+    }
+
+    @Override
+    public Stream<FileProperties> selectCandidates() throws IOException {
+        var scores = new Object2LongOpenHashMap<FileProperties>();
+
+        Comparator<FileProperties> comparator =
+                Comparator.<FileProperties>comparingLong(props -> 
scores.computeIfAbsent(props, this::score))
+                        .thenComparing(Comparator.naturalOrder());
+
+        //noinspection resource
+        return Files.list(segmentFilesDir)
+                .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                .map(SegmentFile::fileProperties)
+                .filter(props -> 
Files.exists(indexFileManager.indexFilePath(props)))
+                .sorted(comparator);
+    }
+
+    private long score(FileProperties props) {
+        Long2ObjectMap<IndexFileMeta> description = 
indexFileManager.describeSegmentFile(props.ordinal());
+
+        if (description.isEmpty()) {
+            return -1; // Fully deletable — highest priority.
+        }
+
+        long liveCount = 0;
+
+        for (IndexFileMeta meta : description.values()) {
+            liveCount += meta.lastLogIndexExclusive() - 
meta.firstLogIndexInclusive();
+        }
+
+        return liveCount;
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategy.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategy.java
new file mode 100644
index 00000000000..1eeaeb7a4b1
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+
+/**
+ * Compaction strategy that yields fully-checkpointed segment files in write 
order (oldest first).
+ */
+class OldestFirstCompactionStrategy implements SegmentFileCompactionStrategy {
+    private final Path segmentFilesDir;
+
+    private final IndexFileManager indexFileManager;
+
+    OldestFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager 
indexFileManager) {
+        this.segmentFilesDir = segmentFilesDir;
+        this.indexFileManager = indexFileManager;
+    }
+
+    @Override
+    public Stream<FileProperties> selectCandidates() throws IOException {
+        return Files.list(segmentFilesDir)
+                .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                .map(SegmentFile::fileProperties)
+                .sorted()
+                .filter(props -> 
Files.exists(indexFileManager.indexFilePath(props)));
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index 5913044052b..724e080e067 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -21,7 +21,7 @@ import static 
org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
-import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -62,14 +62,18 @@ class RaftLogCheckpointer {
 
     private final FailureProcessor failureProcessor;
 
+    private final BeforeIndexFileCreatedCallback beforeIndexFileCreated;
+
     RaftLogCheckpointer(
             String nodeName,
             IndexFileManager indexFileManager,
             FailureProcessor failureProcessor,
-            int maxQueueSize
+            int maxQueueSize,
+            BeforeIndexFileCreatedCallback beforeIndexFileCreated
     ) {
         this.indexFileManager = indexFileManager;
         this.failureProcessor = failureProcessor;
+        this.beforeIndexFileCreated = beforeIndexFileCreated;
 
         queue = new CheckpointQueue(maxQueueSize);
         checkpointThread = new IgniteThread(nodeName, "segstore-checkpoint", 
new CheckpointTask());
@@ -91,7 +95,7 @@ class RaftLogCheckpointer {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException(COMMON_ERR, "Interrupted while 
waiting for the checkpoint thread to finish.", e);
+            throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while 
waiting for the checkpoint thread to finish.", e);
         }
     }
 
@@ -101,7 +105,7 @@ class RaftLogCheckpointer {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException(COMMON_ERR, "Interrupted while 
adding an entry to the checkpoint queue.", e);
+            throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while 
adding an entry to the checkpoint queue.", e);
         }
     }
 
@@ -155,6 +159,11 @@ class RaftLogCheckpointer {
 
                     segmentFile.sync();
 
+                    long indexFileSize = 
IndexFileManager.computeIndexFileSize(entry.memTable());
+
+                    // Notify about the upcoming log size increase.
+                    
beforeIndexFileCreated.beforeIndexFileCreated(indexFileSize);
+
                     indexFileManager.saveNewIndexMemtable(entry.memTable());
 
                     queue.removeHead();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
index a1b5e9cb5a1..3b09ad808e6 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
 import static java.lang.Math.toIntExact;
 import static java.nio.file.StandardOpenOption.CREATE_NEW;
 import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static 
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
@@ -29,20 +30,28 @@ import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TR
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
 import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.jetbrains.annotations.VisibleForTesting;
 
@@ -53,6 +62,12 @@ import org.jetbrains.annotations.VisibleForTesting;
  * of segment files. This process reclaims disk space occupied by log entries 
that have been truncated via {@link LogStorage#truncatePrefix}
  * or {@link LogStorage#truncateSuffix} operations.
  *
+ * <h2>Size tracking</h2>
+ * The GC tracks the total size of all log storage files (segment files and 
index files) via an {@link AtomicLong}. The counter is
+ * incremented via {@link #onLogStorageSizeIncreased(long)} before a new 
segment file is allocated or before an index file is written to
+ * disk, and decremented inside {@link #runCompaction} after files are 
deleted. The GC thread wakes up on each of these events and compacts
+ * files until the total size drops below the configured {@link 
#softLimitBytes soft limit}.
+ *
  * <h2>Compaction Process</h2>
  * When a segment file is selected for compaction, the GC:
  * <ol>
@@ -71,11 +86,66 @@ class RaftLogGarbageCollector {
 
     private final IndexFileManager indexFileManager;
 
+    private final long softLimitBytes;
+
+    private final SegmentFileCompactionStrategy strategy;
+
+    private final FailureProcessor failureProcessor;
+
     private final AtomicLong logSizeBytes = new AtomicLong();
 
-    RaftLogGarbageCollector(Path segmentFilesDir, IndexFileManager 
indexFileManager) {
+    private final Thread gcThread;
+
+    RaftLogGarbageCollector(
+            String nodeName,
+            Path segmentFilesDir,
+            IndexFileManager indexFileManager,
+            long softLimitBytes,
+            SegmentFileCompactionStrategy strategy,
+            FailureProcessor failureProcessor
+    ) {
         this.segmentFilesDir = segmentFilesDir;
         this.indexFileManager = indexFileManager;
+        this.softLimitBytes = softLimitBytes;
+        this.strategy = strategy;
+        this.failureProcessor = failureProcessor;
+
+        gcThread = new IgniteThread(nodeName, "segstore-gc", new GcTask());
+    }
+
+    void start() throws IOException {
+        initLogSizeFromDisk();
+
+        gcThread.start();
+    }
+
+    void stop() {
+        gcThread.interrupt();
+
+        try {
+            gcThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while 
waiting for the GC thread to finish.", e);
+        }
+    }
+
+    /**
+     * Notifies the GC that the log storage size is about to increase by 
{@code addedBytes} and wakes the GC thread if the soft limit is now
+     * exceeded. Must be called before the corresponding file is written to 
disk.
+     */
+    void onLogStorageSizeIncreased(long addedBytes) {
+        long logSize = logSizeBytes.addAndGet(addedBytes);
+
+        if (logSize >= softLimitBytes) {
+            LOG.info(
+                    "Log size is above the soft limit, waking up GC thread 
[current log size = {} bytes, soft limit = {} bytes].",
+                    logSize, softLimitBytes
+            );
+
+            LockSupport.unpark(gcThread);
+        }
     }
 
     void cleanupLeftoverFiles() throws IOException {
@@ -137,29 +207,43 @@ class RaftLogGarbageCollector {
     }
 
     @VisibleForTesting
-    void runCompaction(SegmentFile segmentFile) throws IOException {
-        LOG.info("Compacting segment file [path = {}].", segmentFile.path());
+    long logSizeBytes() {
+        return logSizeBytes.get();
+    }
+
+    @VisibleForTesting
+    void runCompaction(FileProperties segmentFileProperties) throws 
IOException {
+        Path segmentFilePath = 
segmentFilesDir.resolve(SegmentFile.fileName(segmentFileProperties));
+
+        LOG.info("Compacting segment file [path = {}].", segmentFilePath);
 
+        // TODO: Skip non-compactible files, see 
https://issues.apache.org/jira/browse/IGNITE-28417.
         Long2ObjectMap<IndexFileMeta> segmentFileDescription
-                = 
indexFileManager.describeSegmentFile(segmentFile.fileProperties().ordinal());
+                = 
indexFileManager.describeSegmentFile(segmentFileProperties.ordinal());
 
         boolean canRemoveSegmentFile = segmentFileDescription.isEmpty();
 
-        Path indexFilePath = 
indexFileManager.indexFilePath(segmentFile.fileProperties());
+        Path indexFilePath = 
indexFileManager.indexFilePath(segmentFileProperties);
 
         long logSizeDelta;
 
         if (canRemoveSegmentFile) {
-            indexFileManager.onIndexFileRemoved(segmentFile.fileProperties());
+            indexFileManager.onIndexFileRemoved(segmentFileProperties);
 
-            logSizeDelta = Files.size(segmentFile.path()) + 
Files.size(indexFilePath);
+            logSizeDelta = Files.size(segmentFilePath) + 
Files.size(indexFilePath);
         } else {
-            logSizeDelta = compactSegmentFile(segmentFile, indexFilePath, 
segmentFileDescription);
+            SegmentFile segmentFile = 
SegmentFile.openExisting(segmentFilePath, false);
+
+            try {
+                logSizeDelta = compactSegmentFile(segmentFile, indexFilePath, 
segmentFileDescription);
+            } finally {
+                segmentFile.close();
+            }
         }
 
         // Remove the previous generation of the segment file and its index. 
This is safe to do, because we rely on the file system
         // guarantees that other threads reading from the segment file will 
still be able to do that even if the file is deleted.
-        Files.delete(segmentFile.path());
+        Files.delete(segmentFilePath);
         Files.delete(indexFilePath);
 
         long newLogSize = logSizeBytes.addAndGet(-logSizeDelta);
@@ -168,12 +252,12 @@ class RaftLogGarbageCollector {
             if (canRemoveSegmentFile) {
                 LOG.info(
                         "Segment file removed (all entries are truncated) 
[path = {}, log size freed = {} bytes, new log size = {} bytes].",
-                        segmentFile.path(), logSizeDelta, newLogSize
+                        segmentFilePath, logSizeDelta, newLogSize
                 );
             } else {
                 LOG.info(
                         "Segment file compacted [path = {}, log size freed = 
{} bytes, new log size = {} bytes].",
-                        segmentFile.path(), logSizeDelta, newLogSize
+                        segmentFilePath, logSizeDelta, newLogSize
                 );
             }
         }
@@ -286,6 +370,69 @@ class RaftLogGarbageCollector {
         return index >= indexFileMeta.firstLogIndexInclusive() && index < 
indexFileMeta.lastLogIndexExclusive();
     }
 
+    private void initLogSizeFromDisk() throws IOException {
+        Path indexFilesDir = indexFileManager.indexFilesDir();
+
+        try (Stream<Path> files = Stream.concat(Files.list(segmentFilesDir), 
Files.list(indexFilesDir))) {
+            long logSizeOnDisk = files
+                    .mapToLong(path -> {
+                        try {
+                            return Files.size(path);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    })
+                    .sum();
+
+            logSizeBytes.addAndGet(logSizeOnDisk);
+        }
+    }
+
+    private class GcTask implements Runnable {
+        @Override
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    runGcCycle();
+                } catch (ClosedByInterruptException e) {
+                    return;
+                } catch (IOException e) {
+                    failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, e));
+                }
+
+                LockSupport.park();
+            }
+        }
+
+        private void runGcCycle() throws IOException {
+            if (logSizeBytes.get() < softLimitBytes) {
+                return;
+            }
+
+            LOG.info("Starting Log Storage GC cycle.");
+
+            try (Stream<FileProperties> candidates = 
strategy.selectCandidates()) {
+                Iterator<FileProperties> it = candidates.iterator();
+
+                do {
+                    if (!it.hasNext()) {
+                        LOG.warn(
+                                "Log size is above the soft limit but there 
are no files to compact "
+                                        + "[currentLogSize = {} bytes, 
softLimit = {} bytes].",
+                                logSizeBytes.get(), softLimitBytes
+                        );
+
+                        return;
+                    }
+
+                    runCompaction(it.next());
+                } while (logSizeBytes.get() >= softLimitBytes);
+            }
+
+            LOG.info("Log Storage GC cycle complete.");
+        }
+    }
+
     private class TmpSegmentFile implements AutoCloseable {
         private final String fileName;
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
index ed5f5100d37..743f9921710 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
-import java.util.Iterator;
 import java.util.Map.Entry;
 import org.jetbrains.annotations.Nullable;
 
@@ -26,17 +25,12 @@ import org.jetbrains.annotations.Nullable;
  *
  * @see WriteModeIndexMemTable
  */
-interface ReadModeIndexMemTable {
+interface ReadModeIndexMemTable extends Iterable<Entry<Long, SegmentInfo>> {
     /**
      * Returns information about a segment file for the given group ID or 
{@code null} if it is not present in this memtable.
      */
     @Nullable SegmentInfo segmentInfo(long groupId);
 
-    /**
-     * Returns an iterator over all {@code Group ID -> SegmentInfo} entries in 
this memtable.
-     */
-    Iterator<Entry<Long, SegmentInfo>> iterator();
-
     /**
      * Returns the number of Raft Group IDs stored in this memtable.
      */
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileCompactionStrategy.java
similarity index 51%
copy from 
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
copy to 
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileCompactionStrategy.java
index ed5f5100d37..815bba14f7f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileCompactionStrategy.java
@@ -17,28 +17,23 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
-import org.jetbrains.annotations.Nullable;
+import java.io.IOException;
+import java.util.stream.Stream;
 
 /**
- * Immutable version of an index memtable used by the {@link 
RaftLogCheckpointer}.
+ * Strategy for selecting which segment files to compact during garbage 
collection.
  *
- * @see WriteModeIndexMemTable
+ * <p>Each implementation is fully responsible for discovering and ordering 
compaction candidates. This includes
+ * determining which files are eligible (e.g. skipping files that have not yet 
been checkpointed) and ranking them
+ * by priority.
+ *
+ * <p>The GC consumes candidates from the stream in order, compacting each 
one, and stops as soon as the log size
+ * drops below the soft limit or the stream is exhausted.
  */
-interface ReadModeIndexMemTable {
-    /**
-     * Returns information about a segment file for the given group ID or 
{@code null} if it is not present in this memtable.
-     */
-    @Nullable SegmentInfo segmentInfo(long groupId);
-
-    /**
-     * Returns an iterator over all {@code Group ID -> SegmentInfo} entries in 
this memtable.
-     */
-    Iterator<Entry<Long, SegmentInfo>> iterator();
-
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+interface SegmentFileCompactionStrategy {
     /**
-     * Returns the number of Raft Group IDs stored in this memtable.
+     * Returns an ordered stream of segment file candidates for compaction. 
The stream must be closed by the caller.
      */
-    int numGroups();
+    Stream<FileProperties> selectCandidates() throws IOException;
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index a0b9d440d62..9c3a0c1a62b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -176,14 +176,22 @@ class SegmentFileManager implements ManuallyCloseable {
 
         indexFileManager = new IndexFileManager(baseDir);
 
+        garbageCollector = new RaftLogGarbageCollector(
+                nodeName,
+                segmentFilesDir,
+                indexFileManager,
+                logStorageView.softLogSizeLimitBytes(),
+                new MostGarbageFirstCompactionStrategy(segmentFilesDir, 
indexFileManager),
+                failureProcessor
+        );
+
         checkpointer = new RaftLogCheckpointer(
                 nodeName,
                 indexFileManager,
                 failureProcessor,
-                logStorageView.maxCheckpointQueueSize()
+                logStorageView.maxCheckpointQueueSize(),
+                garbageCollector::onLogStorageSizeIncreased
         );
-
-        garbageCollector = new RaftLogGarbageCollector(segmentFilesDir, 
indexFileManager);
     }
 
     void start() throws IOException {
@@ -232,6 +240,8 @@ class SegmentFileManager implements ManuallyCloseable {
         // Index File Manager must be started strictly before the checkpointer.
         indexFileManager.start();
 
+        garbageCollector.start();
+
         checkpointer.start();
     }
 
@@ -512,6 +522,8 @@ class SegmentFileManager implements ManuallyCloseable {
                 throw new IgniteInternalException(NODE_STOPPING_ERR);
             }
 
+            garbageCollector.onLogStorageSizeIncreased(segmentFileSize);
+
             
currentSegmentFile.set(allocateNewSegmentFile(++curSegmentFileOrdinal));
 
             rolloverLock.notifyAll();
@@ -538,6 +550,7 @@ class SegmentFileManager implements ManuallyCloseable {
         }
 
         checkpointer.stop();
+        garbageCollector.stop();
     }
 
     private static void writeHeader(SegmentFile segmentFile) {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractCompactionStrategyTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractCompactionStrategyTest.java
new file mode 100644
index 00000000000..1f7d60341e5
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractCompactionStrategyTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static 
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+abstract class AbstractCompactionStrategyTest extends IgniteAbstractTest {
+    private Path segmentFilesDir;
+
+    private Path indexFilesDir;
+
+    @Mock
+    IndexFileManager indexFileManager;
+
+    private SegmentFileCompactionStrategy strategy;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        segmentFilesDir = Files.createDirectories(workDir.resolve("segment"));
+        indexFilesDir = Files.createDirectories(workDir.resolve("index"));
+
+        when(indexFileManager.indexFilePath(any())).thenAnswer(inv -> 
indexFilesDir.resolve(indexFileName(inv.getArgument(0))));
+
+        strategy = createStrategy(segmentFilesDir, indexFileManager);
+    }
+
+    @Test
+    void selectsOnlySegmentFilesWithExistingIndexAndSkipsTmp() throws 
IOException {
+        FileProperties missingIndex = new FileProperties(0);
+        FileProperties presentIndex = new FileProperties(1);
+
+        createSegmentFile(missingIndex);
+        createSegmentFile(presentIndex);
+
+        Files.createFile(segmentFilesDir.resolve(SegmentFile.fileName(new 
FileProperties(2)) + ".tmp"));
+
+        createIndexFile(presentIndex);
+
+        assertThat(selectedCandidates(), contains(presentIndex));
+    }
+
+    abstract SegmentFileCompactionStrategy createStrategy(Path 
segmentFilesDir, IndexFileManager indexFileManager);
+
+    List<FileProperties> selectedCandidates() throws IOException {
+        try (Stream<FileProperties> candidates = strategy.selectCandidates()) {
+            return candidates.collect(Collectors.toList());
+        }
+    }
+
+    void createSegmentFile(FileProperties fileProperties) throws IOException {
+        
Files.createFile(segmentFilesDir.resolve(SegmentFile.fileName(fileProperties)));
+    }
+
+    void createIndexFile(FileProperties fileProperties) throws IOException {
+        Files.createFile(indexFilesDir.resolve(indexFileName(fileProperties)));
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
index 0dffd7c8f29..c2445ce59b4 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
@@ -26,8 +26,6 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.Test;
 
@@ -80,9 +78,7 @@ abstract class AbstractMemTableTest<T extends 
WriteModeIndexMemTable & ReadModeI
         memTable.appendSegmentFileOffset(1, 0, 3);
         memTable.appendSegmentFileOffset(1, 1, 4);
 
-        Iterator<Entry<Long, SegmentInfo>> it = memTable.iterator();
-
-        it.forEachRemaining(entry -> {
+        memTable.forEach(entry -> {
             long groupId = entry.getKey();
             SegmentInfo segmentInfo = entry.getValue();
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategyTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategyTest.java
new file mode 100644
index 00000000000..d98b2962ca3
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.when;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
+
+class MostGarbageFirstCompactionStrategyTest extends 
AbstractCompactionStrategyTest {
+    @Override
+    protected SegmentFileCompactionStrategy createStrategy(Path 
segmentFilesDir, IndexFileManager indexFileManager) {
+        return new MostGarbageFirstCompactionStrategy(segmentFilesDir, 
indexFileManager);
+    }
+
+    @Test
+    void prioritizesFullyDeletableThenLeastLiveEntries() throws IOException {
+        
when(indexFileManager.describeSegmentFile(anyInt())).thenReturn(Long2ObjectMaps.emptyMap());
+
+        FileProperties fullyDeletable = new FileProperties(3);
+        FileProperties smallLiveSet = new FileProperties(2);
+        FileProperties largerLiveSet = new FileProperties(1);
+        FileProperties tieLowGeneration = new FileProperties(5, 0);
+        FileProperties tieHighGeneration = new FileProperties(5, 1);
+
+        createSegmentFile(fullyDeletable);
+        createSegmentFile(smallLiveSet);
+        createSegmentFile(largerLiveSet);
+        createSegmentFile(tieLowGeneration);
+        createSegmentFile(tieHighGeneration);
+
+        createIndexFile(fullyDeletable);
+        createIndexFile(smallLiveSet);
+        createIndexFile(largerLiveSet);
+        createIndexFile(tieLowGeneration);
+        createIndexFile(tieHighGeneration);
+
+        
when(indexFileManager.describeSegmentFile(fullyDeletable.ordinal())).thenReturn(new
 Long2ObjectOpenHashMap<>());
+        
when(indexFileManager.describeSegmentFile(smallLiveSet.ordinal())).thenReturn(descriptor(meta(10,
 11)));
+        
when(indexFileManager.describeSegmentFile(largerLiveSet.ordinal())).thenReturn(descriptor(meta(20,
 23)));
+        
when(indexFileManager.describeSegmentFile(tieLowGeneration.ordinal())).thenReturn(descriptor(meta(30,
 33)));
+
+        assertThat(selectedCandidates(), contains(fullyDeletable, 
smallLiveSet, largerLiveSet, tieLowGeneration, tieHighGeneration));
+    }
+
+    private static Long2ObjectMap<IndexFileMeta> descriptor(IndexFileMeta... 
metas) {
+        var map = new Long2ObjectOpenHashMap<IndexFileMeta>();
+
+        for (int i = 0; i < metas.length; i++) {
+            map.put(i, metas[i]);
+        }
+
+        return map;
+    }
+
+    private static IndexFileMeta meta(long firstLogIndexInclusive, long 
lastLogIndexExclusive) {
+        return new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, 0, new FileProperties(0));
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategyTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategyTest.java
new file mode 100644
index 00000000000..dc5abb4e7b8
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/OldestFirstCompactionStrategyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
+
+class OldestFirstCompactionStrategyTest extends AbstractCompactionStrategyTest 
{
+    @Override
+    protected SegmentFileCompactionStrategy createStrategy(Path 
segmentFilesDir, IndexFileManager indexFileManager) {
+        return new OldestFirstCompactionStrategy(segmentFilesDir, 
indexFileManager);
+    }
+
+    @Test
+    void ordersCandidatesByNaturalOrder() throws IOException {
+        FileProperties oldest = new FileProperties(1, 0);
+        FileProperties newestOrdinal = new FileProperties(3, 0);
+        FileProperties sameOrdinalNewerGeneration = new FileProperties(1, 1);
+
+        createSegmentFile(newestOrdinal);
+        createSegmentFile(sameOrdinalNewerGeneration);
+        createSegmentFile(oldest);
+
+        createIndexFile(newestOrdinal);
+        createIndexFile(sameOrdinalNewerGeneration);
+        createIndexFile(oldest);
+
+        assertThat(selectedCandidates(), contains(oldest, 
sameOrdinalNewerGeneration, newestOrdinal));
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index f172899123e..24ae9a1ec56 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
@@ -61,7 +60,7 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
 
     @BeforeEach
     void setUp() {
-        checkpointer = new RaftLogCheckpointer(NODE_NAME, indexFileManager, 
new NoOpFailureManager(), MAX_QUEUE_SIZE);
+        checkpointer = new RaftLogCheckpointer(NODE_NAME, indexFileManager, 
new NoOpFailureManager(), MAX_QUEUE_SIZE, size -> {});
 
         checkpointer.start();
     }
@@ -74,7 +73,9 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
     }
 
     @Test
-    void testOnRollover(@Mock SegmentFile segmentFile, @Mock StripedMemTable 
memTable) throws IOException {
+    void testOnRollover(@Mock SegmentFile segmentFile) throws IOException {
+        var memTable = new SingleThreadMemTable();
+
         checkpointer.onRollover(segmentFile, memTable);
 
         verify(segmentFile, timeout(500)).sync();
@@ -84,7 +85,6 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
     @Test
     void testBlockOnRollover(
             @Mock SegmentFile segmentFile,
-            @Mock StripedMemTable memTable,
             @InjectExecutorService(threadCount = 1) ExecutorService executor
     ) {
         var blockFuture = new CompletableFuture<Void>();
@@ -92,6 +92,8 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
         try {
             doAnswer(invocation -> 
blockFuture.join()).when(segmentFile).sync();
 
+            var memTable = new SingleThreadMemTable();
+
             for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
                 checkpointer.onRollover(segmentFile, memTable);
             }
@@ -125,15 +127,11 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
 
                 when(mockFile.buffer()).thenReturn(buffer);
 
-                StripedMemTable mockMemTable = mock(StripedMemTable.class);
-
-                var segmentInfo = new SegmentInfo(i);
-
-                segmentInfo.addOffset(i, 1);
+                var memTable = new SingleThreadMemTable();
 
-                
lenient().when(mockMemTable.segmentInfo(i)).thenReturn(segmentInfo);
+                memTable.appendSegmentFileOffset(i, i, 1);
 
-                checkpointer.onRollover(mockFile, mockMemTable);
+                checkpointer.onRollover(mockFile, memTable);
             }
 
             for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) {
@@ -161,7 +159,7 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock 
SegmentFile mockFile, @Mock StripedMemTable mockMemTable) {
+    void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock 
SegmentFile mockFile) {
         var blockFuture = new CompletableFuture<Void>();
 
         try {
@@ -174,15 +172,13 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
             long groupId = 2;
             long logIndex = 5;
 
-            var segmentInfo = new SegmentInfo(1);
+            var memTable = new SingleThreadMemTable();
 
             for (int i = 1; i <= 10; i++) {
-                segmentInfo.addOffset(i, i);
+                memTable.appendSegmentFileOffset(groupId, i, i);
             }
 
-            when(mockMemTable.segmentInfo(groupId)).thenReturn(segmentInfo);
-
-            checkpointer.onRollover(mockFile, mockMemTable);
+            checkpointer.onRollover(mockFile, memTable);
 
             EntrySearchResult res = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
 
@@ -194,7 +190,7 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock 
SegmentFile mockFile, @Mock StripedMemTable mockMemTable) {
+    void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock 
SegmentFile mockFile) {
         var blockFuture = new CompletableFuture<Void>();
 
         try {
@@ -202,14 +198,15 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
 
             long groupId = 2;
 
-            SegmentInfo mockSegmentInfo = mock(SegmentInfo.class);
+            var memTable = new SingleThreadMemTable();
+
+            for (long i = 1; i <= 20; i++) {
+                memTable.appendSegmentFileOffset(groupId, i, 1);
+            }
 
-            
when(mockMemTable.segmentInfo(groupId)).thenReturn(mockSegmentInfo);
-            when(mockSegmentInfo.lastLogIndexExclusive()).thenReturn(20L);
-            // Emulate prefix truncation from index 10.
-            when(mockSegmentInfo.firstIndexKept()).thenReturn(10L);
+            memTable.truncatePrefix(groupId, 10);
 
-            checkpointer.onRollover(mockFile, mockMemTable);
+            checkpointer.onRollover(mockFile, memTable);
 
             EntrySearchResult res = 
checkpointer.findSegmentPayloadInQueue(groupId, 5);
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
index 733ff153ba2..6cf9db8d3ef 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -129,7 +130,7 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
 
         FileProperties fileProperties = 
SegmentFile.fileProperties(segmentFilePath);
 
-        runCompaction(segmentFilePath);
+        garbageCollector.runCompaction(fileProperties);
 
         assertThat(segmentFilePath, not(exists()));
         assertThat(indexFileManager.indexFilePath(fileProperties), 
not(exists()));
@@ -165,7 +166,7 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
 
         FileProperties originalFileProperties = 
SegmentFile.fileProperties(firstSegmentFile);
 
-        runCompaction(firstSegmentFile);
+        garbageCollector.runCompaction(originalFileProperties);
 
         // Segment file should be replaced by a new one with increased 
generation.
         assertThat(firstSegmentFile, not(exists()));
@@ -287,7 +288,7 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
 
                     long sizeBeforeCompaction = Files.size(curSegmentFilePath);
 
-                    runCompaction(curSegmentFilePath);
+                    garbageCollector.runCompaction(fileProperties);
 
                     FileProperties newFileProperties = new 
FileProperties(fileProperties.ordinal(), fileProperties.generation() + 1);
 
@@ -590,8 +591,8 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
     }
 
     /**
-     * Reproducer for the stale-deque-entry corruption bug: after fully 
deleting a middle segment file, its {@link IndexFileMeta} remains
-     * in the same deque block as the preceding file's meta. When the 
preceding file is subsequently partially compacted (its live log range
+     * Reproducer for the stale-deque-entry corruption bug: after fully 
deleting a middle segment file, its {@link IndexFileMeta} remains in
+     * the same deque block as the preceding file's meta. When the preceding 
file is subsequently partially compacted (its live log range
      * shrinks), {@link IndexFileMetaArray#onIndexCompacted} asserts that the 
new meta's {@code lastLogIndexExclusive} equals the next
      * entry's {@code firstLogIndexInclusive}.
      */
@@ -643,6 +644,30 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
         });
     }
 
+    @Test
+    void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception {
+        // Fill multiple segment files to create both segment and index files.
+        List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+        }
+
+        // Wait for at least one checkpoint so index files exist.
+        await().until(this::indexFiles, hasSize(greaterThanOrEqualTo(1)));
+
+        long expectedSize = totalLogSizeFromDisk(fileManager);
+
+        assertThat(garbageCollector.logSizeBytes(), is(expectedSize));
+
+        restartSegmentFileManager();
+
+        assertThat(garbageCollector.logSizeBytes(), is(expectedSize));
+    }
+
+    private void runCompaction(Path segmentFilePath) throws IOException {
+        
garbageCollector.runCompaction(SegmentFile.fileProperties(segmentFilePath));
+    }
+
     private List<Path> segmentFiles() throws IOException {
         try (Stream<Path> files = Files.list(fileManager.segmentFilesDir())) {
             return files
@@ -720,15 +745,22 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
         );
 
         fileManager.start();
-    }
 
-    private void runCompaction(Path segmentFilePath) throws IOException {
-        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, 
false);
+        indexFileManager = fileManager.indexFileManager();
+        garbageCollector = fileManager.garbageCollector();
+    }
 
-        try {
-            garbageCollector.runCompaction(segmentFile);
-        } finally {
-            segmentFile.close();
+    private static long totalLogSizeFromDisk(SegmentFileManager manager) 
throws IOException {
+        try (Stream<Path> files = 
Stream.concat(Files.list(manager.segmentFilesDir()), 
Files.list(manager.indexFilesDir()))) {
+            return files
+                    .mapToLong(p -> {
+                        try {
+                            return Files.size(p);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    })
+                    .sum();
         }
     }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java
new file mode 100644
index 00000000000..c6552a0386a
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link RaftLogGarbageCollector} soft-limit behavior.
+ */
+@ExtendWith(ConfigurationExtension.class)
+class RaftLogGcSoftLimitTest extends IgniteAbstractTest {
+    private static final int FILE_SIZE = 200;
+
+    private static final long SMALL_SOFT_LIMIT = 2 * FILE_SIZE;
+
+    private static final long GROUP_ID_1 = 1000;
+
+    private static final long GROUP_ID_2 = 2000;
+
+    private static final int STRIPES = 10;
+
+    private static final String NODE_NAME = "test";
+
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    @InjectConfiguration(
+            value = "mock.segmentFileSizeBytes=" + FILE_SIZE + ", 
mock.softLogSizeLimitBytes=" + SMALL_SOFT_LIMIT,
+            validate = false
+    )
+    private LogStorageConfiguration storageConfiguration;
+
+    private SegmentFileManager fileManager;
+
+    private RaftLogGarbageCollector garbageCollector;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        fileManager = new SegmentFileManager(
+                NODE_NAME,
+                workDir,
+                STRIPES,
+                new NoOpFailureManager(),
+                raftConfiguration,
+                storageConfiguration
+        );
+
+        fileManager.start();
+
+        garbageCollector = fileManager.garbageCollector();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (fileManager != null) {
+            fileManager.close();
+        }
+    }
+
+    @Test
+    void testGcTriggeredWhenSoftLimitExceeded() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, i, batches.get(i));
+        }
+
+        await().until(() -> indexFiles(fileManager), hasSize(4));
+
+        assertThat(totalLogSizeFromDisk(fileManager), 
is(greaterThanOrEqualTo(SMALL_SOFT_LIMIT)));
+
+        // Truncate all GROUP_ID_1 entries so every old file becomes fully 
deletable.
+        fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
+
+        // Write one GROUP_ID_2 batch to force a rollover so the truncation 
record ends up in a
+        // completed (checkpointable) file.
+        appendBytes(GROUP_ID_2, batches.size() + 1, createRandomData(FILE_SIZE 
/ 4, 1).get(0));
+
+        await().until(() -> garbageCollector.logSizeBytes(), 
is(lessThanOrEqualTo(SMALL_SOFT_LIMIT)));
+
+        assertThat(garbageCollector.logSizeBytes(), 
is(totalLogSizeFromDisk(fileManager)));
+    }
+
+    @RepeatedTest(10)
+    void testConcurrentWritesAndAutoGc() throws Exception {
+        int numBatches = 20;
+        List<byte[]> batches = createRandomData(FILE_SIZE / 8, numBatches);
+
+        // Write initial data from both groups.
+        for (int i = 0; i < numBatches; i++) {
+            appendBytes(GROUP_ID_1, i, batches.get(i));
+            appendBytes(GROUP_ID_2, i, batches.get(i));
+        }
+
+        // Wait for checkpoints so the GC has candidates to compact.
+        await().until(() -> indexFiles(fileManager), 
hasSize(greaterThanOrEqualTo(1)));
+
+        var writerDone = new AtomicBoolean(false);
+
+        RunnableX writerTask = () -> {
+            try {
+                for (int i = numBatches; i < numBatches * 3; i++) {
+                    appendBytes(GROUP_ID_1, i, batches.get(i % numBatches));
+                    appendBytes(GROUP_ID_2, i, batches.get(i % numBatches));
+                    fileManager.truncatePrefix(GROUP_ID_1, i - 1);
+                }
+            } finally {
+                writerDone.set(true);
+            }
+        };
+
+        RunnableX readerTask = () -> {
+            while (!writerDone.get()) {
+                for (int i = 0; i < numBatches; i++) {
+                    int index = i;
+                    fileManager.getEntry(GROUP_ID_2, i, bs -> {
+                        assertThat(bs, is(batches.get(index)));
+
+                        return null;
+                    });
+                }
+            }
+        };
+
+        runRace(writerTask, readerTask, readerTask);
+    }
+
+    private static List<Path> indexFiles(SegmentFileManager manager) throws 
IOException {
+        try (Stream<Path> files = Files.list(manager.indexFilesDir())) {
+            return files
+                    .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                    .sorted()
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private static long totalLogSizeFromDisk(SegmentFileManager manager) 
throws IOException {
+        try (Stream<Path> files = 
Stream.concat(Files.list(manager.segmentFilesDir()), 
Files.list(manager.indexFilesDir()))) {
+            return files
+                    .mapToLong(p -> {
+                        try {
+                            return Files.size(p);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    })
+                    .sum();
+        }
+    }
+
+    private static List<byte[]> createRandomData(int batchLength, int 
numBatches) {
+        return IntStream.range(0, numBatches)
+                .mapToObj(i -> randomBytes(ThreadLocalRandom.current(), 
batchLength))
+                .collect(Collectors.toList());
+    }
+
+    private void appendBytes(long groupId, long index, byte[] serializedEntry) 
throws IOException {
+        var entry = new LogEntry();
+        entry.setId(new LogId(index, 0));
+
+        fileManager.appendEntry(groupId, entry, new LogEntryEncoder() {
+            @Override
+            public byte[] encode(LogEntry log) {
+                return serializedEntry;
+            }
+
+            @Override
+            public void encode(ByteBuffer buffer, LogEntry log) {
+                buffer.put(serializedEntry);
+            }
+
+            @Override
+            public int size(LogEntry logEntry) {
+                return serializedEntry.length;
+            }
+        });
+    }
+}

Reply via email to