This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4d2eda3bed7 IGNITE-26283 Recover Log Storage on startup (#6839)
4d2eda3bed7 is described below
commit 4d2eda3bed7ffc41e547dbaab81f472204123ceb
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Nov 3 09:37:54 2025 +0200
IGNITE-26283 Recover Log Storage on startup (#6839)
---
.../raft/storage/segstore/IndexFileManager.java | 146 ++++++++++++--
.../raft/storage/segstore/IndexFileMeta.java | 8 +
.../raft/storage/segstore/IndexMemTable.java | 6 +-
.../raft/storage/segstore/SegmentFileManager.java | 224 ++++++++++++++++++---
.../raft/storage/segstore/SegmentPayload.java | 27 ++-
.../storage/segstore/WriteBufferWithMemtable.java | 45 +++++
.../segstore/DeserializedSegmentPayload.java | 8 +-
.../storage/segstore/IndexFileManagerTest.java | 102 ++++++++++
.../storage/segstore/SegmentFileManagerTest.java | 161 ++++++++++++++-
9 files changed, 667 insertions(+), 60 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 95ae8d2228b..94e164d9e14 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -22,9 +22,11 @@ import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;
@@ -35,6 +37,9 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.jetbrains.annotations.Nullable;
@@ -61,11 +66,11 @@ import org.jetbrains.annotations.Nullable;
*
* <p>Raft group meta is as follows:
* <pre>
- *
+----------------------------------------------------------------------------------------------------------------+-----+
- * | Raft group 1 meta
| ... |
- *
+----------------------------------------------------------------------------------------------------------------+-----+
- * | Group ID (8 bytes) | Flags (4 bytes) | Offset (4 bytes) | First Log Index
(8 bytes) | Last Log Index (8 bytes) | ... |
- *
+----------------------------------------------------------------------------------------------------------------+-----+
+ *
+------------------------------------------------------------------------------------------------------------------------+-----+
+ * | Raft group 1 meta
| ... |
+ *
+------------------------------------------------------------------------------------------------------------------------+-----+
+ * | Group ID (8 bytes) | Flags (4 bytes) | Payload Offset (4 bytes) | First
Log Index (8 bytes) | Last Log Index (8 bytes) | ... |
+ *
+------------------------------------------------------------------------------------------------------------------------+-----+
* </pre>
*
* <p>Payload of the index files has the following structure:
@@ -92,6 +97,10 @@ class IndexFileManager {
private static final String INDEX_FILE_NAME_FORMAT =
"index-%010d-%010d.bin";
+ private static final Pattern INDEX_FILE_NAME_PATTERN =
Pattern.compile("index-(?<ordinal>\\d{10})-(?<generation>\\d{10})\\.bin");
+
+ private static final String TMP_FILE_SUFFIX = ".tmp";
+
// Magic number + format version + number of Raft groups.
static final int COMMON_META_SIZE = Integer.BYTES + Integer.BYTES +
Integer.BYTES;
@@ -105,9 +114,11 @@ class IndexFileManager {
/**
* Current index file ordinal (used to generate index file names).
*
- * <p>No synchronized access is needed because this field is only used by
the checkpoint thread.
+ * <p>No synchronized access is needed because this field is only used by
the checkpoint thread and during startup.
+ *
+ * <p>{@code -1} means that the manager has not been started yet.
*/
- private int curFileOrdinal = 0;
+ private int curFileOrdinal = -1;
/**
* Index file metadata grouped by Raft Group ID.
@@ -120,20 +131,52 @@ class IndexFileManager {
Files.createDirectories(indexFilesDir);
}
+ void start() throws IOException {
+ try (Stream<Path> indexFiles = Files.list(indexFilesDir)) {
+ Iterator<Path> it = indexFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ recoverIndexFileMetas(it.next());
+ }
+ }
+ }
+
Path indexFilesDir() {
return indexFilesDir;
}
+ void cleanupTmpFiles() throws IOException {
+ try (Stream<Path> indexFiles = Files.list(indexFilesDir)) {
+ Iterator<Path> it = indexFiles.iterator();
+
+ while (it.hasNext()) {
+ Path indexFile = it.next();
+
+ if
(indexFile.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) {
+ LOG.info("Deleting temporary index file: {}.", indexFile);
+
+ Files.delete(indexFile);
+ }
+ }
+ }
+ }
+
/**
* Saves the given index memtable to a file.
+ *
+ * <p>Must only be called by the checkpoint thread.
*/
Path saveIndexMemtable(ReadModeIndexMemTable indexMemTable) throws
IOException {
- String fileName = indexFileName(curFileOrdinal, 0);
+ return saveIndexMemtable(indexMemTable, ++curFileOrdinal);
+ }
+
+ Path saveIndexMemtable(ReadModeIndexMemTable indexMemTable, int
fileOrdinal) throws IOException {
+ String fileName = indexFileName(fileOrdinal, 0);
- Path tmpFilePath = indexFilesDir.resolve(fileName + ".tmp");
+ Path tmpFilePath = indexFilesDir.resolve(fileName + TMP_FILE_SUFFIX);
try (var os = new
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
- byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable);
+ byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable,
fileOrdinal);
os.write(headerBytes);
@@ -144,8 +187,6 @@ class IndexFileManager {
}
}
- curFileOrdinal++;
-
return syncAndRename(tmpFilePath,
tmpFilePath.resolveSibling(fileName));
}
@@ -213,7 +254,11 @@ class IndexFileManager {
return groupIndexMeta == null ? -1 :
groupIndexMeta.lastLogIndexExclusive();
}
- private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable
indexMemTable) {
+ boolean indexFileExists(int fileOrdinal) {
+ return Files.exists(indexFilesDir.resolve(indexFileName(fileOrdinal,
0)));
+ }
+
+ private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable
indexMemTable, int fileOrdinal) {
int numGroups = indexMemTable.numGroups();
int headerSize = headerSize(numGroups);
@@ -240,7 +285,7 @@ class IndexFileManager {
long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
- var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, curFileOrdinal);
+ var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, fileOrdinal);
putIndexFileMeta(groupId, indexFileMeta);
@@ -292,4 +337,77 @@ class IndexFileManager {
private static String indexFileName(int fileOrdinal, int generation) {
return String.format(INDEX_FILE_NAME_FORMAT, fileOrdinal, generation);
}
+
+ private void recoverIndexFileMetas(Path indexFile) throws IOException {
+ int fileOrdinal = indexFileOrdinal(indexFile);
+
+ if (curFileOrdinal >= 0 && fileOrdinal != curFileOrdinal + 1) {
+ throw new IllegalStateException(String.format(
+ "Unexpected index file ordinal. Expected %d, actual %d
(%s).",
+ curFileOrdinal + 1, fileOrdinal, indexFile
+ ));
+ }
+
+ curFileOrdinal = fileOrdinal;
+
+ try (InputStream is = new
BufferedInputStream(Files.newInputStream(indexFile, StandardOpenOption.READ))) {
+ ByteBuffer commonMetaBuffer = readBytes(is, COMMON_META_SIZE,
indexFile);
+
+ int magicNumber = commonMetaBuffer.getInt();
+
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IllegalStateException(String.format("Invalid magic
number in index file %s: %d.", indexFile, magicNumber));
+ }
+
+ int formatVersion = commonMetaBuffer.getInt();
+
+ if (formatVersion > FORMAT_VERSION) {
+ throw new IllegalStateException(String.format(
+ "Unsupported format version in index file %s: %d.",
indexFile, formatVersion
+ ));
+ }
+
+ int numGroups = commonMetaBuffer.getInt();
+
+ if (numGroups <= 0) {
+ throw new IllegalStateException(String.format("Unexpected
number of groups in index file %s: %d.", indexFile, numGroups));
+ }
+
+ for (int i = 0; i < numGroups; i++) {
+ ByteBuffer groupMetaBuffer = readBytes(is, GROUP_META_SIZE,
indexFile);
+
+ long groupId = groupMetaBuffer.getLong();
+ groupMetaBuffer.getInt(); // Skip flags.
+ int payloadOffset = groupMetaBuffer.getInt();
+ long firstLogIndexInclusive = groupMetaBuffer.getLong();
+ long lastLogIndexExclusive = groupMetaBuffer.getLong();
+
+ var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, curFileOrdinal);
+
+ putIndexFileMeta(groupId, indexFileMeta);
+ }
+ }
+ }
+
+ private static int indexFileOrdinal(Path indexFile) {
+ String fileName = indexFile.getFileName().toString();
+
+ Matcher matcher = INDEX_FILE_NAME_PATTERN.matcher(fileName);
+
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(String.format("Invalid index
file name format: %s.", indexFile));
+ }
+
+ return Integer.parseInt(matcher.group("ordinal"));
+ }
+
+ private static ByteBuffer readBytes(InputStream is, int size, Path
indexFile) throws IOException {
+ ByteBuffer result =
ByteBuffer.wrap(is.readNBytes(size)).order(BYTE_ORDER);
+
+ if (result.remaining() != size) {
+ throw new IOException(String.format("Unexpected EOF when trying to
read from index file: %s.", indexFile));
+ }
+
+ return result;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
index dfdbe03b123..34bd73e12d2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
@@ -34,6 +34,14 @@ class IndexFileMeta {
private final int indexFileOrdinal;
IndexFileMeta(long firstLogIndexInclusive, long lastLogIndexExclusive, int
indexFilePayloadOffset, int indexFileOrdinal) {
+ if (lastLogIndexExclusive < firstLogIndexInclusive) {
+ throw new IllegalArgumentException("Invalid log index range: [" +
firstLogIndexInclusive + ", " + lastLogIndexExclusive + ").");
+ }
+
+ if (indexFileOrdinal < 0) {
+ throw new IllegalArgumentException("Invalid index file ordinal: "
+ indexFileOrdinal);
+ }
+
this.firstLogIndexInclusive = firstLogIndexInclusive;
this.lastLogIndexExclusive = lastLogIndexExclusive;
this.indexFilePayloadOffset = indexFilePayloadOffset;
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
index a4de58806ea..2ed981a53d5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
@@ -112,7 +114,9 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
}
private Stripe stripe(long groupId) {
- int stripeIndex = Long.hashCode(groupId) % stripes.length;
+ // FIXME: We should calculate stripes the same way it is done in
StripedDisruptor,
+ // see https://issues.apache.org/jira/browse/IGNITE-26907
+ int stripeIndex = safeAbs(Long.hashCode(groupId) % stripes.length);
return stripes[stripeIndex];
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index ec768607d71..1954aef6dee 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_MARKER;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -25,15 +27,24 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.internal.util.FastCrc;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* File manager responsible for allocating and maintaining a pointer to the
current segment file.
@@ -60,9 +71,9 @@ import org.jetbrains.annotations.Nullable;
*
* <p>Binary representation of each entry is as follows:
* <pre>
- *
+-------------------------+--------------------------+-------------------+---------------------+---------+----------------+
- * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Term (1-10 bytes) |
Index (1-10 bytes) | Payload | Hash (4 bytes) |
- *
+-------------------------+--------------------------+-------------------+---------------------+---------+----------------+
+ *
+-------------------------+--------------------------+--------------------+-------------------+---------+----------------+
+ * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Index (1-10 bytes) |
Term (1-10 bytes) | Payload | Hash (4 bytes) |
+ *
+-------------------------+--------------------------+--------------------+-------------------+---------+----------------+
* </pre>
*
* <p>Log Entry Index and Term are stored as variable-length integers
(varints), hence the non-fixed size in bytes. They are treated as
@@ -76,6 +87,8 @@ import org.jetbrains.annotations.Nullable;
* written at the end of the file. If there are less than 8 bytes left, no
switch records are written.
*/
class SegmentFileManager implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(SegmentFileManager.class);
+
private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000;
private static final int MAGIC_NUMBER = 0x56E0B526;
@@ -84,6 +97,8 @@ class SegmentFileManager implements ManuallyCloseable {
private static final String SEGMENT_FILE_NAME_FORMAT =
"segment-%010d-%010d.bin";
+ private static final Pattern SEGMENT_FILE_NAME_PATTERN =
Pattern.compile("segment-(?<ordinal>\\d{10})-(?<generation>\\d{10})\\.bin");
+
/**
* Byte sequence that is written at the beginning of every segment file.
*/
@@ -121,10 +136,8 @@ class SegmentFileManager implements ManuallyCloseable {
/**
* Current segment file ordinal (used to generate segment file names).
- *
- * <p>Must always be accessed under the {@link #rolloverLock}.
*/
- private int curSegmentFileOrdinal;
+ private volatile int curSegmentFileOrdinal;
/**
* Flag indicating whether the file manager has been stopped.
@@ -150,10 +163,52 @@ class SegmentFileManager implements ManuallyCloseable {
}
void start() throws IOException {
- checkpointer.start();
+ LOG.info("Starting segment file manager [segmentFilesDir={},
fileSize={}].", segmentFilesDir, fileSize);
+
+ indexFileManager.cleanupTmpFiles();
+
+ Path lastSegmentFilePath = null;
+
+ try (Stream<Path> segmentFiles = Files.list(segmentFilesDir)) {
+ Iterator<Path> it = segmentFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path segmentFilePath = it.next();
+
+ if (!it.hasNext()) {
+ // Last segment file is treated differently.
+ lastSegmentFilePath = segmentFilePath;
+ } else {
+ // Create missing index files.
+ int segmentFileOrdinal =
segmentFileOrdinal(segmentFilePath);
+
+ if (!indexFileManager.indexFileExists(segmentFileOrdinal))
{
+ LOG.info("Creating missing index file for segment file
{}.", segmentFilePath);
+
+ SegmentFile segmentFile =
SegmentFile.openExisting(segmentFilePath);
+
+ WriteModeIndexMemTable memTable =
recoverMemtable(segmentFile, segmentFilePath);
+
+
indexFileManager.saveIndexMemtable(memTable.transitionToReadMode(),
segmentFileOrdinal);
+ }
+ }
+ }
+ }
+
+ if (lastSegmentFilePath == null) {
+ currentSegmentFile.set(allocateNewSegmentFile(0));
+ } else {
+ curSegmentFileOrdinal = segmentFileOrdinal(lastSegmentFilePath);
+
+
currentSegmentFile.set(recoverLatestSegmentFile(lastSegmentFilePath));
+ }
- // TODO: implement recovery, see
https://issues.apache.org/jira/browse/IGNITE-26283.
- currentSegmentFile.set(allocateNewSegmentFile(0));
+ LOG.info("Segment file manager recovery completed. Current segment
file: {}.", lastSegmentFilePath);
+
+ // Index File Manager must be started strictly before the checkpointer.
+ indexFileManager.start();
+
+ checkpointer.start();
}
Path segmentFilesDir() {
@@ -164,6 +219,11 @@ class SegmentFileManager implements ManuallyCloseable {
return indexFileManager.indexFilesDir();
}
+ @TestOnly
+ IndexFileManager indexFileManager() {
+ return indexFileManager;
+ }
+
private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal)
throws IOException {
Path path = segmentFilesDir.resolve(segmentFileName(fileOrdinal, 0));
@@ -174,6 +234,12 @@ class SegmentFileManager implements ManuallyCloseable {
return new SegmentFileWithMemtable(segmentFile, new
IndexMemTable(stripes), false);
}
+ private SegmentFileWithMemtable recoverLatestSegmentFile(Path
segmentFilePath) throws IOException {
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath);
+
+ return new SegmentFileWithMemtable(segmentFile,
recoverLatestMemtable(segmentFile, segmentFilePath), false);
+ }
+
private static String segmentFileName(int fileOrdinal, int generation) {
return String.format(SEGMENT_FILE_NAME_FORMAT, fileOrdinal,
generation);
}
@@ -200,7 +266,7 @@ class SegmentFileManager implements ManuallyCloseable {
SegmentPayload.writeTo(segmentBuffer, groupId, segmentEntrySize,
entry, encoder);
// Append to memtable before write buffer is released to avoid
races with checkpoint on rollover.
- writeBufferWithMemtable.memtable.appendSegmentFileOffset(groupId,
entry.getId().getIndex(), segmentOffset);
+
writeBufferWithMemtable.memtable().appendSegmentFileOffset(groupId,
entry.getId().getIndex(), segmentOffset);
}
}
@@ -244,7 +310,7 @@ class SegmentFileManager implements ManuallyCloseable {
SegmentPayload.writeTruncateSuffixRecordTo(segmentBuffer, groupId,
lastLogIndexKept);
// Modify the memtable before write buffer is released to avoid
races with checkpoint on rollover.
- writeBufferWithMemtable.memtable.truncateSuffix(groupId,
lastLogIndexKept);
+ writeBufferWithMemtable.memtable().truncateSuffix(groupId,
lastLogIndexKept);
}
}
@@ -393,7 +459,10 @@ class SegmentFileManager implements ManuallyCloseable {
SegmentFileWithMemtable segmentFile = currentSegmentFile.get();
- segmentFile.segmentFile().close();
+ // This should usually not happen but can happen on an abrupt node
stop.
+ if (segmentFile != null) {
+ segmentFile.segmentFile().close();
+ }
rolloverLock.notifyAll();
}
@@ -429,23 +498,132 @@ class SegmentFileManager implements ManuallyCloseable {
return
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
}
- private static class WriteBufferWithMemtable implements AutoCloseable {
- final WriteBuffer writeBuffer;
+ private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile,
Path segmentFilePath) {
+ ByteBuffer buffer = segmentFile.buffer();
+
+ validateSegmentFileHeader(buffer, segmentFilePath);
+
+ var memtable = new IndexMemTable(stripes);
+
+ while (buffer.remaining() > SWITCH_SEGMENT_RECORD.length) {
+ int segmentFilePayloadOffset = buffer.position();
+
+ long groupId = buffer.getLong();
+
+ int payloadLength = buffer.getInt();
+
+ if (payloadLength == TRUNCATE_SUFFIX_RECORD_MARKER) {
+ long lastLogIndexKept = buffer.getLong();
+
+ memtable.truncateSuffix(groupId, lastLogIndexKept);
+
+ buffer.position(buffer.position() + HASH_SIZE_BYTES);
+ } else {
+ int endOfRecordPosition = buffer.position() + payloadLength +
HASH_SIZE_BYTES;
+
+ long index = VarlenEncoder.readLong(buffer);
+
+ memtable.appendSegmentFileOffset(groupId, index,
segmentFilePayloadOffset);
+
+ buffer.position(endOfRecordPosition);
+ }
+ }
+
+ return memtable;
+ }
+
+ /**
+ * Creates an index memtable from the given segment file. Unlike {@link
#recoverMemtable}, which is expected to only be called on
+ * "complete" segment files (i.e. those that have experienced a rollover),
this method is expected to be called on the most recent,
+ * possibly incomplete segment file.
+ */
+ private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile
segmentFile, Path segmentFilePath) {
+ ByteBuffer buffer = segmentFile.buffer();
+
+ validateSegmentFileHeader(buffer, segmentFilePath);
+
+ var memtable = new IndexMemTable(stripes);
+
+ while (buffer.remaining() > SWITCH_SEGMENT_RECORD.length) {
+ int segmentFilePayloadOffset = buffer.position();
+
+ long groupId = buffer.getLong();
- final WriteModeIndexMemTable memtable;
+ int payloadLength = buffer.getInt();
- WriteBufferWithMemtable(WriteBuffer writeBuffer,
WriteModeIndexMemTable memtable) {
- this.writeBuffer = writeBuffer;
- this.memtable = memtable;
+ int crcPosition;
+
+ if (payloadLength == TRUNCATE_SUFFIX_RECORD_MARKER) {
+ long lastLogIndexKept = buffer.getLong();
+
+ crcPosition = buffer.position();
+
+ buffer.position(segmentFilePayloadOffset);
+
+ // CRC violation signals the end of meaningful data in the
segment file.
+ if (!isCrcValid(buffer, crcPosition)) {
+ break;
+ }
+
+ memtable.truncateSuffix(groupId, lastLogIndexKept);
+ } else {
+ crcPosition = buffer.position() + payloadLength;
+
+ long index = VarlenEncoder.readLong(buffer);
+
+ buffer.position(segmentFilePayloadOffset);
+
+ // CRC violation signals the end of meaningful data in the
segment file.
+ if (!isCrcValid(buffer, crcPosition)) {
+ break;
+ }
+
+ memtable.appendSegmentFileOffset(groupId, index,
segmentFilePayloadOffset);
+ }
+
+ buffer.position(crcPosition + HASH_SIZE_BYTES);
+ }
+
+ return memtable;
+ }
+
+ private static boolean isCrcValid(ByteBuffer buffer, int crcPosition) {
+ int originalPosition = buffer.position();
+
+ int crc = buffer.getInt(crcPosition);
+
+ int expectedCrc = FastCrc.calcCrc(buffer, crcPosition -
buffer.position());
+
+ buffer.position(originalPosition);
+
+ return crc == expectedCrc;
+ }
+
+ private static void validateSegmentFileHeader(ByteBuffer buffer, Path
segmentFilePath) {
+ int magicNumber = buffer.getInt();
+
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IllegalStateException(String.format("Invalid magic
number in segment file %s: %d.", segmentFilePath, magicNumber));
}
- ByteBuffer buffer() {
- return writeBuffer.buffer();
+ int formatVersion = buffer.getInt();
+
+ if (formatVersion > FORMAT_VERSION) {
+ throw new IllegalStateException(String.format(
+ "Unsupported format version in segment file %s: %d.",
segmentFilePath, formatVersion
+ ));
}
+ }
+
+ private static int segmentFileOrdinal(Path segmentFile) {
+ String fileName = segmentFile.getFileName().toString();
- @Override
- public void close() {
- writeBuffer.close();
+ Matcher matcher = SEGMENT_FILE_NAME_PATTERN.matcher(fileName);
+
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(String.format("Invalid segment
file name format: %s.", segmentFile));
}
+
+ return Integer.parseInt(matcher.group("ordinal"));
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index dd5e958cf3c..94fd4de599c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -17,11 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
-import static org.apache.ignite.internal.raft.util.VarlenEncoder.sizeInBytes;
-import static org.apache.ignite.internal.raft.util.VarlenEncoder.writeLong;
-
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
import org.apache.ignite.internal.util.FastCrc;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
@@ -38,16 +35,16 @@ class SegmentPayload {
static final int LENGTH_SIZE_BYTES = Integer.BYTES;
- static final int HASH_SIZE = Integer.BYTES;
+ static final int HASH_SIZE_BYTES = Integer.BYTES;
/**
* Length of the byte sequence that is written when suffix truncation
happens.
*
* <p>Format: {@code groupId, 0 (special length value), last kept index,
crc}
*/
- static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE;
+ static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE_BYTES;
- private static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
+ static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
static void writeTo(
ByteBuffer buffer,
@@ -64,8 +61,8 @@ class SegmentPayload {
LogId logId = logEntry.getId();
- writeLong(logId.getIndex(), buffer);
- writeLong(logId.getTerm(), buffer);
+ VarlenEncoder.writeLong(logId.getIndex(), buffer);
+ VarlenEncoder.writeLong(logId.getTerm(), buffer);
logEntryEncoder.encode(buffer, logEntry);
@@ -90,7 +87,7 @@ class SegmentPayload {
buffer.position(originalPos);
- int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE -
HASH_SIZE);
+ int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE -
HASH_SIZE_BYTES);
buffer.putInt(crc);
}
@@ -104,8 +101,8 @@ class SegmentPayload {
int payloadPosition = buffer.position();
- readLong(buffer); // Skip log entry index.
- readLong(buffer); // Skip log entry term.
+ VarlenEncoder.readLong(buffer); // Skip log entry index.
+ VarlenEncoder.readLong(buffer); // Skip log entry term.
int logEntryPosition = buffer.position();
@@ -129,7 +126,7 @@ class SegmentPayload {
buffer.get(entryBytes);
// Move the position as if we have read the whole payload.
- buffer.position(buffer.position() + HASH_SIZE);
+ buffer.position(buffer.position() + HASH_SIZE_BYTES);
return logEntryDecoder.decode(entryBytes);
}
@@ -139,10 +136,10 @@ class SegmentPayload {
LogId logId = logEntry.getId();
- return fixedOverheadSize() + sizeInBytes(logId.getIndex()) +
sizeInBytes(logId.getTerm()) + entrySize;
+ return fixedOverheadSize() +
VarlenEncoder.sizeInBytes(logId.getIndex()) +
VarlenEncoder.sizeInBytes(logId.getTerm()) + entrySize;
}
static int fixedOverheadSize() {
- return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE;
+ return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES;
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteBufferWithMemtable.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteBufferWithMemtable.java
new file mode 100644
index 00000000000..9a13831125c
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteBufferWithMemtable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.nio.ByteBuffer;
+import
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
+
+class WriteBufferWithMemtable implements AutoCloseable {
+ private final WriteBuffer writeBuffer;
+
+ private final WriteModeIndexMemTable memtable;
+
+ WriteBufferWithMemtable(WriteBuffer writeBuffer, WriteModeIndexMemTable
memtable) {
+ this.writeBuffer = writeBuffer;
+ this.memtable = memtable;
+ }
+
+ ByteBuffer buffer() {
+ return writeBuffer.buffer();
+ }
+
+ WriteModeIndexMemTable memtable() {
+ return memtable;
+ }
+
+ @Override
+ public void close() {
+ writeBuffer.close();
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
index 8c647cd0f99..4f7eb6d0faf 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.raft.storage.segstore;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.GROUP_ID_SIZE_BYTES;
-import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.LENGTH_SIZE_BYTES;
import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,9 +63,9 @@ class DeserializedSegmentPayload {
int payloadLength = readFully(channel, LENGTH_SIZE_BYTES).getInt();
- ByteBuffer remaining = readFully(channel, payloadLength + HASH_SIZE);
+ ByteBuffer remaining = readFully(channel, payloadLength +
HASH_SIZE_BYTES);
- ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength +
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE)
+ ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength +
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES)
.order(SegmentFile.BYTE_ORDER)
.putLong(groupId)
.putInt(payloadLength)
@@ -113,7 +113,7 @@ class DeserializedSegmentPayload {
}
int size() {
- return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length +
HASH_SIZE;
+ return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length +
HASH_SIZE_BYTES;
}
private static ByteBuffer readFully(ReadableByteChannel byteChannel, int
len) throws IOException {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 82a07424d7e..7190a61f009 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -38,6 +38,8 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@BeforeEach
void setUp() throws IOException {
indexFileManager = new IndexFileManager(workDir);
+
+ indexFileManager.start();
}
@Test
@@ -264,4 +266,104 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(2, 2)));
}
+
+    @Test
+    void testRecovery() throws IOException {
+        // Persist two index files, each holding a single entry of group 0.
+        IndexMemTable firstMemtable = new IndexMemTable(STRIPES);
+        firstMemtable.appendSegmentFileOffset(0, 1, 1);
+        indexFileManager.saveIndexMemtable(firstMemtable);
+
+        IndexMemTable secondMemtable = new IndexMemTable(STRIPES);
+        secondMemtable.appendSegmentFileOffset(0, 2, 2);
+        indexFileManager.saveIndexMemtable(secondMemtable);
+
+        // A freshly constructed manager does not know about the persisted files until it is started.
+        indexFileManager = new IndexFileManager(workDir);
+
+        for (int logIndex = 1; logIndex <= 3; logIndex++) {
+            assertThat(indexFileManager.getSegmentFilePointer(0, logIndex), is(nullValue()));
+        }
+
+        indexFileManager.start();
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new SegmentFilePointer(0, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new SegmentFilePointer(1, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(nullValue()));
+
+        // Saving after recovery must continue the numbering after the recovered files (the next file gets ordinal 2).
+        IndexMemTable thirdMemtable = new IndexMemTable(STRIPES);
+        thirdMemtable.appendSegmentFileOffset(0, 3, 3);
+        indexFileManager.saveIndexMemtable(thirdMemtable);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new SegmentFilePointer(0, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new SegmentFilePointer(1, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new SegmentFilePointer(2, 3)));
+    }
+
+    @Test
+    void testRecoveryWithTruncateSuffix() throws IOException {
+        // First index file: entries 1, 2 and 3 of group 0, each at an offset equal to its log index.
+        IndexMemTable appendedMemtable = new IndexMemTable(STRIPES);
+
+        for (int logIndex = 1; logIndex <= 3; logIndex++) {
+            appendedMemtable.appendSegmentFileOffset(0, logIndex, logIndex);
+        }
+
+        indexFileManager.saveIndexMemtable(appendedMemtable);
+
+        // Second index file: only a suffix truncation that keeps entries up to log index 2.
+        IndexMemTable truncatedMemtable = new IndexMemTable(STRIPES);
+        truncatedMemtable.truncateSuffix(0, 2);
+        indexFileManager.saveIndexMemtable(truncatedMemtable);
+
+        // Recover the state from disk with a brand new manager.
+        indexFileManager = new IndexFileManager(workDir);
+        indexFileManager.start();
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new SegmentFilePointer(0, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new SegmentFilePointer(0, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(nullValue()));
+    }
+
+    @Test
+    void testExists() throws IOException {
+        // Nothing has been saved yet, so no index file exists.
+        assertThat(indexFileManager.indexFileExists(0), is(false));
+        assertThat(indexFileManager.indexFileExists(1), is(false));
+
+        IndexMemTable memtable = new IndexMemTable(STRIPES);
+        memtable.appendSegmentFileOffset(0, 1, 1);
+
+        // The first save creates the index file with ordinal 0 only.
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.indexFileExists(0), is(true));
+        assertThat(indexFileManager.indexFileExists(1), is(false));
+
+        // Saving again (even the same memtable) creates the next index file.
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.indexFileExists(0), is(true));
+        assertThat(indexFileManager.indexFileExists(1), is(true));
+    }
+
+    @Test
+    void testSaveMemtableWithExplicitOrdinal() throws IOException {
+        IndexMemTable firstMemtable = new IndexMemTable(STRIPES);
+        firstMemtable.appendSegmentFileOffset(0, 1, 1);
+        indexFileManager.saveIndexMemtable(firstMemtable, 5);
+
+        IndexMemTable secondMemtable = new IndexMemTable(STRIPES);
+        secondMemtable.appendSegmentFileOffset(0, 2, 2);
+        indexFileManager.saveIndexMemtable(secondMemtable, 10);
+
+        // The pointers must reference the explicitly supplied ordinals, not sequentially assigned ones.
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new SegmentFilePointer(5, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new SegmentFilePointer(10, 2)));
+    }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index ac39ec43e7e..b26c20a5efa 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -42,6 +42,8 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
@@ -60,6 +62,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.RunnableX;
@@ -85,15 +88,21 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
private static final String NODE_NAME = "test";
+ private final FailureManager failureManager = new NoOpFailureManager();
+
private SegmentFileManager fileManager;
@BeforeEach
void setUp() throws IOException {
- fileManager = new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE,
STRIPES, new NoOpFailureManager());
+ fileManager = createFileManager();
fileManager.start();
}
+    /** Creates a segment file manager over {@code workDir} with the test constants; the caller is responsible for starting it. */
+    private SegmentFileManager createFileManager() throws IOException {
+        SegmentFileManager manager = new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE, STRIPES, failureManager);
+
+        return manager;
+    }
+
@AfterEach
void tearDown() throws Exception {
closeAllManually(fileManager);
@@ -102,8 +111,8 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Test
void testConstructorInvariants() {
- assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(NODE_NAME, workDir, 0, 1, new NoOpFailureManager()));
- assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(NODE_NAME, workDir, 1, 1, new NoOpFailureManager()));
+ assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(NODE_NAME, workDir, 0, 1, failureManager));
+ assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(NODE_NAME, workDir, 1, 1, failureManager));
}
@Test
@@ -425,6 +434,139 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
runRace(writerTaskFactory.apply(0), writerTaskFactory.apply(1));
}
+    /** Checks that all entries are readable after a restart, even if one of the index files has been lost. */
+    @Test
+    void testRecovery() throws Exception {
+        int batchSize = FILE_SIZE / 4;
+
+        List<byte[]> batches = randomData(batchSize, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        List<Path> segmentFiles = segmentFiles();
+
+        assertThat(segmentFiles, hasSize(greaterThan(1)));
+
+        List<Path> indexFiles = await().until(this::indexFiles, hasSize(segmentFiles.size() - 1));
+
+        fileManager.close();
+
+        // Delete an index file. We expect it to be re-created after recovery.
+        Files.delete(indexFiles.get(0));
+
+        fileManager = createFileManager();
+
+        fileManager.start();
+
+        for (int i = 0; i < batches.size(); i++) {
+            byte[] expectedEntry = batches.get(i);
+
+            // The reader is not invoked for absent entries, so record that it ran: otherwise a lost entry would make the
+            // assertion inside the lambda silently never execute and the test would pass vacuously.
+            boolean[] entryFound = {false};
+
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                assertThat(bs, is(expectedEntry));
+
+                entryFound[0] = true;
+
+                return null;
+            });
+
+            assertThat("Entry " + i + " is missing after recovery", entryFound[0], is(true));
+        }
+    }
+
+    /** Checks that leftover temporary index files are removed and missing index files are rebuilt during recovery. */
+    @Test
+    void testRecoveryWithTmpIndexFiles() throws Exception {
+        List<byte[]> batches = randomData(FILE_SIZE / 4, 3);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        // We need rollover to happen at least once to trigger index file recovery.
+        await().until(this::indexFiles, hasSize(1));
+
+        assertThat(segmentFiles(), hasSize(2));
+
+        // Use a mock memtable that throws an exception to force the index manager to create a temporary index file, but not rename it.
+        ReadModeIndexMemTable mockMemTable = mock(ReadModeIndexMemTable.class);
+
+        when(mockMemTable.iterator()).thenThrow(new RuntimeException("Test exception"));
+
+        // Create two tmp index files: one for the complete segment file and one for the incomplete segment file.
+        // Both saves must fail; asserting this (instead of swallowing the exception) guarantees the tmp files come from
+        // interrupted writes and makes the test fail loudly if the mock stops triggering the failure.
+        assertThrows(RuntimeException.class, () -> fileManager.indexFileManager().saveIndexMemtable(mockMemTable, 0));
+
+        assertThrows(RuntimeException.class, () -> fileManager.indexFileManager().saveIndexMemtable(mockMemTable, 1));
+
+        assertThat(tmpIndexFiles(), hasSize(2));
+
+        fileManager.close();
+
+        for (Path indexFile : indexFiles()) {
+            Files.delete(indexFile);
+        }
+
+        fileManager = createFileManager();
+
+        fileManager.start();
+
+        assertThat(tmpIndexFiles(), is(empty()));
+
+        assertThat(indexFiles(), hasSize(1));
+    }
+
+    /** Checks that suffix truncations stored in both rolled-over and current segment files are applied during recovery. */
+    @Test
+    void testRecoveryWithTruncateSuffix() throws Exception {
+        List<byte[]> batches = randomData(FILE_SIZE / 4, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(4));
+
+        int lastLogIndexKept = batches.size() / 2;
+
+        fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
+
+        // Insert more data in order to have two truncate suffix records: one in a "rollovered" segment file and one in the latest segment
+        // file.
+        for (int i = lastLogIndexKept + 1; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
+
+        fileManager.close();
+
+        for (Path indexFile : indexFiles()) {
+            Files.deleteIfExists(indexFile);
+        }
+
+        fileManager = createFileManager();
+
+        fileManager.start();
+
+        for (int i = 0; i <= lastLogIndexKept; i++) {
+            byte[] expectedEntry = batches.get(i);
+
+            // The reader is not invoked for absent entries (see the loop below), so record that it ran: otherwise a kept
+            // entry lost during recovery would let this loop pass vacuously.
+            boolean[] entryFound = {false};
+
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                assertThat(bs, is(expectedEntry));
+
+                entryFound[0] = true;
+
+                return null;
+            });
+
+            assertThat("Entry " + i + " is missing after recovery", entryFound[0], is(true));
+        }
+
+        for (int i = lastLogIndexKept + 1; i < batches.size(); i++) {
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                throw new AssertionError("This method should not be called.");
+            });
+        }
+    }
+
private Path findSoleSegmentFile() throws IOException {
List<Path> segmentFiles = segmentFiles();
@@ -452,6 +594,19 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
}
}
+    /** Returns the temporary index files (names ending with ".tmp") currently present in the index directory, sorted. */
+    private List<Path> tmpIndexFiles() throws IOException {
+        try (Stream<Path> paths = Files.list(fileManager.indexFilesDir())) {
+            return paths
+                    .filter(path -> path.getFileName().toString().endsWith(".tmp"))
+                    .sorted()
+                    .collect(toList());
+        }
+    }
+
private static List<byte[]> randomData(int batchLength, int numBatches) {
return IntStream.range(0, numBatches)
.mapToObj(i -> randomBytes(ThreadLocalRandom.current(),
batchLength))