This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 313c2252bcc IGNITE-26284 Implement truncateSuffix operation (#6753)
313c2252bcc is described below
commit 313c2252bcce0cc82e687c151559e00eecb5f129
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Oct 20 14:05:30 2025 +0300
IGNITE-26284 Implement truncateSuffix operation (#6753)
---
.../raft/storage/segstore/GroupIndexMeta.java | 104 +++++++--
.../raft/storage/segstore/IndexFileMeta.java | 8 +
.../raft/storage/segstore/IndexFileMetaArray.java | 10 +-
.../raft/storage/segstore/IndexMemTable.java | 16 ++
.../raft/storage/segstore/RaftLogCheckpointer.java | 3 +-
.../raft/storage/segstore/SegmentFileManager.java | 120 +++++++----
.../raft/storage/segstore/SegmentInfo.java | 38 ++++
.../raft/storage/segstore/SegmentPayload.java | 24 +++
.../raft/storage/segstore/SegstoreLogStorage.java | 8 +-
.../storage/segstore/WriteModeIndexMemTable.java | 7 +-
.../raft/storage/segstore/GroupIndexMetaTest.java | 119 ++++++++++
.../storage/segstore/IndexFileManagerTest.java | 80 +++++++
.../storage/segstore/IndexFileMetaArrayTest.java | 14 ++
.../raft/storage/segstore/IndexMemTableTest.java | 95 ++++++++
.../segstore/SegmentFileManagerGetEntryTest.java | 240 ++++++++++++++++-----
.../storage/segstore/SegmentFileManagerTest.java | 60 +++++-
.../storage/segstore/SegstoreLogStorageTest.java | 6 -
17 files changed, 825 insertions(+), 127 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index 5ee00ff8b47..a727216a180 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -19,57 +19,119 @@ package org.apache.ignite.internal.raft.storage.segstore;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedDeque;
import org.jetbrains.annotations.Nullable;
/**
* Represents in-memory meta information about a particular Raft group stored
in an index file.
*/
class GroupIndexMeta {
- private static final VarHandle FILE_METAS_VH;
+ private static class IndexMetaArrayHolder {
+ private static final VarHandle FILE_METAS_VH;
- static {
- try {
- FILE_METAS_VH =
MethodHandles.lookup().findVarHandle(GroupIndexMeta.class, "fileMetas",
IndexFileMetaArray.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
+ static {
+ try {
+ FILE_METAS_VH =
MethodHandles.lookup().findVarHandle(IndexMetaArrayHolder.class, "fileMetas",
IndexFileMetaArray.class);
+ } catch (ReflectiveOperationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ @SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
+ volatile IndexFileMetaArray fileMetas;
+
+ IndexMetaArrayHolder(IndexFileMeta startFileMeta) {
+ this.fileMetas = new IndexFileMetaArray(startFileMeta);
+ }
+
+ void addIndexMeta(IndexFileMeta indexFileMeta) {
+ IndexFileMetaArray fileMetas = this.fileMetas;
+
+ IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+
+ // Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
+ // this invariant, just in case.
+ boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas);
+
+ assert updated : "Concurrent writes detected";
}
}
- @SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
- private volatile IndexFileMetaArray fileMetas;
+ private final Deque<IndexMetaArrayHolder> fileMetaDeque = new
ConcurrentLinkedDeque<>();
GroupIndexMeta(IndexFileMeta startFileMeta) {
- this.fileMetas = new IndexFileMetaArray(startFileMeta);
+ fileMetaDeque.add(new IndexMetaArrayHolder(startFileMeta));
}
void addIndexMeta(IndexFileMeta indexFileMeta) {
- IndexFileMetaArray fileMetas = this.fileMetas;
+ IndexMetaArrayHolder curFileMetas = fileMetaDeque.getLast();
+
+ long curLastLogIndex = curFileMetas.fileMetas.lastLogIndexExclusive();
- IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+ long newFirstLogIndex = indexFileMeta.firstLogIndexInclusive();
- // Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
- // this invariant, just in case.
- boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas);
+ assert newFirstLogIndex <= curLastLogIndex :
+ String.format(
+ "Gaps between Index File Metas are not allowed. Last
log index: %d, new log index: %d",
+ curLastLogIndex, newFirstLogIndex
+ );
- assert updated : "Concurrent writes detected";
+ // Merge consecutive index metas into a single meta block. If there's
an overlap (e.g. due to log truncation), start a new block,
+ // which will override the previous one during search.
+ if (curLastLogIndex == newFirstLogIndex) {
+ curFileMetas.addIndexMeta(indexFileMeta);
+ } else {
+ fileMetaDeque.add(new IndexMetaArrayHolder(indexFileMeta));
+ }
}
/**
- * Returns a file pointer that uniquely identifies the index file for the
given log index. Returns {@code null} if the given log index
+ * Returns the index file meta that uniquely identifies the index file for the
given log index. Returns {@code null} if the given log index
* is not found in any of the index files in this group.
*/
@Nullable
IndexFileMeta indexMeta(long logIndex) {
- return fileMetas.find(logIndex);
+ Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator();
+
+ while (it.hasNext()) {
+ IndexFileMetaArray fileMetas = it.next().fileMetas;
+
+ // Log suffix might have been truncated, so we can have an entry
on the top of the queue that cuts off part of the search range.
+ if (logIndex >= fileMetas.lastLogIndexExclusive()) {
+ return null;
+ }
+
+ if (logIndex < fileMetas.firstLogIndexInclusive()) {
+ continue;
+ }
+
+ IndexFileMeta indexMeta = fileMetas.find(logIndex);
+
+ if (indexMeta != null) {
+ return indexMeta;
+ }
+ }
+
+ return null;
}
long firstLogIndexInclusive() {
- return fileMetas.get(0).firstLogIndexInclusive();
+ for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
+ long firstLogIndex =
indexMetaArrayHolder.fileMetas.firstLogIndexInclusive();
+
+ // "firstLogIndexInclusive" can return -1 if the index file does
not contain any entries for this group, only the truncation
+ // record.
+ if (firstLogIndex >= 0) {
+ return firstLogIndex;
+ }
+ }
+
+ return -1;
}
long lastLogIndexExclusive() {
- IndexFileMetaArray fileMetas = this.fileMetas;
-
- return fileMetas.get(fileMetas.size() - 1).lastLogIndexExclusive();
+ return fileMetaDeque.getLast().fileMetas.lastLogIndexExclusive();
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
index 9c5bf9a77cb..dfdbe03b123 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
@@ -68,6 +68,14 @@ class IndexFileMeta {
return indexFileOrdinal;
}
+ /**
+ * Returns {@code true} if the index meta is empty. This happens if some
data was inserted but then the log suffix got truncated,
+ * completely wiping it out.
+ */
+ boolean isEmpty() {
+ return firstLogIndexInclusive == lastLogIndexExclusive;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
index 6ec73edac44..1256faa4713 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -73,7 +73,15 @@ class IndexFileMetaArray {
}
long firstLogIndexInclusive() {
- return array[0].firstLogIndexInclusive();
+ IndexFileMeta firstMeta = array[0];
+ IndexFileMeta lastMeta = array[size - 1];
+
+ if (firstMeta.firstLogIndexInclusive() >=
lastMeta.lastLogIndexExclusive()) {
+ // Log for this group has been truncated.
+ return -1;
+ }
+
+ return firstMeta.firstLogIndexInclusive();
}
long lastLogIndexExclusive() {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
index 37dcfdc9801..a4de58806ea 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
@@ -64,6 +64,22 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
return stripe(groupId).memTable.get(groupId);
}
+ @Override
+ public void truncateSuffix(long groupId, long lastLogIndexKept) {
+ ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable;
+
+ SegmentInfo segmentInfo = memtable.get(groupId);
+
+ if (segmentInfo == null || lastLogIndexKept <
segmentInfo.firstLogIndexInclusive()) {
+ // If the current memtable does not have information for the given
group or if we are truncating everything currently present
+ // in the memtable, we need to write a special "empty" SegmentInfo
into the memtable to override existing persisted data during
+ // search.
+ memtable.put(groupId, new SegmentInfo(lastLogIndexKept + 1));
+ } else {
+ segmentInfo.truncateSuffix(lastLogIndexKept);
+ }
+ }
+
@Override
public ReadModeIndexMemTable transitionToReadMode() {
return this;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index 89435d042f9..dc5c805e6b8 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -137,7 +137,8 @@ class RaftLogCheckpointer {
while (it.hasNext()) {
SegmentInfo segmentInfo =
it.next().memTable().segmentInfo(groupId);
- if (segmentInfo != null) {
+ // Segment Info can be empty if the log suffix was truncated.
+ if (segmentInfo != null && segmentInfo.size() > 0) {
firstIndex = segmentInfo.firstLogIndexInclusive();
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 8e52851a9d8..8f25ddae7cf 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -64,13 +65,17 @@ import org.jetbrains.annotations.Nullable;
*
+---------------+---------+--------------------------+---------+----------------+
* </pre>
*
+ * <p>In addition to regular Raft log entries, payload can also represent a
special type of entry which is written when the Raft log suffix
+ * is truncated. Such entries are identified by having a payload length of 0,
followed by 8 bytes of the last log index kept after the
+ * truncation.
+ *
* <p>When a rollover happens and the segment file being replaced has at least
8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is
* written at the end of the file. If there are less than 8 bytes left, no
switch records are written.
*/
class SegmentFileManager implements ManuallyCloseable {
private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000;
- private static final int MAGIC_NUMBER = 0xFEEDFACE;
+ private static final int MAGIC_NUMBER = 0x56E0B526;
private static final int FORMAT_VERSION = 1;
@@ -177,32 +182,23 @@ class SegmentFileManager implements ManuallyCloseable {
void appendEntry(long groupId, LogEntry entry, LogEntryEncoder encoder)
throws IOException {
int entrySize = encoder.size(entry);
- if (entrySize > maxEntrySize()) {
+ if (entrySize > maxPossibleEntrySize()) {
throw new IllegalArgumentException(String.format(
- "Entry size is too big (%d bytes), maximum allowed entry
size: %d bytes.", entrySize, maxEntrySize()
+ "Entry size is too big (%d bytes), maximum allowed entry
size: %d bytes.", entrySize, maxPossibleEntrySize()
));
}
int payloadSize = SegmentPayload.size(entrySize);
- while (true) {
- SegmentFileWithMemtable segmentFileWithMemtable =
currentSegmentFile();
-
- try (WriteBuffer writeBuffer =
segmentFileWithMemtable.segmentFile().reserve(payloadSize)) {
- if (writeBuffer != null) {
- int segmentOffset = writeBuffer.buffer().position();
+ try (WriteBufferWithMemtable writeBufferWithMemtable =
reserveBytesWithRollover(payloadSize)) {
+ ByteBuffer segmentBuffer = writeBufferWithMemtable.buffer();
- SegmentPayload.writeTo(writeBuffer.buffer(), groupId,
entrySize, entry, encoder);
+ int segmentOffset = segmentBuffer.position();
- // Append to memtable before write buffer is released to
avoid races with checkpoint on rollover.
-
segmentFileWithMemtable.memtable().appendSegmentFileOffset(groupId,
entry.getId().getIndex(), segmentOffset);
-
- return;
- }
- }
+ SegmentPayload.writeTo(segmentBuffer, groupId, entrySize, entry,
encoder);
- // Segment file does not have enough space. Try to switch to a new
one and retry the write attempt.
- initiateRollover(segmentFileWithMemtable);
+ // Append to memtable before write buffer is released to avoid
races with checkpoint on rollover.
+ writeBufferWithMemtable.memtable.appendSegmentFileOffset(groupId,
entry.getId().getIndex(), segmentOffset);
}
}
@@ -213,10 +209,21 @@ class SegmentFileManager implements ManuallyCloseable {
}
private @Nullable ByteBuffer getEntry(long groupId, long logIndex) throws
IOException {
- ByteBuffer bufferFromCurrentSegmentFile =
readFromCurrentSegmentFile(groupId, logIndex);
+ // First, read from the current segment file.
+ SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
+
+ SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
+
+ if (segmentInfo != null) {
+ if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
+ return null;
+ }
- if (bufferFromCurrentSegmentFile != null) {
- return bufferFromCurrentSegmentFile;
+ int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
+
+ if (segmentPayloadOffset != 0) {
+ return
currentSegmentFile.segmentFile().buffer().position(segmentPayloadOffset);
+ }
}
ByteBuffer bufferFromCheckpointQueue =
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
@@ -228,6 +235,32 @@ class SegmentFileManager implements ManuallyCloseable {
return readFromOtherSegmentFiles(groupId, logIndex);
}
+ void truncateSuffix(long groupId, long lastLogIndexKept) throws
IOException {
+ try (WriteBufferWithMemtable writeBufferWithMemtable =
reserveBytesWithRollover(TRUNCATE_SUFFIX_RECORD_SIZE)) {
+ ByteBuffer segmentBuffer = writeBufferWithMemtable.buffer();
+
+ SegmentPayload.writeTruncateSuffixRecordTo(segmentBuffer, groupId,
lastLogIndexKept);
+
+ // Modify the memtable before write buffer is released to avoid
races with checkpoint on rollover.
+ writeBufferWithMemtable.memtable.truncateSuffix(groupId,
lastLogIndexKept);
+ }
+ }
+
+ private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws
IOException {
+ while (true) {
+ SegmentFileWithMemtable segmentFileWithMemtable =
currentSegmentFile();
+
+ WriteBuffer writeBuffer =
segmentFileWithMemtable.segmentFile().reserve(size);
+
+ if (writeBuffer != null) {
+ return new WriteBufferWithMemtable(writeBuffer,
segmentFileWithMemtable.memtable());
+ }
+
+ // Segment file does not have enough space. Try to switch to a new
one and retry the write attempt.
+ initiateRollover(segmentFileWithMemtable);
+ }
+ }
+
/**
* Returns the lowest log index for the given group present in the storage
or {@code -1} if no such index exists.
*/
@@ -254,7 +287,11 @@ class SegmentFileManager implements ManuallyCloseable {
SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
- return segmentInfo == null ? -1 : segmentInfo.firstLogIndexInclusive();
+ if (segmentInfo == null || segmentInfo.size() == 0) {
+ return -1;
+ }
+
+ return segmentInfo.firstLogIndexInclusive();
}
/**
@@ -296,8 +333,7 @@ class SegmentFileManager implements ManuallyCloseable {
return segmentFile;
}
- // If the current segment file is null, then either the manager is
stopped or a rollover is in progress and we need to wait for
- // it to complete.
+ // If the current segment file is read-only, then a rollover is in
progress and we need to wait for it to complete.
try {
synchronized (rolloverLock) {
while (true) {
@@ -372,24 +408,10 @@ class SegmentFileManager implements ManuallyCloseable {
}
}
- private long maxEntrySize() {
+ private long maxPossibleEntrySize() {
return fileSize - HEADER_RECORD.length - SegmentPayload.overheadSize();
}
- private @Nullable ByteBuffer readFromCurrentSegmentFile(long groupId, long
logIndex) {
- SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
-
- SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
-
- int segmentPayloadOffset = segmentInfo == null ? 0 :
segmentInfo.getOffset(logIndex);
-
- if (segmentPayloadOffset == 0) {
- return null;
- }
-
- return
currentSegmentFile.segmentFile().buffer().position(segmentPayloadOffset);
- }
-
private @Nullable ByteBuffer readFromOtherSegmentFiles(long groupId, long
logIndex) throws IOException {
SegmentFilePointer segmentFilePointer =
indexFileManager.getSegmentFilePointer(groupId, logIndex);
@@ -404,4 +426,24 @@ class SegmentFileManager implements ManuallyCloseable {
return
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
}
+
+ private static class WriteBufferWithMemtable implements AutoCloseable {
+ final WriteBuffer writeBuffer;
+
+ final WriteModeIndexMemTable memtable;
+
+ WriteBufferWithMemtable(WriteBuffer writeBuffer,
WriteModeIndexMemTable memtable) {
+ this.writeBuffer = writeBuffer;
+ this.memtable = memtable;
+ }
+
+ ByteBuffer buffer() {
+ return writeBuffer.buffer();
+ }
+
+ @Override
+ public void close() {
+ writeBuffer.close();
+ }
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
index e4428cd2f12..ffffb17f72e 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
@@ -56,6 +56,17 @@ class SegmentInfo {
return new ArrayWithSize(array, size + 1);
}
+ ArrayWithSize truncate(int newSize) {
+ assert newSize <= size
+ : String.format("Array must shrink on truncation, current
size: %d, size after truncation: %d", size, newSize);
+
+ int[] newArray = new int[size];
+
+ System.arraycopy(array, 0, newArray, 0, newSize);
+
+ return new ArrayWithSize(newArray, newSize);
+ }
+
int get(int index) {
return array[index];
}
@@ -159,4 +170,31 @@ class SegmentInfo {
buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
}
+
+ /**
+ * Removes all data which log indices are strictly greater than {@code
lastLogIndexKept}.
+ */
+ void truncateSuffix(long lastLogIndexKept) {
+ assert lastLogIndexKept >= logIndexBase :
String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase,
lastLogIndexKept);
+
+ ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
+
+ long newSize = lastLogIndexKept - logIndexBase + 1;
+
+ // Not using an assertion here, because this value doesn't come
from the storage code.
+ if (newSize > segmentFileOffsets.size()) {
+ throw new IllegalArgumentException(String.format(
+ "lastLogIndexKept is too large. Last index in memtable:
%d, lastLogIndexKept: %d",
+ logIndexBase + segmentFileOffsets.size() - 1,
lastLogIndexKept
+ ));
+ }
+
+ ArrayWithSize newSegmentFileOffsets =
segmentFileOffsets.truncate((int) newSize);
+
+ // Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
+ // this invariant, just in case.
+ boolean updated = SEGMENT_FILE_OFFSETS_VH.compareAndSet(this,
segmentFileOffsets, newSegmentFileOffsets);
+
+ assert updated : "Concurrent writes detected";
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index 89a7af99eb1..ddf1ad65373 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -35,6 +35,15 @@ class SegmentPayload {
static final int HASH_SIZE = Integer.BYTES;
+ /**
+ * Length of the byte sequence that is written when suffix truncation
happens.
+ *
+ * <p>Format: {@code groupId, 0 (special length value), last kept index,
crc}
+ */
+ static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE;
+
+ private static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
+
static void writeTo(
ByteBuffer buffer,
long groupId,
@@ -61,6 +70,21 @@ class SegmentPayload {
buffer.putInt(crc);
}
+ static void writeTruncateSuffixRecordTo(ByteBuffer buffer, long groupId,
long lastLogIndexKept) {
+ int originalPos = buffer.position();
+
+ buffer
+ .putLong(groupId)
+ .putInt(TRUNCATE_SUFFIX_RECORD_MARKER)
+ .putLong(lastLogIndexKept);
+
+ buffer.position(originalPos);
+
+ int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE -
HASH_SIZE);
+
+ buffer.putInt(crc);
+ }
+
static LogEntry readFrom(ByteBuffer buffer, LogEntryDecoder
logEntryDecoder) {
int entrySize = buffer.getInt(buffer.position() + GROUP_ID_SIZE_BYTES);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
index 7e0a28e9756..7a2f86e4106 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
@@ -120,7 +120,13 @@ class SegstoreLogStorage implements LogStorage {
@Override
public boolean truncateSuffix(long lastIndexKept) {
- throw new UnsupportedOperationException();
+ try {
+ segmentFileManager.truncateSuffix(groupId, lastIndexKept);
+ } catch (IOException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+
+ return true;
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
index 1901d1d629c..df8bd40e1d4 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.Nullable;
* <p>This class represents an in-memory index of the current segment file
used by a {@link SegmentFileManager}. Index is
* essentially a mapping from {@code [groupId, logIndex]} to the offset in the
segment file where the corresponding log entry is stored.
*
- * <p>It is expected that entries for each {@code groupId} are written by one
thread, therefore concurrent writes to the same
+ * <p>It is expected that entries for each {@code groupId} are modified by one
thread, therefore concurrent writes to the same
* {@code groupId} are not safe. However, reads from multiple threads are safe
in relation to the aforementioned writes.
*/
interface WriteModeIndexMemTable {
@@ -43,6 +43,11 @@ interface WriteModeIndexMemTable {
*/
void appendSegmentFileOffset(long groupId, long logIndex, int
segmentFileOffset);
+ /**
+ * Removes all offsets for the given Raft group whose log indices are
strictly larger than {@code lastLogIndexKept}.
+ */
+ void truncateSuffix(long groupId, long lastLogIndexKept);
+
/**
* Returns the read-only version of this memtable.
*/
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
index 717b70dc08b..ceca38d7b56 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -49,6 +50,54 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
assertThat(groupMeta.indexMeta(100), is(nullValue()));
}
+ @Test
+ void testAddGetWithOverlap() {
+ var initialMeta = new IndexFileMeta(1, 100, 0, 0);
+
+ var groupMeta = new GroupIndexMeta(initialMeta);
+
+ var additionalMeta = new IndexFileMeta(42, 100, 42, 1);
+
+ groupMeta.addIndexMeta(additionalMeta);
+
+ assertThat(groupMeta.indexMeta(0), is(nullValue()));
+
+ assertThat(groupMeta.indexMeta(1), is(initialMeta));
+
+ assertThat(groupMeta.indexMeta(41), is(initialMeta));
+
+ assertThat(groupMeta.indexMeta(42), is(additionalMeta));
+
+ assertThat(groupMeta.indexMeta(66), is(additionalMeta));
+
+ assertThat(groupMeta.indexMeta(100), is(nullValue()));
+ }
+
+ @Test
+ void testEmptyMetas() {
+ var initialMeta = new IndexFileMeta(1, 1, 0, 0);
+
+ var groupMeta = new GroupIndexMeta(initialMeta);
+
+ assertThat(groupMeta.indexMeta(1), is(nullValue()));
+
+ assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
+
+ assertThat(groupMeta.lastLogIndexExclusive(), is(1L));
+
+ var additionalMeta = new IndexFileMeta(1, 2, 42, 1);
+
+ groupMeta.addIndexMeta(additionalMeta);
+
+ assertThat(groupMeta.indexMeta(1), is(additionalMeta));
+
+ assertThat(groupMeta.indexMeta(2), is(nullValue()));
+
+ assertThat(groupMeta.firstLogIndexInclusive(), is(1L));
+
+ assertThat(groupMeta.lastLogIndexExclusive(), is(2L));
+ }
+
@RepeatedTest(10)
void testOneWriterMultipleReaders() {
int startFileOrdinal = 100;
@@ -94,4 +143,74 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
runRace(writer, reader, reader, reader);
}
+
+ @RepeatedTest(10)
+ void testOneWriterMultipleReadersWithOverlaps() {
+ int startFileOrdinal = 100;
+
+ int logEntriesPerFile = 50;
+
+ var initialMeta = new IndexFileMeta(0, logEntriesPerFile - 1, 0,
startFileOrdinal);
+
+ var groupMeta = new GroupIndexMeta(initialMeta);
+
+ int totalIndexFiles = 1000;
+
+ int overlap = 10;
+
+ RunnableX writer = () -> {
+ for (int relativeFileOrdinal = 1; relativeFileOrdinal <
totalIndexFiles; relativeFileOrdinal++) {
+ long startLogIndex = relativeFileOrdinal * (logEntriesPerFile
- overlap);
+ long lastLogIndex = startLogIndex + logEntriesPerFile - 1;
+
+ groupMeta.addIndexMeta(new IndexFileMeta(startLogIndex,
lastLogIndex, 0, startFileOrdinal + relativeFileOrdinal));
+ }
+ };
+
+ int totalLogEntries = totalIndexFiles * logEntriesPerFile;
+
+ RunnableX reader = () -> {
+ int expectedFirstLogIndex = 0;
+
+ int relativeFileOrdinal = 0;
+
+ for (int logIndex = 0; logIndex < totalLogEntries; logIndex++) {
+ IndexFileMeta indexFileMeta = groupMeta.indexMeta(logIndex);
+
+ int nextFirstLogIndex = expectedFirstLogIndex +
logEntriesPerFile - overlap;
+
+ // Last file is special, as it doesn't have an overlap.
+ if (logIndex >= nextFirstLogIndex && relativeFileOrdinal !=
totalIndexFiles - 1) {
+ expectedFirstLogIndex = nextFirstLogIndex;
+ relativeFileOrdinal++;
+ }
+
+ if (indexFileMeta != null) {
+ int expectedFileOrdinal = startFileOrdinal +
relativeFileOrdinal;
+
+ var expectedMetaWithOverlap = new IndexFileMeta(
+ expectedFirstLogIndex,
+ expectedFirstLogIndex + logEntriesPerFile - 1,
+ 0,
+ expectedFileOrdinal
+ );
+
+ var expectedMetaWithoutOverlap = new IndexFileMeta(
+ expectedFirstLogIndex + overlap -
logEntriesPerFile,
+ expectedFirstLogIndex + overlap - 1,
+ 0,
+ expectedFileOrdinal - 1
+ );
+
+ // We can possibly be reading from two different metas -
from the newer one (that overlaps the older one) or
+ // the older one.
+ assertThat(
+ logIndex + " -> " + indexFileMeta,
+ indexFileMeta,
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap)));
+ }
+ }
+ };
+
+ runRace(writer, reader, reader, reader);
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index d4ad1f22a7b..82a07424d7e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -184,4 +184,84 @@ class IndexFileManagerTest extends IgniteAbstractTest {
is(new SegmentFilePointer(2, 3))
);
}
+
+ @Test
+ void testFirstLastLogIndicesIndependence() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(2L));
+
+ assertThat(indexFileManager.firstLogIndexInclusive(1), is(-1L));
+ assertThat(indexFileManager.lastLogIndexExclusive(1), is(-1L));
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(1, 2, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(2L));
+
+ assertThat(indexFileManager.firstLogIndexInclusive(1), is(2L));
+ assertThat(indexFileManager.lastLogIndexExclusive(1), is(3L));
+ }
+
+ @Test
+ void testFirstLastLogIndicesWithTruncate() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+ memtable.appendSegmentFileOffset(0, 2, 1);
+ memtable.appendSegmentFileOffset(0, 3, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.truncateSuffix(0, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(2L));
+ }
+
+ @Test
+ void testGetSegmentPointerWithTruncate() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+ memtable.appendSegmentFileOffset(0, 2, 2);
+ memtable.appendSegmentFileOffset(0, 3, 3);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(0, 2)));
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.truncateSuffix(0, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new
SegmentFilePointer(0, 1)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2),
is(nullValue()));
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 2, 2);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(2, 2)));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
index 44fc67718f4..55d24237ebb 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
@@ -91,4 +91,18 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
assertThat(array.find(99), is(nullValue()));
assertThat(array.find(201), is(nullValue()));
}
+
+ @Test
+ void testFindWorksCorrectlyWithEmptyMetas() {
+ var meta1 = new IndexFileMeta(1, 10, 100, 0);
+ var meta2 = new IndexFileMeta(10, 10, 200, 1);
+ var meta3 = new IndexFileMeta(10, 20, 200, 2);
+
+ IndexFileMetaArray array = new IndexFileMetaArray(meta1)
+ .add(meta2)
+ .add(meta3);
+
+ assertThat(array.find(9), is(meta1));
+ assertThat(array.find(10), is(meta3));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
index 89856a8cbd7..afc2263f9cf 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
@@ -22,6 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.Iterator;
@@ -153,4 +155,97 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
}
}
}
+
+ @Test
+ void testTruncateSuffix() {
+ long groupId0 = 1;
+ long groupId1 = 2;
+
+ memTable.appendSegmentFileOffset(groupId0, 1, 42);
+ memTable.appendSegmentFileOffset(groupId0, 2, 43);
+ memTable.appendSegmentFileOffset(groupId0, 3, 44);
+ memTable.appendSegmentFileOffset(groupId0, 4, 45);
+
+ memTable.appendSegmentFileOffset(groupId1, 1, 55);
+ memTable.appendSegmentFileOffset(groupId1, 2, 56);
+ memTable.appendSegmentFileOffset(groupId1, 3, 57);
+ memTable.appendSegmentFileOffset(groupId1, 4, 58);
+
+ memTable.truncateSuffix(groupId0, 2);
+
+ assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(55));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(56));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(57));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(58));
+
+ memTable.truncateSuffix(groupId1, 4);
+
+ assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(55));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(56));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(57));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(58));
+
+ memTable.truncateSuffix(groupId1, 0);
+
+ assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(0));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(0));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(0));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(0));
+ }
+
+ @Test
+ void testTruncateNonExistingSuffix() {
+ assertDoesNotThrow(() -> memTable.truncateSuffix(0, 4));
+
+ memTable.appendSegmentFileOffset(1, 5, 42);
+
+ assertThrows(IllegalArgumentException.class, () ->
memTable.truncateSuffix(1, 10));
+ }
+
+ @Test
+ void testAppendAfterTruncateSuffix() {
+ memTable.appendSegmentFileOffset(0, 1, 42);
+
+ assertThat(memTable.segmentInfo(0).getOffset(1), is(42));
+
+ memTable.truncateSuffix(0, 0);
+
+ assertThat(memTable.segmentInfo(0).getOffset(1), is(0));
+
+ memTable.appendSegmentFileOffset(0, 1, 43);
+
+ assertThat(memTable.segmentInfo(0).getOffset(1), is(43));
+ }
+
+ @Test
+ void testTruncateIntoThePast() {
+ memTable.appendSegmentFileOffset(0, 36, 42);
+
+ // Truncate to a position before the moment the last segment info was
added.
+ memTable.truncateSuffix(0, 10);
+
+ assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(11), is(0));
+
+ memTable.appendSegmentFileOffset(0, 11, 43);
+
+ assertThat(memTable.segmentInfo(0).getOffset(11), is(43));
+ assertThat(memTable.segmentInfo(0).getOffset(12), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
index 7cb6c061507..e565f4ab163 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
@@ -23,14 +23,19 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +52,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@@ -59,6 +65,12 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
private SegmentFileManager fileManager;
+ @Mock
+ private LogEntryEncoder encoder;
+
+ @Mock
+ private LogEntryDecoder decoder;
+
@BeforeEach
void setUp() throws IOException {
fileManager = new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE,
STRIPES, new NoOpFailureManager());
@@ -77,14 +89,14 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
int logIndex = 1;
- MockEntry entry = new MockEntry(logIndex, 5);
+ LogEntry entry = createLogEntry(logIndex, 5);
- fileManager.appendEntry(groupId, entry.logEntry, entry.encoder);
+ fileManager.appendEntry(groupId, entry, encoder);
- assertThat(fileManager.getEntry(groupId, logIndex, entry.decoder),
is(entry.logEntry));
+ assertThat(fileManager.getEntry(groupId, logIndex, decoder),
is(entry));
- assertThat(fileManager.getEntry(groupId + 1, logIndex, entry.decoder),
is(nullValue()));
- assertThat(fileManager.getEntry(groupId, logIndex + 1, entry.decoder),
is(nullValue()));
+ assertThat(fileManager.getEntry(groupId + 1, logIndex, decoder),
is(nullValue()));
+ assertThat(fileManager.getEntry(groupId, logIndex + 1, decoder),
is(nullValue()));
}
@Test
@@ -93,37 +105,37 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
int entrySize = FILE_SIZE / 4;
- List<MockEntry> entries = IntStream.range(0, numEntries)
- .mapToObj(logIndex -> new MockEntry(logIndex, entrySize))
+ List<LogEntry> entries = IntStream.range(0, numEntries)
+ .mapToObj(logIndex -> createLogEntry(logIndex, entrySize))
.collect(toList());
- for (MockEntry e : entries) {
- fileManager.appendEntry(0, e.logEntry, e.encoder);
+ for (LogEntry e : entries) {
+ fileManager.appendEntry(0, e, encoder);
}
- for (MockEntry e : entries) {
- LogEntry actualEntry = fileManager.getEntry(0, e.logIndex(),
e.decoder);
+ for (LogEntry e : entries) {
+ LogEntry actualEntry = fileManager.getEntry(0,
e.getId().getIndex(), decoder);
- assertThat(actualEntry, is(e.logEntry));
+ assertThat(actualEntry, is(e));
}
}
- @RepeatedTest(10)
+ @RepeatedTest(5)
void getEntryMultithreadedTest() throws IOException {
int numGroups = STRIPES;
- Map<Long, List<MockEntry>> entriesByGroupId =
generateEntries(numGroups, 100, 10);
+ Map<Long, List<LogEntry>> entriesByGroupId =
generateEntries(numGroups, 100, 10);
var tasks = new ArrayList<RunnableX>();
for (int i = 0; i < numGroups; i++) {
long groupId = i;
- List<MockEntry> entries = entriesByGroupId.get(groupId);
+ List<LogEntry> entries = entriesByGroupId.get(groupId);
tasks.add(() -> {
- for (MockEntry e : entries) {
- fileManager.appendEntry(groupId, e.logEntry, e.encoder);
+ for (LogEntry e : entries) {
+ fileManager.appendEntry(groupId, e, encoder);
}
});
}
@@ -131,14 +143,14 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
for (int i = 0; i < numGroups; i++) {
long groupId = i;
- List<MockEntry> entries = entriesByGroupId.get(groupId);
+ List<LogEntry> entries = entriesByGroupId.get(groupId);
RunnableX reader = () -> {
- for (MockEntry e : entries) {
- LogEntry actualEntry = fileManager.getEntry(groupId,
e.logIndex(), e.decoder);
+ for (LogEntry e : entries) {
+ LogEntry actualEntry = fileManager.getEntry(groupId,
e.getId().getIndex(), decoder);
if (actualEntry != null) {
- assertThat(actualEntry, is(e.logEntry));
+ assertThat(actualEntry, is(e));
}
}
};
@@ -151,67 +163,183 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
runRace(tasks.toArray(RunnableX[]::new));
// Validate that data was actually inserted.
- for (Map.Entry<Long, List<MockEntry>> e : entriesByGroupId.entrySet())
{
+ for (Map.Entry<Long, List<LogEntry>> e : entriesByGroupId.entrySet()) {
long groupId = e.getKey();
- for (MockEntry entry : e.getValue()) {
- LogEntry actualEntry = fileManager.getEntry(groupId,
entry.logIndex(), entry.decoder);
+ for (LogEntry entry : e.getValue()) {
+ LogEntry actualEntry = fileManager.getEntry(groupId,
entry.getId().getIndex(), decoder);
- assertThat(actualEntry, is(entry.logEntry));
+ assertThat(actualEntry, is(entry));
}
}
}
- private static Map<Long, List<MockEntry>> generateEntries(int numGroups,
int numEntriesPerGroup, int entrySize) {
- Map<Long, List<MockEntry>> entriesByGroupId = newHashMap(numGroups);
+ @Test
+ void testGetEntryWithSuffixTruncate() throws IOException {
+ int entrySize = FILE_SIZE / 10;
+
+ int numEntries = 100;
+
+ long curLogIndex = 0;
+
+ for (int i = 0; i < numEntries; i++) {
+ var entry = createLogEntry(curLogIndex, entrySize);
+
+ fileManager.appendEntry(0, entry, encoder);
+
+ if (i > 0 && i % 10 == 0) {
+ curLogIndex -= 4;
+
+ fileManager.truncateSuffix(0, curLogIndex);
+
+ // Check that the "lastIndexKept" entry is accessible, while
the truncated one is not.
+ assertThat(fileManager.getEntry(0, curLogIndex, decoder),
is(notNullValue()));
+
+ assertThat(fileManager.getEntry(0, curLogIndex + 1, decoder),
is(nullValue()));
+ }
+
+ curLogIndex++;
+ }
+ }
+
+ @RepeatedTest(5)
+ void truncateSuffixMultithreadedTest() throws IOException {
+ int numGroups = STRIPES;
+
+ int entrySize = 10;
+
+ int numEntriesPerGroup = 100;
+
+ Map<Long, List<LogEntry>> entriesByGroupId =
generateEntries(numGroups, numEntriesPerGroup, entrySize);
+
+ // Entries that will be used to replace truncated entries.
+ var replacementEntriesByGroupId = new HashMap<Long, List<LogEntry>>();
+
+ int numReplacementEntriesPerGroup = numEntriesPerGroup / 5;
for (long groupId = 0; groupId < numGroups; groupId++) {
- var entries = new ArrayList<MockEntry>(numEntriesPerGroup);
+ var entries = new
ArrayList<LogEntry>(numReplacementEntriesPerGroup);
- for (int i = 0; i < numEntriesPerGroup; i++) {
- entries.add(new MockEntry(i, entrySize));
+ for (int i = 0; i < numReplacementEntriesPerGroup; i++) {
+ entries.add(createLogEntry(i * 5, entrySize));
}
- entriesByGroupId.put(groupId, entries);
+ replacementEntriesByGroupId.put(groupId, entries);
}
- return entriesByGroupId;
- }
+ var tasks = new ArrayList<RunnableX>();
- private static class MockEntry {
- private final LogEntry logEntry = mock(LogEntry.class);
+ for (int i = 0; i < numGroups; i++) {
+ long groupId = i;
- private final LogEntryEncoder encoder;
+ List<LogEntry> entries = entriesByGroupId.get(groupId);
- private final LogEntryDecoder decoder;
+ List<LogEntry> replacementEntries =
replacementEntriesByGroupId.get(groupId);
- MockEntry(long logIndex, int entrySize) {
- when(logEntry.getId()).thenReturn(new LogId(logIndex, 0));
+ tasks.add(() -> {
+ for (int entryIndex = 0; entryIndex < entries.size();
entryIndex++) {
+ LogEntry entry = entries.get(entryIndex);
- byte[] bytes = randomBytes(ThreadLocalRandom.current(), entrySize);
+ fileManager.appendEntry(groupId, entry, encoder);
- encoder = new LogEntryEncoder() {
- @Override
- public byte[] encode(LogEntry log) {
- throw new UnsupportedOperationException();
- }
+ // Truncate every 5th entry.
+ if (entryIndex % 5 == 0) {
+ fileManager.truncateSuffix(groupId, entryIndex - 1);
- @Override
- public void encode(ByteBuffer buffer, LogEntry log) {
- buffer.put(bytes);
+ LogEntry replacementEntry =
replacementEntries.get(entryIndex / 5);
+
+ fileManager.appendEntry(groupId, replacementEntry,
encoder);
+ }
}
+ });
+ }
+
+ for (int i = 0; i < numGroups; i++) {
+ long groupId = i;
+
+ List<LogEntry> entries = entriesByGroupId.get(groupId);
- @Override
- public int size(LogEntry logEntry) {
- return entrySize;
+ List<LogEntry> replacementEntries =
replacementEntriesByGroupId.get(groupId);
+
+ RunnableX reader = () -> {
+ for (int logIndex = 0; logIndex < entries.size(); logIndex++) {
+ LogEntry actualEntry = fileManager.getEntry(groupId,
logIndex, decoder);
+
+ if (actualEntry == null) {
+ continue;
+ }
+
+ LogEntry expectedEntry = entries.get(logIndex);
+
+ if (logIndex % 5 == 0) {
+ // Here we can read both the truncated and the
replacement entry.
+ LogEntry replacementEntry =
replacementEntries.get(logIndex / 5);
+
+ assertThat(actualEntry,
either(sameInstance(replacementEntry)).or(sameInstance(expectedEntry)));
+ } else {
+ assertThat(actualEntry,
is(sameInstance(expectedEntry)));
+ }
}
};
- decoder = bs -> logEntry;
+ // Two readers per every group.
+ tasks.add(reader);
+ tasks.add(reader);
+ }
+
+ runRace(tasks.toArray(RunnableX[]::new));
+
+ // Validate that data was actually inserted.
+ for (long groupId = 0; groupId < numGroups; groupId++) {
+ List<LogEntry> entries = entriesByGroupId.get(groupId);
+
+ List<LogEntry> replacementEntries =
replacementEntriesByGroupId.get(groupId);
+
+ for (int logIndex = 0; logIndex < entries.size(); logIndex++) {
+ LogEntry expectedEntry = logIndex % 5 == 0 ?
replacementEntries.get(logIndex / 5) : entries.get(logIndex);
+
+ LogEntry actualEntry = fileManager.getEntry(groupId, logIndex,
decoder);
+
+ assertThat(actualEntry, is(sameInstance(expectedEntry)));
+ }
}
+ }
+
+ private Map<Long, List<LogEntry>> generateEntries(int numGroups, int
numEntriesPerGroup, int entrySize) {
+ Map<Long, List<LogEntry>> entriesByGroupId = newHashMap(numGroups);
+
+ for (long groupId = 0; groupId < numGroups; groupId++) {
+ var entries = new ArrayList<LogEntry>(numEntriesPerGroup);
+
+ for (int i = 0; i < numEntriesPerGroup; i++) {
+ entries.add(createLogEntry(i, entrySize));
+ }
- long logIndex() {
- return logEntry.getId().getIndex();
+ entriesByGroupId.put(groupId, entries);
}
+
+ return entriesByGroupId;
+ }
+
+ private LogEntry createLogEntry(long logIndex, int entrySize) {
+ LogEntry logEntry = new LogEntry();
+
+ logEntry.setId(new LogId(logIndex, 0));
+
+ byte[] bytes = randomBytes(ThreadLocalRandom.current(), entrySize);
+
+ lenient().doAnswer(invocationOnMock -> {
+ ByteBuffer buffer = invocationOnMock.getArgument(0);
+
+ buffer.put(bytes);
+
+ return null;
+ }).when(encoder).encode(any(), same(logEntry));
+
+ lenient().when(encoder.size(same(logEntry))).thenReturn(entrySize);
+
+ lenient().when(decoder.decode(bytes)).thenReturn(logEntry);
+
+ return logEntry;
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 63829a826db..7d30b6aa65c 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -367,6 +367,64 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
runRace(writerTaskFactory.apply(0), writerTaskFactory.apply(1));
}
+ @Test
+ void truncateRecordIsWrittenOnSuffixTruncate() throws IOException {
+ long groupId = 36;
+
+ long lastLogIndexKept = 42;
+
+ fileManager.truncateSuffix(groupId, lastLogIndexKept);
+
+ Path path = findSoleSegmentFile();
+
+ ByteBuffer expectedTruncateRecord =
ByteBuffer.allocate(SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE)
+ .order(SegmentFile.BYTE_ORDER);
+
+ SegmentPayload.writeTruncateSuffixRecordTo(expectedTruncateRecord,
groupId, lastLogIndexKept);
+
+ expectedTruncateRecord.rewind();
+
+ try (SeekableByteChannel channel = Files.newByteChannel(path)) {
+ channel.position(HEADER_RECORD.length);
+
+ assertThat(readFully(channel,
SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE), is(expectedTruncateRecord));
+ }
+ }
+
+ @Test
+ void testLastIndexAfterTruncateSuffix() {
+ int batchSize = FILE_SIZE / 10;
+
+ List<byte[]> batches = randomData(batchSize, 100);
+
+ IntFunction<RunnableX> writerTaskFactory = groupId -> () -> {
+ assertThat(fileManager.firstLogIndexInclusive(groupId), is(-1L));
+ assertThat(fileManager.lastLogIndexExclusive(groupId), is(-1L));
+
+ long curLogIndex = 0;
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(groupId, batches.get(i), curLogIndex);
+
+ if (i > 0 && i % 10 == 0) {
+ curLogIndex -= 4;
+
+ fileManager.truncateSuffix(groupId, curLogIndex);
+ }
+
+ assertThat(fileManager.firstLogIndexInclusive(groupId),
is(0L));
+ assertThat(fileManager.lastLogIndexExclusive(groupId),
is(curLogIndex + 1));
+
+ curLogIndex++;
+ }
+
+ assertThat(fileManager.firstLogIndexInclusive(groupId), is(0L));
+ assertThat(fileManager.lastLogIndexExclusive(groupId),
is(curLogIndex));
+ };
+
+ runRace(writerTaskFactory.apply(0), writerTaskFactory.apply(1));
+ }
+
private Path findSoleSegmentFile() throws IOException {
List<Path> segmentFiles = segmentFiles();
@@ -434,7 +492,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
appendBytes(GROUP_ID, serializedEntry, index);
}
- private void appendBytes(long groupId, byte[] serializedEntry, int index)
throws IOException {
+ private void appendBytes(long groupId, byte[] serializedEntry, long index)
throws IOException {
var entry = new LogEntry();
entry.setId(new LogId(index, 0));
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index d84f3fe3d68..797f9c24e3a 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -66,10 +66,4 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
public void testTruncatePrefix() {
super.testTruncatePrefix();
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26284")
- @Override
- public void testTruncateSuffix() {
- super.testTruncateSuffix();
- }
}