This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d2eda3bed7 IGNITE-26283 Recover Log Storage on startup (#6839)
4d2eda3bed7 is described below

commit 4d2eda3bed7ffc41e547dbaab81f472204123ceb
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Nov 3 09:37:54 2025 +0200

    IGNITE-26283 Recover Log Storage on startup (#6839)
---
 .../raft/storage/segstore/IndexFileManager.java    | 146 ++++++++++++--
 .../raft/storage/segstore/IndexFileMeta.java       |   8 +
 .../raft/storage/segstore/IndexMemTable.java       |   6 +-
 .../raft/storage/segstore/SegmentFileManager.java  | 224 ++++++++++++++++++---
 .../raft/storage/segstore/SegmentPayload.java      |  27 ++-
 .../storage/segstore/WriteBufferWithMemtable.java  |  45 +++++
 .../segstore/DeserializedSegmentPayload.java       |   8 +-
 .../storage/segstore/IndexFileManagerTest.java     | 102 ++++++++++
 .../storage/segstore/SegmentFileManagerTest.java   | 161 ++++++++++++++-
 9 files changed, 667 insertions(+), 60 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 95ae8d2228b..94e164d9e14 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -22,9 +22,11 @@ import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
 import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.SeekableByteChannel;
@@ -35,6 +37,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.jetbrains.annotations.Nullable;
@@ -61,11 +66,11 @@ import org.jetbrains.annotations.Nullable;
  *
  * <p>Raft group meta is as follows:
  * <pre>
- * 
+----------------------------------------------------------------------------------------------------------------+-----+
- * |                                              Raft group 1 meta            
                                     | ... |
- * 
+----------------------------------------------------------------------------------------------------------------+-----+
- * | Group ID (8 bytes) | Flags (4 bytes) | Offset (4 bytes) | First Log Index 
(8 bytes) | Last Log Index (8 bytes) | ... |
- * 
+----------------------------------------------------------------------------------------------------------------+-----+
+ * 
+------------------------------------------------------------------------------------------------------------------------+-----+
+ * |                                              Raft group 1 meta            
                                             | ... |
+ * 
+------------------------------------------------------------------------------------------------------------------------+-----+
+ * | Group ID (8 bytes) | Flags (4 bytes) | Payload Offset (4 bytes) | First 
Log Index (8 bytes) | Last Log Index (8 bytes) | ... |
+ * 
+------------------------------------------------------------------------------------------------------------------------+-----+
  * </pre>
  *
  * <p>Payload of the index files has the following structure:
@@ -92,6 +97,10 @@ class IndexFileManager {
 
     private static final String INDEX_FILE_NAME_FORMAT = 
"index-%010d-%010d.bin";
 
+    private static final Pattern INDEX_FILE_NAME_PATTERN = 
Pattern.compile("index-(?<ordinal>\\d{10})-(?<generation>\\d{10})\\.bin");
+
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
     // Magic number + format version + number of Raft groups.
     static final int COMMON_META_SIZE = Integer.BYTES + Integer.BYTES + 
Integer.BYTES;
 
@@ -105,9 +114,11 @@ class IndexFileManager {
     /**
      * Current index file ordinal (used to generate index file names).
      *
-     * <p>No synchronized access is needed because this field is only used by 
the checkpoint thread.
+     * <p>No synchronized access is needed because this field is only used by 
the checkpoint thread and during startup.
+     *
+     * <p>{@code -1} means that the manager has not been started yet.
      */
-    private int curFileOrdinal = 0;
+    private int curFileOrdinal = -1;
 
     /**
      * Index file metadata grouped by Raft Group ID.
@@ -120,20 +131,52 @@ class IndexFileManager {
         Files.createDirectories(indexFilesDir);
     }
 
+    void start() throws IOException {
+        try (Stream<Path> indexFiles = Files.list(indexFilesDir)) {
+            Iterator<Path> it = indexFiles.sorted().iterator();
+
+            while (it.hasNext()) {
+                recoverIndexFileMetas(it.next());
+            }
+        }
+    }
+
     Path indexFilesDir() {
         return indexFilesDir;
     }
 
+    void cleanupTmpFiles() throws IOException {
+        try (Stream<Path> indexFiles = Files.list(indexFilesDir)) {
+            Iterator<Path> it = indexFiles.iterator();
+
+            while (it.hasNext()) {
+                Path indexFile = it.next();
+
+                if 
(indexFile.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) {
+                    LOG.info("Deleting temporary index file: {}.", indexFile);
+
+                    Files.delete(indexFile);
+                }
+            }
+        }
+    }
+
     /**
      * Saves the given index memtable to a file.
+     *
+     * <p>Must only be called by the checkpoint thread.
      */
     Path saveIndexMemtable(ReadModeIndexMemTable indexMemTable) throws 
IOException {
-        String fileName = indexFileName(curFileOrdinal, 0);
+        return saveIndexMemtable(indexMemTable, ++curFileOrdinal);
+    }
+
+    Path saveIndexMemtable(ReadModeIndexMemTable indexMemTable, int 
fileOrdinal) throws IOException {
+        String fileName = indexFileName(fileOrdinal, 0);
 
-        Path tmpFilePath = indexFilesDir.resolve(fileName + ".tmp");
+        Path tmpFilePath = indexFilesDir.resolve(fileName + TMP_FILE_SUFFIX);
 
         try (var os = new 
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
-            byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable);
+            byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable, 
fileOrdinal);
 
             os.write(headerBytes);
 
@@ -144,8 +187,6 @@ class IndexFileManager {
             }
         }
 
-        curFileOrdinal++;
-
         return syncAndRename(tmpFilePath, 
tmpFilePath.resolveSibling(fileName));
     }
 
@@ -213,7 +254,11 @@ class IndexFileManager {
         return groupIndexMeta == null ? -1 : 
groupIndexMeta.lastLogIndexExclusive();
     }
 
-    private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable 
indexMemTable) {
+    boolean indexFileExists(int fileOrdinal) {
+        return Files.exists(indexFilesDir.resolve(indexFileName(fileOrdinal, 
0)));
+    }
+
+    private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable 
indexMemTable, int fileOrdinal) {
         int numGroups = indexMemTable.numGroups();
 
         int headerSize = headerSize(numGroups);
@@ -240,7 +285,7 @@ class IndexFileManager {
 
             long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
 
-            var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, curFileOrdinal);
+            var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
 
             putIndexFileMeta(groupId, indexFileMeta);
 
@@ -292,4 +337,77 @@ class IndexFileManager {
     private static String indexFileName(int fileOrdinal, int generation) {
         return String.format(INDEX_FILE_NAME_FORMAT, fileOrdinal, generation);
     }
+
+    private void recoverIndexFileMetas(Path indexFile) throws IOException {
+        int fileOrdinal = indexFileOrdinal(indexFile);
+
+        if (curFileOrdinal >= 0 && fileOrdinal != curFileOrdinal + 1) {
+            throw new IllegalStateException(String.format(
+                    "Unexpected index file ordinal. Expected %d, actual %d 
(%s).",
+                    curFileOrdinal + 1, fileOrdinal, indexFile
+            ));
+        }
+
+        curFileOrdinal = fileOrdinal;
+
+        try (InputStream is = new 
BufferedInputStream(Files.newInputStream(indexFile, StandardOpenOption.READ))) {
+            ByteBuffer commonMetaBuffer = readBytes(is, COMMON_META_SIZE, 
indexFile);
+
+            int magicNumber = commonMetaBuffer.getInt();
+
+            if (magicNumber != MAGIC_NUMBER) {
+                throw new IllegalStateException(String.format("Invalid magic 
number in index file %s: %d.", indexFile, magicNumber));
+            }
+
+            int formatVersion = commonMetaBuffer.getInt();
+
+            if (formatVersion > FORMAT_VERSION) {
+                throw new IllegalStateException(String.format(
+                        "Unsupported format version in index file %s: %d.", 
indexFile, formatVersion
+                ));
+            }
+
+            int numGroups = commonMetaBuffer.getInt();
+
+            if (numGroups <= 0) {
+                throw new IllegalStateException(String.format("Unexpected 
number of groups in index file %s: %d.", indexFile, numGroups));
+            }
+
+            for (int i = 0; i < numGroups; i++) {
+                ByteBuffer groupMetaBuffer = readBytes(is, GROUP_META_SIZE, 
indexFile);
+
+                long groupId = groupMetaBuffer.getLong();
+                groupMetaBuffer.getInt(); // Skip flags.
+                int payloadOffset = groupMetaBuffer.getInt();
+                long firstLogIndexInclusive = groupMetaBuffer.getLong();
+                long lastLogIndexExclusive = groupMetaBuffer.getLong();
+
+                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, curFileOrdinal);
+
+                putIndexFileMeta(groupId, indexFileMeta);
+            }
+        }
+    }
+
+    private static int indexFileOrdinal(Path indexFile) {
+        String fileName = indexFile.getFileName().toString();
+
+        Matcher matcher = INDEX_FILE_NAME_PATTERN.matcher(fileName);
+
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException(String.format("Invalid index 
file name format: %s.", indexFile));
+        }
+
+        return Integer.parseInt(matcher.group("ordinal"));
+    }
+
+    private static ByteBuffer readBytes(InputStream is, int size, Path 
indexFile) throws IOException {
+        ByteBuffer result = 
ByteBuffer.wrap(is.readNBytes(size)).order(BYTE_ORDER);
+
+        if (result.remaining() != size) {
+            throw new IOException(String.format("Unexpected EOF when trying to 
read from index file: %s.", indexFile));
+        }
+
+        return result;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
index dfdbe03b123..34bd73e12d2 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
@@ -34,6 +34,14 @@ class IndexFileMeta {
     private final int indexFileOrdinal;
 
     IndexFileMeta(long firstLogIndexInclusive, long lastLogIndexExclusive, int 
indexFilePayloadOffset, int indexFileOrdinal) {
+        if (lastLogIndexExclusive < firstLogIndexInclusive) {
+            throw new IllegalArgumentException("Invalid log index range: [" + 
firstLogIndexInclusive + ", " + lastLogIndexExclusive + ").");
+        }
+
+        if (indexFileOrdinal < 0) {
+            throw new IllegalArgumentException("Invalid index file ordinal: " 
+ indexFileOrdinal);
+        }
+
         this.firstLogIndexInclusive = firstLogIndexInclusive;
         this.lastLogIndexExclusive = lastLogIndexExclusive;
         this.indexFilePayloadOffset = indexFilePayloadOffset;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
index a4de58806ea..2ed981a53d5 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
@@ -112,7 +114,9 @@ class IndexMemTable implements WriteModeIndexMemTable, 
ReadModeIndexMemTable {
     }
 
     private Stripe stripe(long groupId) {
-        int stripeIndex = Long.hashCode(groupId) % stripes.length;
+        // FIXME: We should calculate stripes the same way it is done in 
StripedDisruptor,
+        //  see https://issues.apache.org/jira/browse/IGNITE-26907
+        int stripeIndex = safeAbs(Long.hashCode(groupId) % stripes.length);
 
         return stripes[stripeIndex];
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index ec768607d71..1954aef6dee 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_MARKER;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -25,15 +27,24 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.internal.util.FastCrc;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
 import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
 import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * File manager responsible for allocating and maintaining a pointer to the 
current segment file.
@@ -60,9 +71,9 @@ import org.jetbrains.annotations.Nullable;
  *
  * <p>Binary representation of each entry is as follows:
  * <pre>
- * 
+-------------------------+--------------------------+-------------------+---------------------+---------+----------------+
- * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Term (1-10 bytes) | 
Index (1-10 bytes)  | Payload | Hash (4 bytes) |
- * 
+-------------------------+--------------------------+-------------------+---------------------+---------+----------------+
+ * 
+-------------------------+--------------------------+--------------------+-------------------+---------+----------------+
+ * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Index (1-10 bytes) | 
Term (1-10 bytes) | Payload | Hash (4 bytes) |
+ * 
+-------------------------+--------------------------+--------------------+-------------------+---------+----------------+
  * </pre>
  *
  * <p>Log Entry Index and Term are stored as variable-length integers 
(varints), hence the non-fixed size in bytes. They are treated as
@@ -76,6 +87,8 @@ import org.jetbrains.annotations.Nullable;
  * written at the end of the file. If there are less than 8 bytes left, no 
switch records are written.
  */
 class SegmentFileManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = 
Loggers.forClass(SegmentFileManager.class);
+
     private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000;
 
     private static final int MAGIC_NUMBER = 0x56E0B526;
@@ -84,6 +97,8 @@ class SegmentFileManager implements ManuallyCloseable {
 
     private static final String SEGMENT_FILE_NAME_FORMAT = 
"segment-%010d-%010d.bin";
 
+    private static final Pattern SEGMENT_FILE_NAME_PATTERN = 
Pattern.compile("segment-(?<ordinal>\\d{10})-(?<generation>\\d{10})\\.bin");
+
     /**
      * Byte sequence that is written at the beginning of every segment file.
      */
@@ -121,10 +136,8 @@ class SegmentFileManager implements ManuallyCloseable {
 
     /**
      * Current segment file ordinal (used to generate segment file names).
-     *
-     * <p>Must always be accessed under the {@link #rolloverLock}.
      */
-    private int curSegmentFileOrdinal;
+    private volatile int curSegmentFileOrdinal;
 
     /**
      * Flag indicating whether the file manager has been stopped.
@@ -150,10 +163,52 @@ class SegmentFileManager implements ManuallyCloseable {
     }
 
     void start() throws IOException {
-        checkpointer.start();
+        LOG.info("Starting segment file manager [segmentFilesDir={}, 
fileSize={}].", segmentFilesDir, fileSize);
+
+        indexFileManager.cleanupTmpFiles();
+
+        Path lastSegmentFilePath = null;
+
+        try (Stream<Path> segmentFiles = Files.list(segmentFilesDir)) {
+            Iterator<Path> it = segmentFiles.sorted().iterator();
+
+            while (it.hasNext()) {
+                Path segmentFilePath = it.next();
+
+                if (!it.hasNext()) {
+                    // Last segment file is treated differently.
+                    lastSegmentFilePath = segmentFilePath;
+                } else {
+                    // Create missing index files.
+                    int segmentFileOrdinal = 
segmentFileOrdinal(segmentFilePath);
+
+                    if (!indexFileManager.indexFileExists(segmentFileOrdinal)) 
{
+                        LOG.info("Creating missing index file for segment file 
{}.", segmentFilePath);
+
+                        SegmentFile segmentFile = 
SegmentFile.openExisting(segmentFilePath);
+
+                        WriteModeIndexMemTable memTable = 
recoverMemtable(segmentFile, segmentFilePath);
+
+                        
indexFileManager.saveIndexMemtable(memTable.transitionToReadMode(), 
segmentFileOrdinal);
+                    }
+                }
+            }
+        }
+
+        if (lastSegmentFilePath == null) {
+            currentSegmentFile.set(allocateNewSegmentFile(0));
+        } else {
+            curSegmentFileOrdinal = segmentFileOrdinal(lastSegmentFilePath);
+
+            
currentSegmentFile.set(recoverLatestSegmentFile(lastSegmentFilePath));
+        }
 
-        // TODO: implement recovery, see 
https://issues.apache.org/jira/browse/IGNITE-26283.
-        currentSegmentFile.set(allocateNewSegmentFile(0));
+        LOG.info("Segment file manager recovery completed. Current segment 
file: {}.", lastSegmentFilePath);
+
+        // Index File Manager must be started strictly before the checkpointer.
+        indexFileManager.start();
+
+        checkpointer.start();
     }
 
     Path segmentFilesDir() {
@@ -164,6 +219,11 @@ class SegmentFileManager implements ManuallyCloseable {
         return indexFileManager.indexFilesDir();
     }
 
+    @TestOnly
+    IndexFileManager indexFileManager() {
+        return indexFileManager;
+    }
+
     private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal) 
throws IOException {
         Path path = segmentFilesDir.resolve(segmentFileName(fileOrdinal, 0));
 
@@ -174,6 +234,12 @@ class SegmentFileManager implements ManuallyCloseable {
         return new SegmentFileWithMemtable(segmentFile, new 
IndexMemTable(stripes), false);
     }
 
+    private SegmentFileWithMemtable recoverLatestSegmentFile(Path 
segmentFilePath) throws IOException {
+        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath);
+
+        return new SegmentFileWithMemtable(segmentFile, 
recoverLatestMemtable(segmentFile, segmentFilePath), false);
+    }
+
     private static String segmentFileName(int fileOrdinal, int generation) {
         return String.format(SEGMENT_FILE_NAME_FORMAT, fileOrdinal, 
generation);
     }
@@ -200,7 +266,7 @@ class SegmentFileManager implements ManuallyCloseable {
             SegmentPayload.writeTo(segmentBuffer, groupId, segmentEntrySize, 
entry, encoder);
 
             // Append to memtable before write buffer is released to avoid 
races with checkpoint on rollover.
-            writeBufferWithMemtable.memtable.appendSegmentFileOffset(groupId, 
entry.getId().getIndex(), segmentOffset);
+            
writeBufferWithMemtable.memtable().appendSegmentFileOffset(groupId, 
entry.getId().getIndex(), segmentOffset);
         }
     }
 
@@ -244,7 +310,7 @@ class SegmentFileManager implements ManuallyCloseable {
             SegmentPayload.writeTruncateSuffixRecordTo(segmentBuffer, groupId, 
lastLogIndexKept);
 
             // Modify the memtable before write buffer is released to avoid 
races with checkpoint on rollover.
-            writeBufferWithMemtable.memtable.truncateSuffix(groupId, 
lastLogIndexKept);
+            writeBufferWithMemtable.memtable().truncateSuffix(groupId, 
lastLogIndexKept);
         }
     }
 
@@ -393,7 +459,10 @@ class SegmentFileManager implements ManuallyCloseable {
 
             SegmentFileWithMemtable segmentFile = currentSegmentFile.get();
 
-            segmentFile.segmentFile().close();
+            // This should usually not happen but can happen on an abrupt node 
stop.
+            if (segmentFile != null) {
+                segmentFile.segmentFile().close();
+            }
 
             rolloverLock.notifyAll();
         }
@@ -429,23 +498,132 @@ class SegmentFileManager implements ManuallyCloseable {
         return 
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
     }
 
-    private static class WriteBufferWithMemtable implements AutoCloseable {
-        final WriteBuffer writeBuffer;
+    private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, 
Path segmentFilePath) {
+        ByteBuffer buffer = segmentFile.buffer();
+
+        validateSegmentFileHeader(buffer, segmentFilePath);
+
+        var memtable = new IndexMemTable(stripes);
+
+        while (buffer.remaining() > SWITCH_SEGMENT_RECORD.length) {
+            int segmentFilePayloadOffset = buffer.position();
+
+            long groupId = buffer.getLong();
+
+            int payloadLength = buffer.getInt();
+
+            if (payloadLength == TRUNCATE_SUFFIX_RECORD_MARKER) {
+                long lastLogIndexKept = buffer.getLong();
+
+                memtable.truncateSuffix(groupId, lastLogIndexKept);
+
+                buffer.position(buffer.position() + HASH_SIZE_BYTES);
+            } else {
+                int endOfRecordPosition = buffer.position() + payloadLength + 
HASH_SIZE_BYTES;
+
+                long index = VarlenEncoder.readLong(buffer);
+
+                memtable.appendSegmentFileOffset(groupId, index, 
segmentFilePayloadOffset);
+
+                buffer.position(endOfRecordPosition);
+            }
+        }
+
+        return memtable;
+    }
+
+    /**
+     * Creates an index memtable from the given segment file. Unlike {@link 
#recoverMemtable} which is expected to only be called on
+     * "complete" segment files (i.e. those that have experienced a rollover), 
this method is expected to be called on the most recent,
+     * possibly incomplete segment file.
+     */
+    private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile 
segmentFile, Path segmentFilePath) {
+        ByteBuffer buffer = segmentFile.buffer();
+
+        validateSegmentFileHeader(buffer, segmentFilePath);
+
+        var memtable = new IndexMemTable(stripes);
+
+        while (buffer.remaining() > SWITCH_SEGMENT_RECORD.length) {
+            int segmentFilePayloadOffset = buffer.position();
+
+            long groupId = buffer.getLong();
 
-        final WriteModeIndexMemTable memtable;
+            int payloadLength = buffer.getInt();
 
-        WriteBufferWithMemtable(WriteBuffer writeBuffer, 
WriteModeIndexMemTable memtable) {
-            this.writeBuffer = writeBuffer;
-            this.memtable = memtable;
+            int crcPosition;
+
+            if (payloadLength == TRUNCATE_SUFFIX_RECORD_MARKER) {
+                long lastLogIndexKept = buffer.getLong();
+
+                crcPosition = buffer.position();
+
+                buffer.position(segmentFilePayloadOffset);
+
+                // CRC violation signals the end of meaningful data in the 
segment file.
+                if (!isCrcValid(buffer, crcPosition)) {
+                    break;
+                }
+
+                memtable.truncateSuffix(groupId, lastLogIndexKept);
+            } else {
+                crcPosition = buffer.position() + payloadLength;
+
+                long index = VarlenEncoder.readLong(buffer);
+
+                buffer.position(segmentFilePayloadOffset);
+
+                // CRC violation signals the end of meaningful data in the 
segment file.
+                if (!isCrcValid(buffer, crcPosition)) {
+                    break;
+                }
+
+                memtable.appendSegmentFileOffset(groupId, index, 
segmentFilePayloadOffset);
+            }
+
+            buffer.position(crcPosition + HASH_SIZE_BYTES);
+        }
+
+        return memtable;
+    }
+
+    private static boolean isCrcValid(ByteBuffer buffer, int crcPosition) {
+        int originalPosition = buffer.position();
+
+        int crc = buffer.getInt(crcPosition);
+
+        int expectedCrc = FastCrc.calcCrc(buffer, crcPosition - 
buffer.position());
+
+        buffer.position(originalPosition);
+
+        return crc == expectedCrc;
+    }
+
+    private static void validateSegmentFileHeader(ByteBuffer buffer, Path 
segmentFilePath) {
+        int magicNumber = buffer.getInt();
+
+        if (magicNumber != MAGIC_NUMBER) {
+            throw new IllegalStateException(String.format("Invalid magic 
number in segment file %s: %d.", segmentFilePath, magicNumber));
         }
 
-        ByteBuffer buffer() {
-            return writeBuffer.buffer();
+        int formatVersion = buffer.getInt();
+
+        if (formatVersion > FORMAT_VERSION) {
+            throw new IllegalStateException(String.format(
+                    "Unsupported format version in segment file %s: %d.", 
segmentFilePath, formatVersion
+            ));
         }
+    }
+
+    private static int segmentFileOrdinal(Path segmentFile) {
+        String fileName = segmentFile.getFileName().toString();
 
-        @Override
-        public void close() {
-            writeBuffer.close();
+        Matcher matcher = SEGMENT_FILE_NAME_PATTERN.matcher(fileName);
+
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException(String.format("Invalid segment 
file name format: %s.", segmentFile));
         }
+
+        return Integer.parseInt(matcher.group("ordinal"));
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index dd5e958cf3c..94fd4de599c 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -17,11 +17,8 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
-import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
-import static org.apache.ignite.internal.raft.util.VarlenEncoder.sizeInBytes;
-import static org.apache.ignite.internal.raft.util.VarlenEncoder.writeLong;
-
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
 import org.apache.ignite.internal.util.FastCrc;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
 import org.apache.ignite.raft.jraft.entity.LogId;
@@ -38,16 +35,16 @@ class SegmentPayload {
 
     static final int LENGTH_SIZE_BYTES = Integer.BYTES;
 
-    static final int HASH_SIZE = Integer.BYTES;
+    static final int HASH_SIZE_BYTES = Integer.BYTES;
 
     /**
      * Length of the byte sequence that is written when suffix truncation 
happens.
      *
      * <p>Format: {@code groupId, 0 (special length value), last kept index, 
crc}
      */
-    static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES + 
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE;
+    static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES + 
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE_BYTES;
 
-    private static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
+    static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
 
     static void writeTo(
             ByteBuffer buffer,
@@ -64,8 +61,8 @@ class SegmentPayload {
 
         LogId logId = logEntry.getId();
 
-        writeLong(logId.getIndex(), buffer);
-        writeLong(logId.getTerm(), buffer);
+        VarlenEncoder.writeLong(logId.getIndex(), buffer);
+        VarlenEncoder.writeLong(logId.getTerm(), buffer);
 
         logEntryEncoder.encode(buffer, logEntry);
 
@@ -90,7 +87,7 @@ class SegmentPayload {
 
         buffer.position(originalPos);
 
-        int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE - 
HASH_SIZE);
+        int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE - 
HASH_SIZE_BYTES);
 
         buffer.putInt(crc);
     }
@@ -104,8 +101,8 @@ class SegmentPayload {
 
         int payloadPosition = buffer.position();
 
-        readLong(buffer); // Skip log entry index.
-        readLong(buffer); // Skip log entry term.
+        VarlenEncoder.readLong(buffer); // Skip log entry index.
+        VarlenEncoder.readLong(buffer); // Skip log entry term.
 
         int logEntryPosition = buffer.position();
 
@@ -129,7 +126,7 @@ class SegmentPayload {
         buffer.get(entryBytes);
 
         // Move the position as if we have read the whole payload.
-        buffer.position(buffer.position() + HASH_SIZE);
+        buffer.position(buffer.position() + HASH_SIZE_BYTES);
 
         return logEntryDecoder.decode(entryBytes);
     }
@@ -139,10 +136,10 @@ class SegmentPayload {
 
         LogId logId = logEntry.getId();
 
-        return fixedOverheadSize() + sizeInBytes(logId.getIndex()) + 
sizeInBytes(logId.getTerm()) + entrySize;
+        return fixedOverheadSize() + 
VarlenEncoder.sizeInBytes(logId.getIndex()) + 
VarlenEncoder.sizeInBytes(logId.getTerm()) + entrySize;
     }
 
     static int fixedOverheadSize() {
-        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE;
+        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES;
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteBufferWithMemtable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteBufferWithMemtable.java
new file mode 100644
index 00000000000..9a13831125c
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteBufferWithMemtable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.nio.ByteBuffer;
+import 
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
+
+class WriteBufferWithMemtable implements AutoCloseable {
+    private final WriteBuffer writeBuffer;
+
+    private final WriteModeIndexMemTable memtable;
+
+    WriteBufferWithMemtable(WriteBuffer writeBuffer, WriteModeIndexMemTable 
memtable) {
+        this.writeBuffer = writeBuffer;
+        this.memtable = memtable;
+    }
+
+    ByteBuffer buffer() {
+        return writeBuffer.buffer();
+    }
+
+    WriteModeIndexMemTable memtable() {
+        return memtable;
+    }
+
+    @Override
+    public void close() {
+        writeBuffer.close();
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
index 8c647cd0f99..4f7eb6d0faf 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.raft.storage.segstore;
 
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.GROUP_ID_SIZE_BYTES;
-import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.LENGTH_SIZE_BYTES;
 import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,9 +63,9 @@ class DeserializedSegmentPayload {
 
         int payloadLength = readFully(channel, LENGTH_SIZE_BYTES).getInt();
 
-        ByteBuffer remaining = readFully(channel, payloadLength + HASH_SIZE);
+        ByteBuffer remaining = readFully(channel, payloadLength + 
HASH_SIZE_BYTES);
 
-        ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength + 
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE)
+        ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength + 
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES)
                 .order(SegmentFile.BYTE_ORDER)
                 .putLong(groupId)
                 .putInt(payloadLength)
@@ -113,7 +113,7 @@ class DeserializedSegmentPayload {
     }
 
     int size() {
-        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length + 
HASH_SIZE;
+        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length + 
HASH_SIZE_BYTES;
     }
 
     private static ByteBuffer readFully(ReadableByteChannel byteChannel, int 
len) throws IOException {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 82a07424d7e..7190a61f009 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -38,6 +38,8 @@ class IndexFileManagerTest extends IgniteAbstractTest {
     @BeforeEach
     void setUp() throws IOException {
         indexFileManager = new IndexFileManager(workDir);
+
+        indexFileManager.start();
     }
 
     @Test
@@ -264,4 +266,104 @@ class IndexFileManagerTest extends IgniteAbstractTest {
 
         assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(2, 2)));
     }
+
+    @Test
+    void testRecovery() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 2, 2);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        indexFileManager = new IndexFileManager(workDir);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), 
is(nullValue()));
+
+        indexFileManager.start();
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new 
SegmentFilePointer(0, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(1, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), 
is(nullValue()));
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 3, 3);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new 
SegmentFilePointer(0, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(1, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new 
SegmentFilePointer(2, 3)));
+    }
+
+    @Test
+    void testRecoveryWithTruncateSuffix() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+        memtable.appendSegmentFileOffset(0, 2, 2);
+        memtable.appendSegmentFileOffset(0, 3, 3);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.truncateSuffix(0, 2);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        indexFileManager = new IndexFileManager(workDir);
+
+        indexFileManager.start();
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new 
SegmentFilePointer(0, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(0, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), 
is(nullValue()));
+    }
+
+    @Test
+    void testExists() throws IOException {
+        assertThat(indexFileManager.indexFileExists(0), is(false));
+        assertThat(indexFileManager.indexFileExists(1), is(false));
+
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.indexFileExists(0), is(true));
+        assertThat(indexFileManager.indexFileExists(1), is(false));
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.indexFileExists(0), is(true));
+        assertThat(indexFileManager.indexFileExists(1), is(true));
+    }
+
+    @Test
+    void testSaveMemtableWithExplicitOrdinal() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+
+        indexFileManager.saveIndexMemtable(memtable, 5);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 2, 2);
+
+        indexFileManager.saveIndexMemtable(memtable, 10);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new 
SegmentFilePointer(5, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(10, 2)));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index ac39ec43e7e..b26c20a5efa 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -42,6 +42,8 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -60,6 +62,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.IntFunction;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.RunnableX;
@@ -85,15 +88,21 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
 
     private static final String NODE_NAME = "test";
 
+    private final FailureManager failureManager = new NoOpFailureManager();
+
     private SegmentFileManager fileManager;
 
     @BeforeEach
     void setUp() throws IOException {
-        fileManager = new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE, 
STRIPES, new NoOpFailureManager());
+        fileManager = createFileManager();
 
         fileManager.start();
     }
 
+    private SegmentFileManager createFileManager() throws IOException {
+        return new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE, STRIPES, 
failureManager);
+    }
+
     @AfterEach
     void tearDown() throws Exception {
         closeAllManually(fileManager);
@@ -102,8 +111,8 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
     @SuppressWarnings("ResultOfObjectAllocationIgnored")
     @Test
     void testConstructorInvariants() {
-        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(NODE_NAME, workDir, 0, 1, new NoOpFailureManager()));
-        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(NODE_NAME, workDir, 1, 1, new NoOpFailureManager()));
+        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(NODE_NAME, workDir, 0, 1, failureManager));
+        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(NODE_NAME, workDir, 1, 1, failureManager));
     }
 
     @Test
@@ -425,6 +434,139 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
         runRace(writerTaskFactory.apply(0), writerTaskFactory.apply(1));
     }
 
+    @Test
+    void testRecovery() throws Exception {
+        int batchSize = FILE_SIZE / 4;
+
+        List<byte[]> batches = randomData(batchSize, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        List<Path> segmentFiles = segmentFiles();
+
+        assertThat(segmentFiles, hasSize(greaterThan(1)));
+
+        List<Path> indexFiles = await().until(this::indexFiles, 
hasSize(segmentFiles.size() - 1));
+
+        fileManager.close();
+
+        // Delete an index file. We expect it to be re-created after recovery.
+        Files.delete(indexFiles.get(0));
+
+        fileManager = createFileManager();
+
+        fileManager.start();
+
+        for (int i = 0; i < batches.size(); i++) {
+            byte[] expectedEntry = batches.get(i);
+
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                assertThat(bs, is(expectedEntry));
+
+                return null;
+            });
+        }
+    }
+
+    @Test
+    void testRecoveryWithTmpIndexFiles() throws Exception {
+        List<byte[]> batches = randomData(FILE_SIZE / 4, 3);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        // We need rollover to happen at least once to trigger index file 
recovery.
+        await().until(this::indexFiles, hasSize(1));
+
+        assertThat(segmentFiles(), hasSize(2));
+
+        // Use a mock memtable that throws an exception to force the index 
manager to create a temporary index file, but not rename it.
+        ReadModeIndexMemTable mockMemTable = mock(ReadModeIndexMemTable.class);
+
+        when(mockMemTable.iterator()).thenThrow(new RuntimeException("Test 
exception"));
+
+        // Create two tmp index files: one for the complete segment file and 
one for the incomplete segment file.
+        try {
+            fileManager.indexFileManager().saveIndexMemtable(mockMemTable, 0);
+        } catch (RuntimeException ignored) {
+            // Ignore.
+        }
+
+        try {
+            fileManager.indexFileManager().saveIndexMemtable(mockMemTable, 1);
+        } catch (RuntimeException ignored) {
+            // Ignore.
+        }
+
+        assertThat(tmpIndexFiles(), hasSize(2));
+
+        fileManager.close();
+
+        for (Path indexFile : indexFiles()) {
+            Files.delete(indexFile);
+        }
+
+        fileManager = createFileManager();
+
+        fileManager.start();
+
+        assertThat(tmpIndexFiles(), is(empty()));
+
+        assertThat(indexFiles(), hasSize(1));
+    }
+
+    @Test
+    void testRecoveryWithTruncateSuffix() throws Exception {
+        List<byte[]> batches = randomData(FILE_SIZE / 4, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(4));
+
+        int lastLogIndexKept = batches.size() / 2;
+
+        fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
+
+        // Insert more data in order to have two truncate suffix records: one 
in a "rollovered" segment file and one in the latest segment
+        // file.
+        for (int i = lastLogIndexKept + 1; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
+
+        fileManager.close();
+
+        for (Path indexFile : indexFiles()) {
+            Files.deleteIfExists(indexFile);
+        }
+
+        fileManager = createFileManager();
+
+        fileManager.start();
+
+        for (int i = 0; i <= lastLogIndexKept; i++) {
+            byte[] expectedEntry = batches.get(i);
+
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                assertThat(bs, is(expectedEntry));
+
+                return null;
+            });
+        }
+
+        for (int i = lastLogIndexKept + 1; i < batches.size(); i++) {
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                throw new AssertionError("This method should not be called.");
+            });
+        }
+    }
+
     private Path findSoleSegmentFile() throws IOException {
         List<Path> segmentFiles = segmentFiles();
 
@@ -452,6 +594,19 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
         }
     }
 
+    private List<Path> tmpIndexFiles() throws IOException {
+        try (Stream<Path> files = Files.list(fileManager.indexFilesDir())) {
+            return files
+                    .filter(p -> {
+                        String fileName = p.getFileName().toString();
+
+                        return fileName.endsWith(".tmp");
+                    })
+                    .sorted()
+                    .collect(toList());
+        }
+    }
+
     private static List<byte[]> randomData(int batchLength, int numBatches) {
         return IntStream.range(0, numBatches)
                 .mapToObj(i -> randomBytes(ThreadLocalRandom.current(), 
batchLength))


Reply via email to