This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e1b9752ac83 IGNITE-26285 Implement truncatePrefix operation (#7101)
e1b9752ac83 is described below
commit e1b9752ac8371769a475774838fa427adbbbbf8c
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Dec 8 18:23:31 2025 +0200
IGNITE-26285 Implement truncatePrefix operation (#7101)
---
.../raft/storage/segstore/EntrySearchResult.java | 87 +++++++++++
.../raft/storage/segstore/GroupIndexMeta.java | 74 +++++++++-
.../raft/storage/segstore/IndexFileManager.java | 95 +++++++++---
.../raft/storage/segstore/IndexFileMeta.java | 3 +
.../raft/storage/segstore/IndexFileMetaArray.java | 55 ++++++-
.../raft/storage/segstore/IndexMemTable.java | 53 +++++--
.../raft/storage/segstore/RaftLogCheckpointer.java | 30 ++--
.../raft/storage/segstore/SegmentFileManager.java | 160 +++++++++++++--------
.../raft/storage/segstore/SegmentInfo.java | 149 ++++++++++++++++---
.../raft/storage/segstore/SegmentPayload.java | 48 ++++---
.../raft/storage/segstore/SegstoreLogStorage.java | 10 +-
.../storage/segstore/WriteModeIndexMemTable.java | 5 +
.../raft/storage/segstore/CheckpointQueueTest.java | 8 +-
.../segstore/DeserializedSegmentPayload.java | 8 +-
.../raft/storage/segstore/GroupIndexMetaTest.java | 85 +++++++++--
.../storage/segstore/IndexFileManagerTest.java | 116 ++++++++++++++-
.../storage/segstore/IndexFileMetaArrayTest.java | 39 +++++
.../raft/storage/segstore/IndexMemTableTest.java | 149 ++++++++++++++++---
.../storage/segstore/RaftLogCheckpointerTest.java | 78 ++++++++--
.../storage/segstore/SegmentFileManagerTest.java | 94 ++++++++++--
.../SegstoreLogStorageConcurrencyTest.java | 40 ++++++
.../storage/segstore/SegstoreLogStorageTest.java | 31 +++-
22 files changed, 1204 insertions(+), 213 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java
new file mode 100644
index 00000000000..0aa3f44c49a
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome.CONTINUE_SEARCH;
+import static
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome.NOT_FOUND;
+import static
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome.SUCCESS;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class representing the result of an entry search in the log storage.
+ *
+ * <p>A lookup may consult several places in turn (e.g. {@code SegmentFileManager#getEntry} checks the current memtable, then the
+ * memtables in the checkpoint queue, then other segment files), and each step reports one of three search outcomes:
+ *
+ * <ol>
+ * <li>If {@link EntrySearchResult#searchOutcome()} is {@link SearchOutcome#CONTINUE_SEARCH}, then the entry was not found and we
+ * should continue looking in other places;</li>
+ * <li>If {@link EntrySearchResult#searchOutcome()} is {@link SearchOutcome#NOT_FOUND}, then the entry was not found and we know
+ * for sure that it does not exist in the storage (e.g. it is at or past the last index of a newer memtable, or before a prefix
+ * truncation point);</li>
+ * <li>If {@link EntrySearchResult#searchOutcome()} is {@link SearchOutcome#SUCCESS}, then the corresponding entry has been found
+ * successfully and the {@link EntrySearchResult#entryBuffer()} method can be used to obtain the entry value.</li>
+ * </ol>
+ *
+ * <p>The {@code NOT_FOUND} and {@code CONTINUE_SEARCH} results are shared singleton instances that carry no entry buffer.
+ */
+class EntrySearchResult {
+ /** Possible outcomes of a single search step. */
+ enum SearchOutcome {
+ SUCCESS, NOT_FOUND, CONTINUE_SEARCH
+ }
+
+ /** Shared result for {@link SearchOutcome#NOT_FOUND}; has no entry buffer. */
+ private static final EntrySearchResult NOT_FOUND_RESULT = new
EntrySearchResult(null, NOT_FOUND);
+
+ /** Shared result for {@link SearchOutcome#CONTINUE_SEARCH}; has no entry buffer. */
+ private static final EntrySearchResult CONTINUE_SEARCH_RESULT = new
EntrySearchResult(null, CONTINUE_SEARCH);
+
+ /** Buffer holding the found entry; only set by {@link #success(ByteBuffer)}, {@code null} for the shared results. */
+ @Nullable
+ private final ByteBuffer entryBuffer;
+
+ /** Outcome of the search. */
+ private final SearchOutcome searchOutcome;
+
+ private EntrySearchResult(@Nullable ByteBuffer entryBuffer, SearchOutcome
searchOutcome) {
+ this.entryBuffer = entryBuffer;
+ this.searchOutcome = searchOutcome;
+ }
+
+ /**
+ * Returns the buffer containing the found entry, exactly as it was passed to {@link #success(ByteBuffer)} (it is not
+ * duplicated).
+ *
+ * <p>Must only be called when {@link #searchOutcome()} is {@link SearchOutcome#SUCCESS}. Other results carry no buffer: this is
+ * caught by an assertion when assertions are enabled, otherwise {@code null} is returned.
+ */
+ ByteBuffer entryBuffer() {
+ assert entryBuffer != null : "Search result is empty";
+
+ return entryBuffer;
+ }
+
+ /** Returns the outcome of the search. */
+ SearchOutcome searchOutcome() {
+ return searchOutcome;
+ }
+
+ /**
+ * Creates a successful search result.
+ *
+ * @param entryBuffer Buffer containing the found entry; returned as-is by {@link #entryBuffer()}.
+ */
+ static EntrySearchResult success(ByteBuffer entryBuffer) {
+ return new EntrySearchResult(entryBuffer, SUCCESS);
+ }
+
+ /** Returns the shared result meaning "the entry definitely does not exist in the storage". */
+ static EntrySearchResult notFound() {
+ return NOT_FOUND_RESULT;
+ }
+
+ /** Returns the shared result meaning "the entry was not found here, keep searching elsewhere". */
+ static EntrySearchResult continueSearch() {
+ return CONTINUE_SEARCH_RESULT;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index a727216a180..912fedb951a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -49,8 +49,27 @@ class GroupIndexMeta {
void addIndexMeta(IndexFileMeta indexFileMeta) {
IndexFileMetaArray fileMetas = this.fileMetas;
- IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+ setFileMetas(fileMetas, fileMetas.add(indexFileMeta));
+ }
+
+ /** Returns the first log index (inclusive) described by the metas in this holder. */
+ long firstLogIndexInclusive() {
+ return fileMetas.firstLogIndexInclusive();
+ }
+
+ /** Returns the last log index (exclusive) described by the metas in this holder. */
+ long lastLogIndexExclusive() {
+ return fileMetas.lastLogIndexExclusive();
+ }
+
+ /**
+ * Removes all metas whose log indices are smaller than the given value. The meta that contains {@code firstLogIndexKept} is
+ * trimmed so that it starts exactly at that index (see {@link IndexFileMetaArray#truncateIndicesSmallerThan}).
+ *
+ * @param firstLogIndexKept First log index to keep; expected to lie within this holder's range (asserted by the array).
+ */
+ void truncateIndicesSmallerThan(long firstLogIndexKept) {
+ IndexFileMetaArray fileMetas = this.fileMetas;
+
+ // A new array is built (the current one is not modified) and published through setFileMetas.
+ setFileMetas(fileMetas,
fileMetas.truncateIndicesSmallerThan(firstLogIndexKept));
+ }
+ private void setFileMetas(IndexFileMetaArray fileMetas,
IndexFileMetaArray newFileMetas) {
// Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
// this invariant, just in case.
boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas);
@@ -59,6 +78,19 @@ class GroupIndexMeta {
}
}
+ /**
+ * A deque of index file meta blocks.
+ *
+ * <p>When a new index file is created, its meta is appended to the last
block (represented by a {@link IndexMetaArrayHolder}) of the
+ * deque if its {@link IndexFileMeta#firstLogIndexInclusive()} matches the
most recent {@link IndexFileMeta#lastLogIndexExclusive()} in
+ * the block. I.e. consecutive index file metas are merged into a single
block, no new elements are added to the deque.
+ *
+ * <p>The only case when a new block is added to the deque is if a log
suffix truncation happened somewhere during an index file's
+ * lifecycle. In this case, the new block will have its {@link
IndexFileMeta#firstLogIndexInclusive()} smaller than the most recent
+ * block's {@link IndexFileMeta#lastLogIndexExclusive()} (two blocks
"overlap"). During search, the newer block will be checked first,
+ * so elements in the range {@code [newBlock#firstLogIndexInclusive :
oldBlock#lastLogIndexExclusive)} will be taken from the new block,
+ * effectively overriding the old block's entries in this range.
+ */
private final Deque<IndexMetaArrayHolder> fileMetaDeque = new
ConcurrentLinkedDeque<>();
GroupIndexMeta(IndexFileMeta startFileMeta) {
@@ -68,7 +100,7 @@ class GroupIndexMeta {
void addIndexMeta(IndexFileMeta indexFileMeta) {
IndexMetaArrayHolder curFileMetas = fileMetaDeque.getLast();
- long curLastLogIndex = curFileMetas.fileMetas.lastLogIndexExclusive();
+ long curLastLogIndex = curFileMetas.lastLogIndexExclusive();
long newFirstLogIndex = indexFileMeta.firstLogIndexInclusive();
@@ -117,9 +149,43 @@ class GroupIndexMeta {
return null;
}
+ /**
+ * Removes all index metas that have log indices smaller than the given value.
+ *
+ * <p>Blocks are visited from the newest to the oldest:
+ * <ul>
+ * <li>newer blocks that start after {@code firstLogIndexKept} are left intact;</li>
+ * <li>the most recent block that starts at or before {@code firstLogIndexKept} is kept as-is (if it starts exactly there),
+ * trimmed (if it contains it) or removed (if it ends at or before it);</li>
+ * <li>every block older than that one is removed.</li>
+ * </ul>
+ * If every block starts after {@code firstLogIndexKept}, nothing is removed.
+ *
+ * <p>NOTE(review): if the block that gets removed entirely is the newest one, the deque becomes empty. Methods that call
+ * {@code fileMetaDeque.getLast()} (such as {@code addIndexMeta} and {@code lastLogIndexExclusive}) would then throw
+ * {@code NoSuchElementException}. {@code IndexFileManager#putIndexFileMeta} calls {@code addIndexMeta} right after this method,
+ * so confirm this state cannot be reached (e.g. truncating at the end of the log and then appending new entries).
+ *
+ * @param firstLogIndexKept First log index to keep.
+ */
+ void truncatePrefix(long firstLogIndexKept) {
+ Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator();
+
+ // Find the most recent entry whose first index is smaller than or equal to firstLogIndexKept.
+ while (it.hasNext()) {
+ IndexMetaArrayHolder holder = it.next();
+
+ long firstLogIndex = holder.firstLogIndexInclusive();
+
+ if (firstLogIndex == firstLogIndexKept) {
+ // We are right on the edge of meta range, keep this entry and simply drop everything older.
+ break;
+ } else if (firstLogIndex < firstLogIndexKept) {
+ // Truncate this entry (possibly in its entirety) and drop everything older.
+ if (holder.lastLogIndexExclusive() <= firstLogIndexKept) {
+ // The whole block lies before firstLogIndexKept.
+ it.remove();
+ } else {
+ holder.truncateIndicesSmallerThan(firstLogIndexKept);
+ }
+
+ break;
+ }
+ // Otherwise the block starts after firstLogIndexKept: keep it and look at older blocks.
+ }
+
+ // Remove all remaining (older) entries.
+ while (it.hasNext()) {
+ it.next();
+ it.remove();
+ }
+ }
+
long firstLogIndexInclusive() {
for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
- long firstLogIndex =
indexMetaArrayHolder.fileMetas.firstLogIndexInclusive();
+ long firstLogIndex = indexMetaArrayHolder.firstLogIndexInclusive();
// "firstLogIndexInclusive" can return -1 of the index file does
not contain any entries for this group, only the truncation
// record.
@@ -132,6 +198,6 @@ class GroupIndexMeta {
}
long lastLogIndexExclusive() {
- return fileMetaDeque.getLast().fileMetas.lastLogIndexExclusive();
+ return fileMetaDeque.getLast().lastLogIndexExclusive();
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 967dc9702a0..a2f3a74617c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static java.lang.Math.toIntExact;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
@@ -64,14 +65,16 @@ import org.jetbrains.annotations.Nullable;
* +------------------------------------------------------------------+
* </pre>
*
- * <p>Raft group meta is as follows:
- * <pre>
- *
+------------------------------------------------------------------------------------------------------------------------+-----+
- * | Raft group 1 meta
| ... |
- *
+------------------------------------------------------------------------------------------------------------------------+-----+
- * | Group ID (8 bytes) | Flags (4 bytes) | Payload Offset (4 bytes) | First
Log Index (8 bytes) | Last Log Index (8 bytes) | ... |
- *
+------------------------------------------------------------------------------------------------------------------------+-----+
- * </pre>
+ * <p>Each Raft group meta is as follows (written as a list, because the table
doesn't fit the configured line length):
+ * <ol>
+ * <li>Group ID (8 bytes);</li>
+ * <li>Flags (4 bytes);</li>
+ * <li>Payload offset (4 bytes);</li>
+ * <li>First log index (8 bytes, inclusive);</li>
+ * <li>Last log index (8 bytes, exclusive);</li>
+ * <li>First log index kept (8 bytes): used during prefix truncation,
either equal to first index kept if prefix was truncated at least
+ * once during the index file lifecycle, otherwise equal to {@code
-1}.</li>
+ * </ol>
*
* <p>Payload of the index files has the following structure:
* <pre>
@@ -101,11 +104,14 @@ class IndexFileManager {
private static final String TMP_FILE_SUFFIX = ".tmp";
+ /** Size of the segment file offset entry (used as the payload of an index
file). */
+ static final int SEGMENT_FILE_OFFSET_SIZE = Integer.BYTES;
+
// Magic number + format version + number of Raft groups.
static final int COMMON_META_SIZE = Integer.BYTES + Integer.BYTES +
Integer.BYTES;
- // Group ID + flags + file offset + start log index + end log index.
- static final int GROUP_META_SIZE = Long.BYTES + Integer.BYTES +
Integer.BYTES + Long.BYTES + Long.BYTES;
+ // Group ID + flags + file offset + start log index + end log index +
first index kept.
+ static final int GROUP_META_SIZE = Long.BYTES + Integer.BYTES +
Integer.BYTES + Long.BYTES + Long.BYTES + Long.BYTES;
static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
@@ -183,7 +189,12 @@ class IndexFileManager {
Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
while (it.hasNext()) {
- os.write(payload(it.next().getValue()));
+ SegmentInfo segmentInfo = it.next().getValue();
+
+ // Segment Info may not contain payload in case of suffix
truncation, see "IndexMemTable#truncateSuffix".
+ if (segmentInfo.size() > 0) {
+ os.write(payload(segmentInfo));
+ }
}
}
@@ -293,11 +304,15 @@ class IndexFileManager {
long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
+ long firstIndexKept = segmentInfo.firstIndexKept();
+
// On recovery we are only creating missing index files, in-memory
meta will be created on Index File Manager start.
if (!onRecovery) {
- var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+ IndexFileMeta indexFileMeta = createIndexFileMeta(
+ firstLogIndexInclusive, lastLogIndexExclusive,
firstIndexKept, payloadOffset, fileOrdinal
+ );
- putIndexFileMeta(groupId, indexFileMeta);
+ putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
}
headerBuffer
@@ -305,7 +320,8 @@ class IndexFileManager {
.putInt(0) // Flags.
.putInt(payloadOffset)
.putLong(firstLogIndexInclusive)
- .putLong(lastLogIndexExclusive);
+ .putLong(lastLogIndexExclusive)
+ .putLong(firstIndexKept);
payloadOffset += payloadSize(segmentInfo);
}
@@ -313,13 +329,51 @@ class IndexFileManager {
return headerBuffer.array();
}
- private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) {
+ /**
+ * Creates the in-memory meta for a single Raft group entry of an index file, applying prefix truncation if needed.
+ *
+ * <p>If the prefix truncation point lies inside the group's range, the returned meta starts at {@code firstIndexKept}. Its
+ * payload offset is advanced past the skipped entries ({@link #SEGMENT_FILE_OFFSET_SIZE} bytes per entry).
+ *
+ * <p>NOTE(review): if {@code firstIndexKept > lastLogIndexExclusive}, the {@code IndexFileMeta} constructor rejects the
+ * resulting range with {@code IllegalArgumentException}. Confirm the writer never records such a value. Also note that only
+ * the subtraction is overflow-checked ({@code toIntExact}); the offset multiplication is plain int arithmetic.
+ *
+ * @param firstLogIndexInclusive First log index of the group in the index file, or {@code -1} if the file only contains a
+ * prefix tombstone for this group.
+ * @param lastLogIndexExclusive Last log index (exclusive) of the group in the index file.
+ * @param firstIndexKept Prefix truncation point recorded for the group, or {@code -1} if the prefix was not truncated.
+ * @param payloadOffset Offset of the group's payload in the index file.
+ * @param fileOrdinal Ordinal of the index file.
+ * @return Index file meta, or {@code null} for a pure prefix tombstone (there are no entries to index).
+ */
+ private static @Nullable IndexFileMeta createIndexFileMeta(
+ long firstLogIndexInclusive,
+ long lastLogIndexExclusive,
+ long firstIndexKept,
+ int payloadOffset,
+ int fileOrdinal
+ ) {
+ if (firstLogIndexInclusive == -1) {
+ assert firstIndexKept != -1 : "Expected a prefix tombstone, but
firstIndexKept is not set.";
+
+ // This is a "prefix tombstone", no need to create any meta, we will just truncate the prefix.
+ return null;
+ }
+
+ if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
+ // No prefix truncation required, simply create a new meta.
+ return new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+ }
+
+ // Create a meta with a truncated prefix.
+ int numEntriesToSkip = toIntExact(firstIndexKept -
firstLogIndexInclusive);
+
+ // Each payload entry is a segment file offset of SEGMENT_FILE_OFFSET_SIZE bytes.
+ int adjustedPayloadOffset = payloadOffset + numEntriesToSkip *
SEGMENT_FILE_OFFSET_SIZE;
+
+ return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive,
adjustedPayloadOffset, fileOrdinal);
+ }
+
+ /**
+ * Registers an index file meta for the given group in {@code groupIndexMetas}. If a prefix truncation is recorded, it is first
+ * applied to the metas already known for this group.
+ *
+ * <p>NOTE(review): {@code GroupIndexMeta#truncatePrefix} removes every block when the newest one ends at or before
+ * {@code firstIndexKept}. The {@code addIndexMeta} call below would then run {@code getLast()} on an empty deque; confirm this
+ * cannot happen.
+ *
+ * @param groupId Raft group ID.
+ * @param indexFileMeta Meta to add, already truncated according to {@code firstIndexKept}, or {@code null} if there is nothing
+ * to add (pure prefix tombstone).
+ * @param firstIndexKept Prefix truncation point recorded for the group, or {@code -1} if there is none.
+ */
+ private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta
indexFileMeta, long firstIndexKept) {
GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
if (existingGroupIndexMeta == null) {
- groupIndexMetas.put(groupId, new GroupIndexMeta(indexFileMeta));
+ if (indexFileMeta != null) {
+ groupIndexMetas.put(groupId, new
GroupIndexMeta(indexFileMeta));
+ }
} else {
- existingGroupIndexMeta.addIndexMeta(indexFileMeta);
+ if (firstIndexKept != -1) {
+ // Drop (or trim) already-known metas that lie before the truncation point.
+ existingGroupIndexMeta.truncatePrefix(firstIndexKept);
+ }
+
+ if (indexFileMeta != null) {
+ // New index meta must have already been truncated according to the prefix tombstone.
+ assert indexFileMeta.firstLogIndexInclusive() >=
firstIndexKept : indexFileMeta;
+
+ existingGroupIndexMeta.addIndexMeta(indexFileMeta);
+ }
}
}
@@ -392,10 +446,13 @@ class IndexFileManager {
int payloadOffset = groupMetaBuffer.getInt();
long firstLogIndexInclusive = groupMetaBuffer.getLong();
long lastLogIndexExclusive = groupMetaBuffer.getLong();
+ long firstIndexKept = groupMetaBuffer.getLong();
- var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, curFileOrdinal);
+ IndexFileMeta indexFileMeta = createIndexFileMeta(
+ firstLogIndexInclusive, lastLogIndexExclusive,
firstIndexKept, payloadOffset, fileOrdinal
+ );
- putIndexFileMeta(groupId, indexFileMeta);
+ putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
}
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
index 34bd73e12d2..f75cc00ba1d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
@@ -34,6 +34,9 @@ class IndexFileMeta {
private final int indexFileOrdinal;
IndexFileMeta(long firstLogIndexInclusive, long lastLogIndexExclusive, int
indexFilePayloadOffset, int indexFileOrdinal) {
+ assert firstLogIndexInclusive >= 0 : "Invalid first log index: " +
firstLogIndexInclusive;
+ assert lastLogIndexExclusive >= 0 : "Invalid first log index: " +
firstLogIndexInclusive;
+
if (lastLogIndexExclusive < firstLogIndexInclusive) {
throw new IllegalArgumentException("Invalid log index range: [" +
firstLogIndexInclusive + ", " + lastLogIndexExclusive + ").");
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
index 1256faa4713..8d74c8f267d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static java.lang.Math.toIntExact;
+import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.SEGMENT_FILE_OFFSET_SIZE;
+
import java.util.Arrays;
import org.jetbrains.annotations.Nullable;
@@ -93,6 +96,12 @@ class IndexFileMetaArray {
*/
@Nullable
IndexFileMeta find(long logIndex) {
+ int arrayIndex = findArrayIndex(logIndex);
+
+ return arrayIndex == -1 ? null : array[arrayIndex];
+ }
+
+ private int findArrayIndex(long logIndex) {
int lowArrayIndex = 0;
int highArrayIndex = size - 1;
@@ -106,10 +115,52 @@ class IndexFileMetaArray {
} else if (logIndex >= midValue.lastLogIndexExclusive()) {
lowArrayIndex = middleArrayIndex + 1;
} else {
- return midValue;
+ return middleArrayIndex;
}
}
- return null;
+ return -1;
+ }
+
+ /**
+ * Returns a new array that only describes log indices starting from {@code firstLogIndexKept}. This instance is not modified.
+ *
+ * <p>The meta containing {@code firstLogIndexKept} is replaced by a trimmed copy. The copy starts at that index, and its payload
+ * offset is advanced past the skipped entries. Metas before it are dropped; metas after it are copied unchanged.
+ *
+ * <p>{@code firstLogIndexKept} must lie within this array's range, but this is only checked by an assertion. With assertions
+ * disabled, an out-of-range value makes the lookup return {@code -1}, and the method fails with
+ * {@code ArrayIndexOutOfBoundsException}.
+ *
+ * @param firstLogIndexKept First log index to keep.
+ * @return New array starting at {@code firstLogIndexKept}; its backing array has the same capacity as this one.
+ */
+ IndexFileMetaArray truncateIndicesSmallerThan(long firstLogIndexKept) {
+ int firstLogIndexKeptArrayIndex = findArrayIndex(firstLogIndexKept);
+
+ assert firstLogIndexKeptArrayIndex >= 0 : String.format(
+ "Missing entry for log index %d in range [%d:%d).",
+ firstLogIndexKept, firstLogIndexInclusive(),
lastLogIndexExclusive()
+ );
+
+ IndexFileMeta metaToUpdate = array[firstLogIndexKeptArrayIndex];
+
+ int numEntriesToSkip = toIntExact(firstLogIndexKept -
metaToUpdate.firstLogIndexInclusive());
+
+ assert numEntriesToSkip >= 0 : String.format(
+ "Trying to do a no-op prefix truncate from index %d in range
[%d:%d).",
+ firstLogIndexKept, firstLogIndexInclusive(),
lastLogIndexExclusive()
+ );
+
+ // Move the payload offset pointer to skip truncated entries (each entry is SEGMENT_FILE_OFFSET_SIZE = 4 bytes).
+ int adjustedPayloadOffset = metaToUpdate.indexFilePayloadOffset() +
numEntriesToSkip * SEGMENT_FILE_OFFSET_SIZE;
+
+ var trimmedMeta = new IndexFileMeta(
+ firstLogIndexKept,
+ metaToUpdate.lastLogIndexExclusive(),
+ adjustedPayloadOffset,
+ metaToUpdate.indexFileOrdinal()
+ );
+
+ // Create a new array: the trimmed meta becomes the first element, other elements with "firstLogIndexInclusive" larger
+ // than "firstLogIndexKept" are copied from the old array.
+ IndexFileMeta[] newArray = new IndexFileMeta[array.length];
+
+ newArray[0] = trimmedMeta;
+
+ // Metas left: the trimmed one plus every meta after it.
+ int newSize = size - firstLogIndexKeptArrayIndex;
+
+ if (newSize > 1) {
+ System.arraycopy(array, firstLogIndexKeptArrayIndex + 1, newArray,
1, newSize - 1);
+ }
+
+ return new IndexFileMetaArray(newArray, newSize);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
index 2ed981a53d5..e9f0eff6f25 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
@@ -46,7 +46,7 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
// File offset can be less than 0 (it's treated as an unsigned
integer) but never 0, because of the file header.
assert segmentFileOffset != 0 : String.format("Segment file offset
must not be 0 [groupId=%d]", groupId);
- ConcurrentMap<Long, SegmentInfo> memTable = stripe(groupId).memTable;
+ ConcurrentMap<Long, SegmentInfo> memTable = memtable(groupId);
SegmentInfo segmentInfo = memTable.get(groupId);
@@ -55,6 +55,12 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
segmentInfo.addOffset(logIndex, segmentFileOffset);
+ memTable.put(groupId, segmentInfo);
+ } else if (segmentInfo.isPrefixTombstone()) {
+ segmentInfo = new SegmentInfo(logIndex,
segmentInfo.firstIndexKept());
+
+ segmentInfo.addOffset(logIndex, segmentFileOffset);
+
memTable.put(groupId, segmentInfo);
} else {
segmentInfo.addOffset(logIndex, segmentFileOffset);
@@ -63,23 +69,40 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
@Override
public SegmentInfo segmentInfo(long groupId) {
- return stripe(groupId).memTable.get(groupId);
+ return memtable(groupId).get(groupId);
}
@Override
public void truncateSuffix(long groupId, long lastLogIndexKept) {
- ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable;
-
- SegmentInfo segmentInfo = memtable.get(groupId);
+ ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+
+ memtable.compute(groupId, (id, segmentInfo) -> {
+ if (segmentInfo == null || lastLogIndexKept <
segmentInfo.firstLogIndexInclusive()) {
+ // If the current memtable does not have information for the
given group or if we are truncating everything currently
+ // present in the memtable, we need to write a special "empty"
SegmentInfo into the memtable to override existing persisted
+ // data during search.
+ return new SegmentInfo(lastLogIndexKept + 1);
+ } else if (segmentInfo.isPrefixTombstone()) {
+ // This is a prefix tombstone inserted by "truncatePrefix".
+ return new SegmentInfo(lastLogIndexKept + 1,
segmentInfo.firstIndexKept());
+ } else {
+ return segmentInfo.truncateSuffix(lastLogIndexKept);
+ }
+ });
+ }
- if (segmentInfo == null || lastLogIndexKept <
segmentInfo.firstLogIndexInclusive()) {
- // If the current memtable does not have information for the given
group or if we are truncating everything currently present
- // in the memtable, we need to write a special "empty" SegmentInfo
into the memtable to override existing persisted data during
- // search.
- memtable.put(groupId, new SegmentInfo(lastLogIndexKept + 1));
- } else {
- segmentInfo.truncateSuffix(lastLogIndexKept);
- }
+ /**
+ * Records a prefix truncation for the given group in this memtable. If the memtable has no data for the group, a "prefix
+ * tombstone" is stored. Otherwise, the group's segment info is replaced by the value returned from
+ * {@code SegmentInfo#truncatePrefix}.
+ */
+ @Override
+ public void truncatePrefix(long groupId, long firstIndexKept) {
+ ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+
+ // compute() performs the read-modify-write of the group's SegmentInfo as a single map operation.
+ memtable.compute(groupId, (id, segmentInfo) -> {
+ if (segmentInfo == null) {
+ // The memtable does not have any information for the given group, we need to write a special "prefix tombstone".
+ return SegmentInfo.prefixTombstone(firstIndexKept);
+ } else {
+ return segmentInfo.truncatePrefix(firstIndexKept);
+ }
+ });
}
@Override
@@ -121,6 +144,10 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
return stripes[stripeIndex];
}
+ /** Returns the memtable map of the stripe that the given group ID maps to. */
+ private ConcurrentMap<Long, SegmentInfo> memtable(long groupId) {
+ return stripe(groupId).memTable;
+ }
+
private class SegmentInfoIterator implements Iterator<Entry<Long,
SegmentInfo>> {
private int stripeIndex = 0;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index e5bb28e077e..e3400628b4a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.raft.storage.segstore.CheckpointQueue.Entry;
import org.apache.ignite.internal.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
/**
* Class responsible for running periodic checkpoint tasks.
@@ -104,11 +104,8 @@ class RaftLogCheckpointer {
/**
* Searches for the segment payload corresponding to the given Raft Group
ID and Raft Log Index in the checkpoint queue.
- *
- * @return {@code ByteBuffer} which position is set to the start of the
corresponding segment payload or {@code null} if the payload has
- * not been found in all files currently present in the queue.
*/
- @Nullable ByteBuffer findSegmentPayloadInQueue(long groupId, long
logIndex) {
+ EntrySearchResult findSegmentPayloadInQueue(long groupId, long logIndex) {
Iterator<Entry> it = queue.tailIterator();
while (it.hasNext()) {
@@ -116,14 +113,29 @@ class RaftLogCheckpointer {
SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId);
- int segmentPayloadOffset = segmentInfo == null ? 0 :
segmentInfo.getOffset(logIndex);
+ if (segmentInfo == null) {
+ continue;
+ }
+
+ if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
+ return EntrySearchResult.notFound();
+ }
+
+ if (logIndex < segmentInfo.firstIndexKept()) {
+ // This is a prefix tombstone and it cuts off the log index we
search for.
+ return EntrySearchResult.notFound();
+ }
+
+ int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
+
+ if (segmentPayloadOffset != MISSING_SEGMENT_FILE_OFFSET) {
+ ByteBuffer entryBuffer =
e.segmentFile().buffer().position(segmentPayloadOffset);
- if (segmentPayloadOffset != 0) {
- return e.segmentFile().buffer().position(segmentPayloadOffset);
+ return EntrySearchResult.success(entryBuffer);
}
}
- return null;
+ return EntrySearchResult.continueSearch();
}
private class CheckpointTask implements Runnable {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 62222005d00..dedbbb7b37b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_MARKER;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_MARKER;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -38,6 +41,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
import
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
import org.apache.ignite.internal.raft.util.VarlenEncoder;
import org.apache.ignite.internal.util.FastCrc;
@@ -278,43 +282,70 @@ class SegmentFileManager implements ManuallyCloseable {
}
private @Nullable ByteBuffer getEntry(long groupId, long logIndex) throws
IOException {
- // First, read from the current segment file.
+ EntrySearchResult searchResult = getEntryFromCurrentMemtable(groupId,
logIndex);
+
+ if (searchResult.searchOutcome() == SearchOutcome.CONTINUE_SEARCH) {
+ searchResult = checkpointer.findSegmentPayloadInQueue(groupId,
logIndex);
+
+ if (searchResult.searchOutcome() == SearchOutcome.CONTINUE_SEARCH)
{
+ searchResult = readFromOtherSegmentFiles(groupId, logIndex);
+ }
+ }
+
+ switch (searchResult.searchOutcome()) {
+ case SUCCESS: return searchResult.entryBuffer();
+ case NOT_FOUND: return null;
+ default: throw new IllegalStateException("Unexpected search
outcome: " + searchResult.searchOutcome());
+ }
+ }
+
+ private EntrySearchResult getEntryFromCurrentMemtable(long groupId, long
logIndex) {
SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
- if (segmentInfo != null) {
- if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
- return null;
- }
+ if (segmentInfo == null) {
+ return EntrySearchResult.continueSearch();
+ }
- int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
+ if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
+ return EntrySearchResult.notFound();
+ }
- if (segmentPayloadOffset != 0) {
- return
currentSegmentFile.segmentFile().buffer().position(segmentPayloadOffset);
- }
+ if (logIndex < segmentInfo.firstIndexKept()) {
+ // This is a prefix tombstone and it cuts off the log index we
search for.
+ return EntrySearchResult.notFound();
}
- ByteBuffer bufferFromCheckpointQueue =
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+ int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
- if (bufferFromCheckpointQueue != null) {
- return bufferFromCheckpointQueue;
+ if (segmentPayloadOffset == MISSING_SEGMENT_FILE_OFFSET) {
+ return EntrySearchResult.continueSearch();
}
- return readFromOtherSegmentFiles(groupId, logIndex);
+ ByteBuffer entryBuffer =
currentSegmentFile.segmentFile().buffer().position(segmentPayloadOffset);
+
+ return EntrySearchResult.success(entryBuffer);
}
void truncateSuffix(long groupId, long lastLogIndexKept) throws
IOException {
try (WriteBufferWithMemtable writeBufferWithMemtable =
reserveBytesWithRollover(TRUNCATE_SUFFIX_RECORD_SIZE)) {
- ByteBuffer segmentBuffer = writeBufferWithMemtable.buffer();
-
- SegmentPayload.writeTruncateSuffixRecordTo(segmentBuffer, groupId,
lastLogIndexKept);
+
SegmentPayload.writeTruncateSuffixRecordTo(writeBufferWithMemtable.buffer(),
groupId, lastLogIndexKept);
// Modify the memtable before write buffer is released to avoid
races with checkpoint on rollover.
writeBufferWithMemtable.memtable().truncateSuffix(groupId,
lastLogIndexKept);
}
}
+ void truncatePrefix(long groupId, long firstLogIndexKept) throws
IOException {
+ try (WriteBufferWithMemtable writeBufferWithMemtable =
reserveBytesWithRollover(TRUNCATE_PREFIX_RECORD_SIZE)) {
+
SegmentPayload.writeTruncatePrefixRecordTo(writeBufferWithMemtable.buffer(),
groupId, firstLogIndexKept);
+
+ // Modify the memtable before write buffer is released to avoid
races with checkpoint on rollover.
+ writeBufferWithMemtable.memtable().truncatePrefix(groupId,
firstLogIndexKept);
+ }
+ }
+
private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws
IOException {
while (true) {
SegmentFileWithMemtable segmentFileWithMemtable =
currentSegmentFile();
@@ -337,17 +368,22 @@ class SegmentFileManager implements ManuallyCloseable {
* storage, not taking pending in-memory state into account.
*/
long firstLogIndexInclusiveOnRecovery(long groupId) {
+ SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
+
+ SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
+
+ // We need to consult with the latest memtable in case it contains a
prefix tombstone.
+ if (segmentInfo != null && segmentInfo.firstIndexKept() != -1) {
+ return segmentInfo.firstIndexKept();
+ }
+
long firstLogIndexFromIndexStorage =
indexFileManager.firstLogIndexInclusive(groupId);
if (firstLogIndexFromIndexStorage != -1) {
return firstLogIndexFromIndexStorage;
}
- SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
-
- SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
-
- return segmentInfo != null ? segmentInfo.firstLogIndexInclusive() : -1;
+ return segmentInfo == null ? -1 : segmentInfo.firstLogIndexInclusive();
}
/**
@@ -462,11 +498,11 @@ class SegmentFileManager implements ManuallyCloseable {
return fileSize - HEADER_RECORD.length;
}
- private @Nullable ByteBuffer readFromOtherSegmentFiles(long groupId, long
logIndex) throws IOException {
+ private EntrySearchResult readFromOtherSegmentFiles(long groupId, long
logIndex) throws IOException {
SegmentFilePointer segmentFilePointer =
indexFileManager.getSegmentFilePointer(groupId, logIndex);
if (segmentFilePointer == null) {
- return null;
+ return EntrySearchResult.notFound();
}
Path path =
segmentFilesDir.resolve(segmentFileName(segmentFilePointer.fileOrdinal(), 0));
@@ -474,41 +510,9 @@ class SegmentFileManager implements ManuallyCloseable {
// TODO: Add a cache for recently accessed segment files, see
https://issues.apache.org/jira/browse/IGNITE-26622.
SegmentFile segmentFile = SegmentFile.openExisting(path);
- return
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
- }
-
- private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile,
Path segmentFilePath) {
- ByteBuffer buffer = segmentFile.buffer();
-
- validateSegmentFileHeader(buffer, segmentFilePath);
+ ByteBuffer buffer =
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
- var memtable = new IndexMemTable(stripes);
-
- while (!endOfSegmentReached(buffer)) {
- int segmentFilePayloadOffset = buffer.position();
-
- long groupId = buffer.getLong();
-
- int payloadLength = buffer.getInt();
-
- if (payloadLength == TRUNCATE_SUFFIX_RECORD_MARKER) {
- long lastLogIndexKept = buffer.getLong();
-
- memtable.truncateSuffix(groupId, lastLogIndexKept);
-
- buffer.position(buffer.position() + HASH_SIZE_BYTES);
- } else {
- int endOfRecordPosition = buffer.position() + payloadLength +
HASH_SIZE_BYTES;
-
- long index = VarlenEncoder.readLong(buffer);
-
- memtable.appendSegmentFileOffset(groupId, index,
segmentFilePayloadOffset);
-
- buffer.position(endOfRecordPosition);
- }
- }
-
- return memtable;
+ return EntrySearchResult.success(buffer);
}
private static boolean endOfSegmentReached(ByteBuffer buffer) {
@@ -529,17 +533,34 @@ class SegmentFileManager implements ManuallyCloseable {
/**
* Creates an index memtable from the given segment file. Unlike {@link
#recoverMemtable} which is expected to only be called on
- * "complete" segment files (i.e. those that has experienced a rollover),
this method is expected to be called on the most recent,
+ * "complete" segment files (i.e. those that have experienced a rollover)
this method is expected to be called on the most recent,
* possibly incomplete segment file.
*/
private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile
segmentFile, Path segmentFilePath) {
+ return recoverMemtable(segmentFile, segmentFilePath, true);
+ }
+
+ /**
+ * Creates an index memtable from the given segment file. This method is
expected to be called only on "complete" segment files
+ * (i.e. those that have experienced a rollover).
+ *
+ * <p>This method skips CRC validation, because it is used to identify the
end of incomplete segment files (and, by definition, this can
+ * never happen during this method's invocation), not to validate storage
integrity.
+ */
+ private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile,
Path segmentFilePath) {
+ // We skip CRC validation during recovery of already "rollovered"
segment files, because CRC validation is only used to find an end
+ // of an incomplete segment file, not to check for storage integrity.
+ return recoverMemtable(segmentFile, segmentFilePath, false);
+ }
+
+ private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile,
Path segmentFilePath, boolean validateCrc) {
ByteBuffer buffer = segmentFile.buffer();
validateSegmentFileHeader(buffer, segmentFilePath);
var memtable = new IndexMemTable(stripes);
- while (buffer.remaining() > SWITCH_SEGMENT_RECORD.length) {
+ while (!endOfSegmentReached(buffer)) {
int segmentFilePayloadOffset = buffer.position();
long groupId = buffer.getLong();
@@ -556,11 +577,24 @@ class SegmentFileManager implements ManuallyCloseable {
buffer.position(segmentFilePayloadOffset);
// CRC violation signals the end of meaningful data in the
segment file.
- if (!isCrcValid(buffer, crcPosition)) {
+ if (validateCrc && !isCrcValid(buffer, crcPosition)) {
break;
}
memtable.truncateSuffix(groupId, lastLogIndexKept);
+ } else if (payloadLength == TRUNCATE_PREFIX_RECORD_MARKER) {
+ long firstLogIndexKept = buffer.getLong();
+
+ crcPosition = buffer.position();
+
+ buffer.position(segmentFilePayloadOffset);
+
+ // CRC violation signals the end of meaningful data in the
segment file.
+ if (validateCrc && !isCrcValid(buffer, crcPosition)) {
+ break;
+ }
+
+ memtable.truncatePrefix(groupId, firstLogIndexKept);
} else {
crcPosition = buffer.position() + payloadLength;
@@ -569,14 +603,14 @@ class SegmentFileManager implements ManuallyCloseable {
buffer.position(segmentFilePayloadOffset);
// CRC violation signals the end of meaningful data in the
segment file.
- if (!isCrcValid(buffer, crcPosition)) {
+ if (validateCrc && !isCrcValid(buffer, crcPosition)) {
break;
}
memtable.appendSegmentFileOffset(groupId, index,
segmentFilePayloadOffset);
}
- buffer.position(crcPosition + HASH_SIZE_BYTES);
+ buffer.position(crcPosition + CRC_SIZE_BYTES);
}
return memtable;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
index ffffb17f72e..34ea4c3e567 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static java.lang.Math.toIntExact;
+
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
@@ -24,8 +26,29 @@ import java.util.Arrays;
/**
* Information about a segment file for single Raft Group stored in a {@link
IndexMemTable}.
+ *
+ * <p>It consists of a base log index and an array of segment file offsets
which stores in log entry offsets which indices lie in the
+ * {@code [logIndexBase, logIndexBase + segmentFileOffsets.size)} range.
+ *
+ * <p>Objects of this class can represent special conditions in presence of
log truncations:
+ *
+ * <ul>
+ * <li>When a suffix truncation happens, a "suffix tombstone" is inserted
with {@code logIndexBase} equal to the next log index
+ * after the cutoff value and an empty offsets array. This array can then
be populated with new entries, as usual. This means that
+ * {@link #firstLogIndexInclusive()} can be equal to {@link
#lastLogIndexExclusive()} if this is a "pure" suffix tombstone
+ * (i.e. tombstone without any values added after it was created);</li>
+ * <li>When a prefix truncation happens, a "prefix tombstone" is inserted.
Depending on the state of the object, it can either be a
+ * "pure" prefix tombstone ({@code logIndexBase = -1}, {@code
firstIndexKept = <cutoff value>}, empty offsets array), this means that
+ * a memtable didn't contain any data for a particular Raft group at the
moment of truncation, or a "regular" prefix tombstone
+ * ({@code logIndexBase > 0}, {@code firstIndexKept=<cutoff value>},
non-empty offsets array), which means that some entries were
+ * inserted (or already existed) after the cutoff index. This means that
if {@link #firstIndexKept()} is not equal to {@code -1}
+ * (i.e. this is a prefix tombstone), an additional check is required
({@link #firstLogIndexInclusive()} == -1) to identify if this is a
+ * "pure" prefix tombstone.</li>
+ * </ul>
*/
class SegmentInfo {
+ static int MISSING_SEGMENT_FILE_OFFSET = 0;
+
private static class ArrayWithSize {
private static final int INITIAL_CAPACITY = 10;
@@ -56,13 +79,25 @@ class SegmentInfo {
return new ArrayWithSize(array, size + 1);
}
- ArrayWithSize truncate(int newSize) {
+ ArrayWithSize truncateSuffix(int newSize) {
+ return truncate(0, newSize);
+ }
+
+ ArrayWithSize truncatePrefix(int newSize) {
+ int srcPos = size - newSize;
+
+ return truncate(srcPos, newSize);
+ }
+
+ private ArrayWithSize truncate(int srcPos, int newSize) {
assert newSize <= size
: String.format("Array must shrink on truncation, current
size: %d, size after truncation: %d", size, newSize);
- int[] newArray = new int[size];
+ // We use the original array's size, not "newSize" here, because
new entries are expected to be added at the end anyway,
+ // so we can use the larger size to avoid unnecessary array copies.
+ int[] newArray = new int[array.length];
- System.arraycopy(array, 0, newArray, 0, newSize);
+ System.arraycopy(array, srcPos, newArray, 0, newSize);
return new ArrayWithSize(newArray, newSize);
}
@@ -92,20 +127,42 @@ class SegmentInfo {
*/
private final long logIndexBase;
+ /**
+ * Special log index value used to indicate that a prefix truncation
command has been executed. Is equal to the cutoff log index value
+ * or {@code -1}, if no prefix truncations happened.
+ */
+ private final long firstIndexKept;
+
/**
* Offsets in a segment file.
*/
@SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
- private volatile ArrayWithSize segmentFileOffsets = new ArrayWithSize();
+ private volatile ArrayWithSize segmentFileOffsets;
SegmentInfo(long logIndexBase) {
+ this(logIndexBase, -1, new ArrayWithSize());
+ }
+
+ SegmentInfo(long logIndexBase, long firstIndexKept) {
+ this(logIndexBase, firstIndexKept, new ArrayWithSize());
+ }
+
+ static SegmentInfo prefixTombstone(long firstIndexKept) {
+ return new SegmentInfo(-1, firstIndexKept);
+ }
+
+ private SegmentInfo(long logIndexBase, long firstIndexKept, ArrayWithSize
segmentFileOffsets) {
this.logIndexBase = logIndexBase;
+ this.firstIndexKept = firstIndexKept;
+ this.segmentFileOffsets = segmentFileOffsets;
}
/**
* Puts the given segment file offset under the given log index.
*/
void addOffset(long logIndex, int segmentFileOffset) {
+ assert segmentFileOffset != MISSING_SEGMENT_FILE_OFFSET : "Segment
file offset cannot be 0";
+
ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
// Check that log indexes are monotonically increasing.
@@ -113,29 +170,23 @@ class SegmentInfo {
String.format("Log indexes are not monotonically increasing
[logIndex=%d, expectedLogIndex=%d].",
logIndex, logIndexBase + segmentFileOffsets.size());
- ArrayWithSize newSegmentFileOffsets =
segmentFileOffsets.add(segmentFileOffset);
-
- // Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
- // this invariant, just in case.
- boolean updated = SEGMENT_FILE_OFFSETS_VH.compareAndSet(this,
segmentFileOffsets, newSegmentFileOffsets);
-
- assert updated : "Concurrent writes detected";
+ setSegmentFileOffsets(segmentFileOffsets,
segmentFileOffsets.add(segmentFileOffset));
}
/**
- * Returns the segment file offset for the given log index or {@code 0} if
the log index was not found.
+ * Returns the segment file offset for the given log index or {@link
#MISSING_SEGMENT_FILE_OFFSET} if the log index was not found.
*/
int getOffset(long logIndex) {
long offsetIndex = logIndex - logIndexBase;
if (offsetIndex < 0) {
- return 0;
+ return MISSING_SEGMENT_FILE_OFFSET;
}
ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
if (offsetIndex >= segmentFileOffsets.size()) {
- return 0;
+ return MISSING_SEGMENT_FILE_OFFSET;
}
return segmentFileOffsets.get((int) offsetIndex);
@@ -155,6 +206,17 @@ class SegmentInfo {
return logIndexBase + segmentFileOffsets.size();
}
+ /**
+ * Returns the log index used during prefix truncation or {@code -1} if no
prefix truncation was issued.
+ */
+ long firstIndexKept() {
+ return firstIndexKept;
+ }
+
+ boolean isPrefixTombstone() {
+ return logIndexBase == -1;
+ }
+
/**
* Returns the number of offsets stored in this memtable.
*/
@@ -168,29 +230,76 @@ class SegmentInfo {
void saveOffsetsTo(ByteBuffer buffer) {
ArrayWithSize offsets = segmentFileOffsets;
+ assert offsets.size() > 0 : "Offsets array must not be empty";
+
buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
}
/**
* Removes all data which log indices are strictly greater than {@code
lastLogIndexKept}.
*/
- void truncateSuffix(long lastLogIndexKept) {
+ SegmentInfo truncateSuffix(long lastLogIndexKept) {
assert lastLogIndexKept >= logIndexBase :
String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase,
lastLogIndexKept);
ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
- long newSize = lastLogIndexKept - logIndexBase + 1;
+ long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
- // Not using an assertion here, because this value comes doesn't come
from the storage code.
- if (newSize > segmentFileOffsets.size()) {
+ // Not using an assertion here, because this value doesn't come from
the storage code.
+ if (lastLogIndexKept >= lastLogIndexExclusive) {
throw new IllegalArgumentException(String.format(
"lastLogIndexKept is too large. Last index in memtable:
%d, lastLogIndexKept: %d",
- logIndexBase + segmentFileOffsets.size() - 1,
lastLogIndexKept
+ lastLogIndexExclusive - 1, lastLogIndexKept
));
}
- ArrayWithSize newSegmentFileOffsets =
segmentFileOffsets.truncate((int) newSize);
+ int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1);
+
+ setSegmentFileOffsets(segmentFileOffsets,
segmentFileOffsets.truncateSuffix(newSize));
+
+ // This could have been a "void" method, but this way it looks
consistent with "truncatePrefix". Since the tail of the
+ // {@code segmentFileOffsets} array is mutable, we can avoid creating
a new object.
+ return this;
+ }
+
+ /**
+ * Removes all data which log indices are strictly smaller than {@code
firstIndexKept}.
+ */
+ SegmentInfo truncatePrefix(long firstIndexKept) {
+ if (isPrefixTombstone()) {
+ if (firstIndexKept <= this.firstIndexKept) {
+ throw new IllegalStateException(String.format(
+ "Trying to truncate an already truncated prefix
[curFirstIndexKept=%d, newFirstIndexKept=%d]",
+ this.firstIndexKept, firstIndexKept
+ ));
+ }
+
+ return prefixTombstone(firstIndexKept);
+ }
+
+ ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
+
+ if (firstIndexKept < logIndexBase) {
+ // Add the prefix tombstone property to the current SegmentInfo.
+ return new SegmentInfo(logIndexBase, firstIndexKept,
segmentFileOffsets);
+ }
+
+ long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
+
+ // Not using an assertion here, because this value doesn't come from
the storage code.
+ if (firstIndexKept >= lastLogIndexExclusive) {
+ throw new IllegalArgumentException(String.format(
+ "firstIndexKept is too large. Last index in memtable: %d,
firstIndexKept: %d",
+ lastLogIndexExclusive - 1, firstIndexKept
+ ));
+ }
+
+ int newSize = toIntExact(lastLogIndexExclusive - firstIndexKept);
+
+ return new SegmentInfo(firstIndexKept, firstIndexKept,
segmentFileOffsets.truncatePrefix(newSize));
+ }
+ private void setSegmentFileOffsets(ArrayWithSize segmentFileOffsets,
ArrayWithSize newSegmentFileOffsets) {
// Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
// this invariant, just in case.
boolean updated = SEGMENT_FILE_OFFSETS_VH.compareAndSet(this,
segmentFileOffsets, newSegmentFileOffsets);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index 94fd4de599c..6428cfda804 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -35,17 +35,26 @@ class SegmentPayload {
static final int LENGTH_SIZE_BYTES = Integer.BYTES;
- static final int HASH_SIZE_BYTES = Integer.BYTES;
+ static final int CRC_SIZE_BYTES = Integer.BYTES;
/**
* Length of the byte sequence that is written when suffix truncation
happens.
*
- * <p>Format: {@code groupId, 0 (special length value), last kept index,
crc}
+ * <p>Format: {@code groupId, TRUNCATE_SUFFIX_RECORD_MARKER (special
length value), last kept index, crc}
*/
- static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE_BYTES;
+ static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + Long.BYTES + CRC_SIZE_BYTES;
+
+ /**
+ * Length of the byte sequence that is written when prefix truncation
happens.
+ *
+ * <p>Format: {@code groupId, TRUNCATE_PREFIX_RECORD_MARKER (special
length value), first kept index, crc}
+ */
+ static final int TRUNCATE_PREFIX_RECORD_SIZE = TRUNCATE_SUFFIX_RECORD_SIZE;
static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
+ static final int TRUNCATE_PREFIX_RECORD_MARKER = -1;
+
static void writeTo(
ByteBuffer buffer,
long groupId,
@@ -66,28 +75,33 @@ class SegmentPayload {
logEntryEncoder.encode(buffer, logEntry);
- int dataSize = buffer.position() - originalPos;
+ int recordSize = buffer.position() - originalPos;
- // Rewind the position for CRC calculation.
- buffer.position(originalPos);
-
- int crc = FastCrc.calcCrc(buffer, dataSize);
-
- // After CRC calculation the position will be at the provided end of
the buffer.
- buffer.putInt(crc);
+ writeCrc(buffer, recordSize);
}
static void writeTruncateSuffixRecordTo(ByteBuffer buffer, long groupId,
long lastLogIndexKept) {
- int originalPos = buffer.position();
-
buffer
.putLong(groupId)
.putInt(TRUNCATE_SUFFIX_RECORD_MARKER)
.putLong(lastLogIndexKept);
- buffer.position(originalPos);
+ writeCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE - CRC_SIZE_BYTES);
+ }
+
+ static void writeTruncatePrefixRecordTo(ByteBuffer buffer, long groupId,
long firstIndexKept) {
+ buffer
+ .putLong(groupId)
+ .putInt(TRUNCATE_PREFIX_RECORD_MARKER)
+ .putLong(firstIndexKept);
+
+ writeCrc(buffer, TRUNCATE_PREFIX_RECORD_SIZE - CRC_SIZE_BYTES);
+ }
+
+ private static void writeCrc(ByteBuffer buffer, int recordSizeWithoutCrc) {
+ buffer.position(buffer.position() - recordSizeWithoutCrc);
- int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE -
HASH_SIZE_BYTES);
+ int crc = FastCrc.calcCrc(buffer, recordSizeWithoutCrc);
buffer.putInt(crc);
}
@@ -126,7 +140,7 @@ class SegmentPayload {
buffer.get(entryBytes);
// Move the position as if we have read the whole payload.
- buffer.position(buffer.position() + HASH_SIZE_BYTES);
+ buffer.position(buffer.position() + CRC_SIZE_BYTES);
return logEntryDecoder.decode(entryBytes);
}
@@ -140,6 +154,6 @@ class SegmentPayload {
}
static int fixedOverheadSize() {
- return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES;
+ return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + CRC_SIZE_BYTES;
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
index 99191668774..186d3031e8c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
@@ -142,7 +142,15 @@ class SegstoreLogStorage implements LogStorage {
@Override
public boolean truncatePrefix(long firstIndexKept) {
- throw new UnsupportedOperationException();
+ try {
+ segmentFileManager.truncatePrefix(groupId, firstIndexKept);
+ } catch (IOException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+
+ firstLogIndexInclusive = firstIndexKept;
+
+ return true;
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
index df8bd40e1d4..96a8babfad5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
@@ -48,6 +48,11 @@ interface WriteModeIndexMemTable {
*/
void truncateSuffix(long groupId, long lastLogIndexKept);
+ /**
+ * Removes all offsets for the given Raft group which log indices are
strictly smaller than {@code firstIndexKept}.
+ */
+ void truncatePrefix(long groupId, long firstIndexKept);
+
/**
* Returns the read-only version of this memtable.
*/
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
index 53a5dc11396..66519753345 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
@@ -179,7 +179,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
int numEntries = 10_000;
RunnableX producerTask = () -> {
- for (int i = 0; i < numEntries; i++) {
+ for (int i = 1; i <= numEntries; i++) {
ReadModeIndexMemTable mockTable =
mock(ReadModeIndexMemTable.class);
var segmentInfo = new SegmentInfo(0);
@@ -193,7 +193,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
};
RunnableX consumerTask = () -> {
- for (int i = 0; i < numEntries; i++) {
+ for (int i = 1; i <= numEntries; i++) {
Entry entry = queue.peekHead();
assertThat(entry.memTable().segmentInfo(0).getOffset(0),
is(i));
@@ -212,7 +212,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
int numEntries = 10_000;
RunnableX producerTask = () -> {
- for (int i = 0; i < numEntries; i++) {
+ for (int i = 1; i <= numEntries; i++) {
ReadModeIndexMemTable mockTable =
mock(ReadModeIndexMemTable.class);
var segmentInfo = new SegmentInfo(0);
@@ -226,7 +226,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
};
RunnableX consumerTask = () -> {
- for (int i = 0; i < numEntries; i++) {
+ for (int i = 1; i <= numEntries; i++) {
Entry entry = queue.peekHead();
assertThat(entry.memTable().segmentInfo(0).getOffset(0),
is(i));
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
index 4f7eb6d0faf..42cd54a6396 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.GROUP_ID_SIZE_BYTES;
-import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.LENGTH_SIZE_BYTES;
import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,9 +63,9 @@ class DeserializedSegmentPayload {
int payloadLength = readFully(channel, LENGTH_SIZE_BYTES).getInt();
- ByteBuffer remaining = readFully(channel, payloadLength +
HASH_SIZE_BYTES);
+ ByteBuffer remaining = readFully(channel, payloadLength +
CRC_SIZE_BYTES);
- ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength +
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES)
+ ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength +
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + CRC_SIZE_BYTES)
.order(SegmentFile.BYTE_ORDER)
.putLong(groupId)
.putInt(payloadLength)
@@ -113,7 +113,7 @@ class DeserializedSegmentPayload {
}
int size() {
- return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length +
HASH_SIZE_BYTES;
+ return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length +
CRC_SIZE_BYTES;
}
private static ByteBuffer readFully(ReadableByteChannel byteChannel, int
len) throws IOException {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
index ceca38d7b56..da01d0eb8c7 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -195,22 +195,83 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
expectedFileOrdinal
);
- var expectedMetaWithoutOverlap = new IndexFileMeta(
- expectedFirstLogIndex + overlap -
logEntriesPerFile,
- expectedFirstLogIndex + overlap - 1,
- 0,
- expectedFileOrdinal - 1
- );
-
- // We can possibly be reading from two different metas -
from the newer one (that overlaps the older one) or
- // the older one.
- assertThat(
- logIndex + " -> " + indexFileMeta,
- indexFileMeta,
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap)));
+ if (expectedFirstLogIndex == 0) {
+ assertThat(logIndex + " -> " + indexFileMeta,
indexFileMeta, is(expectedMetaWithOverlap));
+ } else {
+ var expectedMetaWithoutOverlap = new IndexFileMeta(
+ expectedFirstLogIndex + overlap -
logEntriesPerFile,
+ expectedFirstLogIndex + overlap - 1,
+ 0,
+ expectedFileOrdinal - 1
+ );
+
+ // We can possibly be reading from two different metas
- from the newer one (that overlaps the older one) or
+ // the older one.
+ assertThat(
+ logIndex + " -> " + indexFileMeta,
+ indexFileMeta,
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap))
+ );
+ }
}
}
};
runRace(writer, reader, reader, reader);
}
+
+ @Test
+ void testTruncatePrefix() {
+ var meta1 = new IndexFileMeta(1, 100, 0, 0);
+ var meta2 = new IndexFileMeta(42, 100, 42, 1);
+ var meta3 = new IndexFileMeta(100, 120, 66, 2);
+ var meta4 = new IndexFileMeta(110, 200, 95, 3);
+
+ var groupMeta = new GroupIndexMeta(meta1);
+
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+ groupMeta.addIndexMeta(meta4);
+
+ assertThat(groupMeta.firstLogIndexInclusive(), is(1L));
+ assertThat(groupMeta.lastLogIndexExclusive(), is(200L));
+
+ assertThat(groupMeta.indexMeta(10), is(meta1));
+ assertThat(groupMeta.indexMeta(42), is(meta2));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+ assertThat(groupMeta.indexMeta(110), is(meta4));
+
+ groupMeta.truncatePrefix(43);
+
+ assertThat(groupMeta.indexMeta(10), is(nullValue()));
+ assertThat(groupMeta.indexMeta(42), is(nullValue()));
+
+ // Payload offset is shifted 4 bytes in order to skip the truncated
entry.
+ var trimmedMeta = new IndexFileMeta(43, 100, 46, 1);
+
+ assertThat(groupMeta.indexMeta(43), is(trimmedMeta));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+ assertThat(groupMeta.indexMeta(110), is(meta4));
+
+ groupMeta.truncatePrefix(110);
+
+ assertThat(groupMeta.indexMeta(43), is(nullValue()));
+ assertThat(groupMeta.indexMeta(100), is(nullValue()));
+ assertThat(groupMeta.indexMeta(110), is(meta4));
+ }
+
+ @Test
+ void testTruncatePrefixRemovesAllEntriesWhenKeptAfterLast() {
+ var meta1 = new IndexFileMeta(1, 10, 0, 0);
+ var meta2 = new IndexFileMeta(10, 20, 100, 1);
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+
+ // Truncate to the end of last meta - everything should be removed.
+ groupMeta.truncatePrefix(20);
+
+ assertThat(groupMeta.indexMeta(0), is(nullValue()));
+ assertThat(groupMeta.indexMeta(19), is(nullValue()));
+ assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 62a08638b05..cb76a4adc53 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -215,7 +215,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
}
@Test
- void testFirstLastLogIndicesWithTruncate() throws IOException {
+ void testFirstLastLogIndicesWithTruncateSuffix() throws IOException {
var memtable = new IndexMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
@@ -237,6 +237,29 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.lastLogIndexExclusive(0), is(2L));
}
+ @Test
+ void testFirstLastLogIndicesWithTruncatePrefix() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+ memtable.appendSegmentFileOffset(0, 2, 1);
+ memtable.appendSegmentFileOffset(0, 3, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.truncatePrefix(0, 2);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(2L));
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
+ }
+
@Test
void testGetSegmentPointerWithTruncate() throws IOException {
var memtable = new IndexMemTable(STRIPES);
@@ -329,6 +352,31 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(0, 3),
is(nullValue()));
}
+ /**
+ * Checks that a saved prefix truncation of group 0 (keeping index 2 onwards) is still in effect after a new
+ * {@code IndexFileManager} is started over the same work directory: index 1 is gone, indices 2 and 3 still resolve.
+ */
+ @Test
+ void testRecoveryWithTruncatePrefix() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+ memtable.appendSegmentFileOffset(0, 2, 2);
+ memtable.appendSegmentFileOffset(0, 3, 3);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.truncatePrefix(0, 2);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ // Re-create the manager over the same work directory, so its state can only come from what was written there.
+ indexFileManager = new IndexFileManager(workDir);
+
+ indexFileManager.start();
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(0, 2)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new
SegmentFilePointer(0, 3)));
+ }
+
@Test
void testExists() throws IOException {
assertThat(indexFileManager.indexFileExists(0), is(false));
@@ -371,4 +419,70 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new
SegmentFilePointer(5, 1)));
assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(6, 2)));
}
+
+ /**
+ * Saves log indices 1, 2 and 3 of group 0 through three separate memtables, then saves a fourth memtable that only
+ * truncates the prefix at 2, and checks that index 1 no longer resolves while indices 2 and 3 keep their pointers.
+ */
+ @Test
+ void testTruncatePrefix() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 2, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 3, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.truncatePrefix(0, 2);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ // NOTE(review): the first SegmentFilePointer argument appears to be the ordinal of the save (0, 1, 2, ...) - confirm.
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(1, 1)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new
SegmentFilePointer(2, 1)));
+ }
+
+ /**
+ * Checks that a single memtable holding both a prefix truncation (keep from 2) and a suffix truncation (keep up to 3)
+ * leaves only indices 2 and 3 of group 0 resolvable, both before and after re-creating the manager.
+ */
+ @Test
+ void testCombinationOfPrefixAndSuffixTombstones() throws IOException {
+ var memtable = new IndexMemTable(STRIPES);
+
+ memtable.appendSegmentFileOffset(0, 1, 1);
+ memtable.appendSegmentFileOffset(0, 2, 2);
+ memtable.appendSegmentFileOffset(0, 3, 3);
+ memtable.appendSegmentFileOffset(0, 4, 4);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ memtable = new IndexMemTable(STRIPES);
+
+ memtable.truncatePrefix(0, 2);
+
+ memtable.truncateSuffix(0, 3);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(0, 2)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new
SegmentFilePointer(0, 3)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 4),
is(nullValue()));
+
+ // Restart the manager to check recovery.
+ indexFileManager = new IndexFileManager(workDir);
+
+ indexFileManager.start();
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(0, 2)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new
SegmentFilePointer(0, 3)));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 4),
is(nullValue()));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
index 55d24237ebb..23a4e61f6d0 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
@@ -34,6 +34,8 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
assertThat(array.size(), is(1));
assertThat(array.get(0), is(initialMeta));
+ assertThat(array.firstLogIndexInclusive(), is(1L));
+ assertThat(array.lastLogIndexExclusive(), is(2L));
var meta2 = new IndexFileMeta(2, 3, 0, 1);
@@ -41,6 +43,8 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
assertThat(array.size(), is(2));
assertThat(array.get(1), is(meta2));
+ assertThat(array.firstLogIndexInclusive(), is(1L));
+ assertThat(array.lastLogIndexExclusive(), is(3L));
for (int i = 0; i < INITIAL_CAPACITY; i++) {
long logIndex = meta2.firstLogIndexInclusive() + i + 1;
@@ -54,6 +58,8 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
assertThat(array.size(), is(3 + INITIAL_CAPACITY));
assertThat(array.get(array.size() - 1), is(meta3));
+ assertThat(array.firstLogIndexInclusive(), is(1L));
+ assertThat(array.lastLogIndexExclusive(), is(INITIAL_CAPACITY + 4L));
}
@Test
@@ -105,4 +111,37 @@ class IndexFileMetaArrayTest extends
BaseIgniteAbstractTest {
assertThat(array.find(9), is(meta1));
assertThat(array.find(10), is(meta3));
}
+
+ /**
+ * Checks that truncating at index 5, inside the first meta covering [1, 10), produces a trimmed copy starting at 5 whose
+ * payload offset is advanced past the 4 dropped entries, while the second meta is kept unchanged.
+ */
+ @Test
+ void testTruncateInsideMetaAdjustsPayloadOffset() {
+ var meta1 = new IndexFileMeta(1, 10, 100, 0);
+ var meta2 = new IndexFileMeta(10, 20, 200, 1);
+
+ IndexFileMetaArray array = new IndexFileMetaArray(meta1).add(meta2);
+
+ IndexFileMetaArray truncated = array.truncateIndicesSmallerThan(5);
+
+ assertThat(truncated.size(), is(2));
+
+ IndexFileMeta trimmedMeta = truncated.get(0);
+
+ assertThat(trimmedMeta.firstLogIndexInclusive(), is(5L));
+ assertThat(trimmedMeta.lastLogIndexExclusive(), is(10L));
+ // Indices 1..4 are skipped; the expected value implies Integer.BYTES of payload per entry.
+ assertThat(trimmedMeta.indexFilePayloadOffset(), is(100 + 4 *
Integer.BYTES));
+
+ assertThat(truncated.get(1), is(meta2));
+ }
+
+ /**
+ * Checks that truncating exactly at the first index of the second meta drops the first meta entirely and keeps the
+ * second one as is.
+ */
+ @Test
+ void testTruncateToMetaBoundarySkipsPreviousMeta() {
+ var meta1 = new IndexFileMeta(1, 10, 100, 0);
+ var meta2 = new IndexFileMeta(10, 20, 200, 1);
+
+ IndexFileMetaArray array = new IndexFileMetaArray(meta1).add(meta2);
+
+ IndexFileMetaArray truncated = array.truncateIndicesSmallerThan(10);
+
+ assertThat(truncated.size(), is(1));
+ assertThat(truncated.get(0), is(meta2));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
index afc2263f9cf..a7c4e14749c 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,9 +59,9 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
void testMissingValue() {
memTable.appendSegmentFileOffset(0, 5, 1);
- assertThat(memTable.segmentInfo(0).getOffset(1), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(1),
is(MISSING_SEGMENT_FILE_OFFSET));
assertThat(memTable.segmentInfo(0).getOffset(5), is(1));
- assertThat(memTable.segmentInfo(0).getOffset(6), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(6),
is(MISSING_SEGMENT_FILE_OFFSET));
assertThat(memTable.segmentInfo(1), is(nullValue()));
}
@@ -81,11 +83,11 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
if (groupId == 0) {
assertThat(segmentInfo.getOffset(0), is(1));
assertThat(segmentInfo.getOffset(1), is(2));
- assertThat(segmentInfo.getOffset(2), is(0));
+ assertThat(segmentInfo.getOffset(2),
is(MISSING_SEGMENT_FILE_OFFSET));
} else {
assertThat(segmentInfo.getOffset(0), is(3));
assertThat(segmentInfo.getOffset(1), is(4));
- assertThat(segmentInfo.getOffset(2), is(0));
+ assertThat(segmentInfo.getOffset(2),
is(MISSING_SEGMENT_FILE_OFFSET));
}
});
}
@@ -106,7 +108,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
SegmentInfo segmentInfo = memTable.segmentInfo(0);
if (segmentInfo != null) {
- assertThat(segmentInfo.getOffset(i), either(is(i +
1)).or(is(0)));
+ assertThat(segmentInfo.getOffset(i), either(is(i +
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
}
}
};
@@ -138,7 +140,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
if (segmentInfo != null) {
- assertThat(segmentInfo.getOffset(j), either(is(j +
1)).or(is(0)));
+ assertThat(segmentInfo.getOffset(j), either(is(j +
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
}
}
});
@@ -175,8 +177,8 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
- assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
- assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(3),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(4),
is(MISSING_SEGMENT_FILE_OFFSET));
assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(55));
assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(56));
@@ -187,8 +189,8 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
- assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
- assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(3),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(4),
is(MISSING_SEGMENT_FILE_OFFSET));
assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(55));
assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(56));
@@ -199,13 +201,13 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
- assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
- assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(3),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId0).getOffset(4),
is(MISSING_SEGMENT_FILE_OFFSET));
- assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(0));
- assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(0));
- assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(0));
- assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(0));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4),
is(MISSING_SEGMENT_FILE_OFFSET));
}
@Test
@@ -225,7 +227,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
memTable.truncateSuffix(0, 0);
- assertThat(memTable.segmentInfo(0).getOffset(1), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(1),
is(MISSING_SEGMENT_FILE_OFFSET));
memTable.appendSegmentFileOffset(0, 1, 43);
@@ -233,19 +235,122 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
}
@Test
- void testTruncateIntoThePast() {
+ void testTruncateSuffixIntoThePast() {
memTable.appendSegmentFileOffset(0, 36, 42);
// Truncate to a position before the moment the last segment info was
added.
memTable.truncateSuffix(0, 10);
- assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
- assertThat(memTable.segmentInfo(0).getOffset(11), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(36),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(0).getOffset(11),
is(MISSING_SEGMENT_FILE_OFFSET));
memTable.appendSegmentFileOffset(0, 11, 43);
assertThat(memTable.segmentInfo(0).getOffset(11), is(43));
- assertThat(memTable.segmentInfo(0).getOffset(12), is(0));
- assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(12),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(0).getOffset(36),
is(MISSING_SEGMENT_FILE_OFFSET));
+ }
+
+ /**
+ * Checks prefix truncation in the memtable: truncating at the first stored index is a no-op, truncation only affects
+ * the targeted group, and offsets before the first kept index become {@code MISSING_SEGMENT_FILE_OFFSET}.
+ */
+ @Test
+ void testTruncatePrefix() {
+ long groupId1 = 1;
+ long groupId2 = 2;
+
+ memTable.appendSegmentFileOffset(groupId1, 1, 42);
+ memTable.appendSegmentFileOffset(groupId1, 2, 43);
+ memTable.appendSegmentFileOffset(groupId1, 3, 44);
+ memTable.appendSegmentFileOffset(groupId1, 4, 45);
+
+ memTable.appendSegmentFileOffset(groupId2, 1, 55);
+ memTable.appendSegmentFileOffset(groupId2, 2, 56);
+ memTable.appendSegmentFileOffset(groupId2, 3, 57);
+ memTable.appendSegmentFileOffset(groupId2, 4, 58);
+
+ // Keeping everything from the first stored index must not remove anything.
+ memTable.truncatePrefix(groupId1, 1);
+
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(42));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(43));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(44));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(45));
+
+ assertThat(memTable.segmentInfo(groupId2).getOffset(1), is(55));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(2), is(56));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(3), is(57));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(4), is(58));
+
+ // Truncating group 2 must leave group 1 untouched.
+ memTable.truncatePrefix(groupId2, 3);
+
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(42));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(43));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(44));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(45));
+
+ assertThat(memTable.segmentInfo(groupId2).getOffset(1),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(2),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(3), is(57));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(4), is(58));
+
+ // Keeping only the last stored index of group 1.
+ memTable.truncatePrefix(groupId1, 4);
+
+ assertThat(memTable.segmentInfo(groupId1).getOffset(1),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(2),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(3),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(45));
+
+ assertThat(memTable.segmentInfo(groupId2).getOffset(1),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(2),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(3), is(57));
+ assertThat(memTable.segmentInfo(groupId2).getOffset(4), is(58));
+ }
+
+ /**
+ * Checks the boundaries of {@code truncatePrefix}: an unknown group and an index below the first stored one (4 < 5)
+ * are accepted, while an index past the only stored entry (10 > 5) is rejected with {@link IllegalArgumentException}.
+ */
+ @Test
+ void testTruncateNonExistingPrefix() {
+ assertDoesNotThrow(() -> memTable.truncatePrefix(0, 4));
+
+ memTable.appendSegmentFileOffset(1, 5, 42);
+
+ assertDoesNotThrow(() -> memTable.truncatePrefix(1, 4));
+ assertThrows(IllegalArgumentException.class, () ->
memTable.truncatePrefix(1, 10));
+ }
+
+ /**
+ * Checks that an entry can be appended right after a suffix truncation that followed a prefix truncation on a group
+ * with no stored entries.
+ */
+ @Test
+ void testPrefixAndSuffixTombstones() {
+ memTable.truncatePrefix(0, 10);
+
+ memTable.truncateSuffix(0, 15);
+
+ // 16 is the first index after the last kept index (15).
+ memTable.appendSegmentFileOffset(0, 16, 42);
+
+ SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+ assertThat(segmentInfo, is(notNullValue()));
+ assertThat(segmentInfo.getOffset(16), is(42));
+ }
+
+ /**
+ * Same scenario as the prefix-then-suffix case, but with the truncations applied in the opposite order.
+ */
+ @Test
+ void testSuffixAndPrefixTombstones() {
+ memTable.truncateSuffix(0, 15);
+
+ memTable.truncatePrefix(0, 10);
+
+ memTable.appendSegmentFileOffset(0, 16, 42);
+
+ SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+ assertThat(segmentInfo, is(notNullValue()));
+ assertThat(segmentInfo.getOffset(16), is(42));
+ }
+
+ /**
+ * Checks that a second prefix truncation of the same group replaces the first one: the segment info stays a prefix
+ * tombstone and reports the latest first kept index (15).
+ */
+ @Test
+ void testMultiplePrefixTombstones() {
+ memTable.truncatePrefix(0, 10);
+
+ memTable.truncatePrefix(0, 15);
+
+ SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+ assertThat(segmentInfo, is(notNullValue()));
+ assertThat(segmentInfo.isPrefixTombstone(), is(true));
+ assertThat(segmentInfo.firstIndexKept(), is(15L));
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index 4ef20956321..f3bd0972190 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -23,9 +23,8 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -38,6 +37,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.failure.NoOpFailureManager;
+import
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -110,7 +110,7 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
@Test
void testReadFromQueue() {
// Read from empty queue.
- assertThat(checkpointer.findSegmentPayloadInQueue(0, 0),
is(nullValue()));
+ assertThat(checkpointer.findSegmentPayloadInQueue(0,
0).searchOutcome(), is(SearchOutcome.CONTINUE_SEARCH));
var blockFuture = new CompletableFuture<Void>();
@@ -137,22 +137,84 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) {
for (int logIndex = 0; logIndex < MAX_QUEUE_SIZE; logIndex++) {
- ByteBuffer payload =
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+ EntrySearchResult searchResult =
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
if (groupId == logIndex) {
- assertThat(payload, is(notNullValue()));
+ assertThat(searchResult.searchOutcome(),
is(SearchOutcome.SUCCESS));
} else {
- assertThat(payload, is(nullValue()));
+ assertThat(searchResult.searchOutcome(),
anyOf(is(SearchOutcome.CONTINUE_SEARCH), is(SearchOutcome.NOT_FOUND)));
}
}
}
- assertThat(checkpointer.findSegmentPayloadInQueue(MAX_QUEUE_SIZE,
MAX_QUEUE_SIZE), is(nullValue()));
+ assertThat(
+ checkpointer.findSegmentPayloadInQueue(MAX_QUEUE_SIZE,
MAX_QUEUE_SIZE).searchOutcome(),
+ is(SearchOutcome.CONTINUE_SEARCH)
+ );
} finally {
blockFuture.complete(null);
}
// The queue should eventually become empty again.
- await().until(() -> checkpointer.findSegmentPayloadInQueue(0, 0),
is(nullValue()));
+ await().until(() -> checkpointer.findSegmentPayloadInQueue(0,
0).searchOutcome(), is(SearchOutcome.CONTINUE_SEARCH));
+ }
+
+ /**
+ * Checks that looking up a log index present in the segment info of a rolled-over (not yet checkpointed) memtable
+ * yields {@code SUCCESS} together with the segment file buffer.
+ */
+ @Test
+ void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+ var blockFuture = new CompletableFuture<Void>();
+
+ try {
+ // Block sync() until the test finishes; presumably this keeps the rolled-over file in the checkpoint queue.
+ doAnswer(invocation -> blockFuture.join()).when(mockFile).sync();
+
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+
+ when(mockFile.buffer()).thenReturn(buffer);
+
+ long groupId = 2;
+ long logIndex = 5;
+
+ // Log indices 1..10 are all present, so logIndex 5 must be found.
+ var segmentInfo = new SegmentInfo(1);
+
+ for (int i = 1; i <= 10; i++) {
+ segmentInfo.addOffset(i, i);
+ }
+
+ when(mockMemTable.segmentInfo(groupId)).thenReturn(segmentInfo);
+
+ checkpointer.onRollover(mockFile, mockMemTable);
+
+ EntrySearchResult res =
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+
+ assertThat(res.searchOutcome(), is(SearchOutcome.SUCCESS));
+ assertThat(res.entryBuffer(), is(buffer));
+ } finally {
+ // Always unblock sync() so the checkpointer thread is not left hanging.
+ blockFuture.complete(null);
+ }
+ }
+
+ /**
+ * Checks that looking up an index below the first kept index of a queued memtable's segment info yields
+ * {@code NOT_FOUND} (despite the method name saying "Empty", the asserted outcome is NOT_FOUND).
+ */
+ @Test
+ void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+ var blockFuture = new CompletableFuture<Void>();
+
+ try {
+ // Block sync() until the test finishes; presumably this keeps the rolled-over file in the checkpoint queue.
+ doAnswer(invocation -> blockFuture.join()).when(mockFile).sync();
+
+ long groupId = 2;
+
+ SegmentInfo mockSegmentInfo = mock(SegmentInfo.class);
+
+
when(mockMemTable.segmentInfo(groupId)).thenReturn(mockSegmentInfo);
+ when(mockSegmentInfo.lastLogIndexExclusive()).thenReturn(20L);
+ // Emulate prefix truncation from index 10.
+ when(mockSegmentInfo.firstIndexKept()).thenReturn(10L);
+
+ checkpointer.onRollover(mockFile, mockMemTable);
+
+ // Index 5 lies before the first kept index (10).
+ EntrySearchResult res =
checkpointer.findSegmentPayloadInQueue(groupId, 5);
+
+ assertThat(res.searchOutcome(), is(SearchOutcome.NOT_FOUND));
+ } finally {
+ blockFuture.complete(null);
+ }
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index f2099afb89f..3ec7422d464 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -76,6 +76,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
@ExtendWith(ExecutorServiceExtension.class)
class SegmentFileManagerTest extends IgniteAbstractTest {
@@ -375,6 +377,30 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
}
}
+ /**
+ * Checks that {@code truncatePrefix} on a fresh manager writes, right after the segment file header, exactly the bytes
+ * produced by {@code SegmentPayload.writeTruncatePrefixRecordTo} for the same group and first kept index.
+ */
+ @Test
+ void truncateRecordIsWrittenOnPrefixTruncate() throws IOException {
+ long groupId = 36;
+
+ long firstLogIndexKept = 42;
+
+ fileManager.truncatePrefix(groupId, firstLogIndexKept);
+
+ Path path = findSoleSegmentFile();
+
+ // Build the expected record with the same byte order as the segment file.
+ ByteBuffer expectedTruncateRecord =
ByteBuffer.allocate(SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE)
+ .order(SegmentFile.BYTE_ORDER);
+
+ SegmentPayload.writeTruncatePrefixRecordTo(expectedTruncateRecord,
groupId, firstLogIndexKept);
+
+ expectedTruncateRecord.rewind();
+
+ try (SeekableByteChannel channel = Files.newByteChannel(path)) {
+ // Skip the file header: the truncate record is the first record after it.
+ channel.position(HEADER_RECORD.length);
+
+ assertThat(readFully(channel,
SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE), is(expectedTruncateRecord));
+ }
+ }
+
@Test
void testRecovery() throws Exception {
int batchSize = FILE_SIZE / 4;
@@ -459,8 +485,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
assertThat(indexFiles(), hasSize(1));
}
- @Test
- void testRecoveryWithTruncateSuffix() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testTruncateSuffix(boolean restart) throws Exception {
List<byte[]> batches = randomData(FILE_SIZE / 4, 10);
for (int i = 0; i < batches.size(); i++) {
@@ -481,15 +508,17 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
- fileManager.close();
+ if (restart) {
+ fileManager.close();
- for (Path indexFile : indexFiles()) {
- Files.deleteIfExists(indexFile);
- }
+ for (Path indexFile : indexFiles()) {
+ Files.deleteIfExists(indexFile);
+ }
- fileManager = createFileManager();
+ fileManager = createFileManager();
- fileManager.start();
+ fileManager.start();
+ }
for (int i = 0; i <= lastLogIndexKept; i++) {
byte[] expectedEntry = batches.get(i);
@@ -508,6 +537,55 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
}
}
+ /**
+ * Writes 10 entries, truncates the prefix at the middle index and appends 10 more entries, optionally restarting the
+ * manager with its index files deleted. Then checks that every kept and every newly appended entry is readable with
+ * the expected content, and that no truncated entry is returned.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testTruncatePrefix(boolean restart) throws Exception {
+ List<byte[]> batches = randomData(FILE_SIZE / 4, 10);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(4));
+
+ int firstLogIndexKept = batches.size() / 2;
+
+ fileManager.truncatePrefix(GROUP_ID, firstLogIndexKept);
+
+ // Insert more data, just in case.
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(batches.get(i), i + batches.size());
+ }
+
+ if (restart) {
+ fileManager.close();
+
+ for (Path indexFile : indexFiles()) {
+ Files.deleteIfExists(indexFile);
+ }
+
+ fileManager = createFileManager();
+
+ fileManager.start();
+ }
+
+ // Counts read callbacks, so the content assertions below cannot pass vacuously when an entry is not found.
+ int[] readCount = {0};
+
+ // Entries at or after the first kept index must survive the truncation. They must be read at their own log index:
+ // reading indices 0..firstLogIndexKept-1 would hit truncated entries and never invoke the callback.
+ for (int i = firstLogIndexKept; i < batches.size(); i++) {
+ byte[] expectedEntry = batches.get(i);
+
+ fileManager.getEntry(GROUP_ID, i, bs -> {
+ assertThat(bs, is(expectedEntry));
+
+ readCount[0]++;
+
+ return null;
+ });
+ }
+
+ // Entries appended after the truncation must be readable as well.
+ for (int i = 0; i < batches.size(); i++) {
+ byte[] expectedEntry = batches.get(i);
+
+ fileManager.getEntry(GROUP_ID, i + batches.size(), bs -> {
+ assertThat(bs, is(expectedEntry));
+
+ readCount[0]++;
+
+ return null;
+ });
+ }
+
+ assertThat(readCount[0], is(2 * batches.size() - firstLogIndexKept));
+
+ // Truncated entries must not be returned at all.
+ for (int i = 0; i < firstLogIndexKept; i++) {
+ fileManager.getEntry(GROUP_ID, i, bs -> {
+ throw new AssertionError("This method should not be called.");
+ });
+ }
+ }
+
private Path findSoleSegmentFile() throws IOException {
List<Path> segmentFiles = segmentFiles();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index aff053d6236..7d10cefdc54 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -133,4 +133,44 @@ class SegstoreLogStorageConcurrencyTest extends
IgniteAbstractTest {
runRace(writerTaskFactory.apply(1), writerTaskFactory.apply(2));
}
+
+ /**
+ * Two groups concurrently append the same entries to their own log storage. Every 10th entry (excluding the first)
+ * triggers a prefix truncation at that entry's index. After each append, the test checks that the first and last log
+ * indices are as expected.
+ */
+ @Test
+ void testFirstIndexAfterTruncatePrefix() {
+ List<LogEntry> entries = TestUtils.mockEntries();
+
+ IntFunction<RunnableX> writerTaskFactory = groupId -> () -> {
+ SegstoreLogStorage logStorage = newLogStorage(groupId);
+
+ try {
+ // An empty storage reports first index 1 and last index 0.
+ assertThat(logStorage.getFirstLogIndex(), is(1L));
+ assertThat(logStorage.getLastLogIndex(), is(0L));
+
+ // Assumes mockEntries() starts at index 0, consistent with the final "entries.size() - 1" check.
+ long firstLogIndex = 0;
+
+ for (int i = 0; i < entries.size(); i++) {
+ LogEntry entry = entries.get(i);
+
+ logStorage.appendEntry(entry);
+
+ long logIndex = entry.getId().getIndex();
+
+ if (i > 0 && i % 10 == 0) {
+ logStorage.truncatePrefix(logIndex);
+
+ firstLogIndex = logIndex;
+ }
+
+ assertThat(logStorage.getFirstLogIndex(),
is(firstLogIndex));
+ assertThat(logStorage.getLastLogIndex(), is(logIndex));
+ }
+
+ assertThat(logStorage.getFirstLogIndex(), is(firstLogIndex));
+ assertThat(logStorage.getLastLogIndex(), is((long)
entries.size() - 1));
+ } finally {
+ logStorage.shutdown();
+ }
+ };
+
+ runRace(writerTaskFactory.apply(1), writerTaskFactory.apply(2));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 0a7d28db343..34d4cc00f8f 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -66,12 +66,6 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
super.testReset();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26285")
- @Override
- public void testTruncatePrefix() {
- super.testTruncatePrefix();
- }
-
@ParameterizedTest
// Number of entries is chosen to test scenarios with zero index files and
with multiple index files.
@ValueSource(ints = { 15, 100_000 })
@@ -97,6 +91,9 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
logStorage.truncateSuffix(lastIndexKept);
+ assertThat(logStorage.getFirstLogIndex(), is(0L));
+ assertThat(logStorage.getLastLogIndex(), is(lastIndexKept));
+
logStorage.shutdown();
segmentFileManager.close();
@@ -106,4 +103,26 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
assertThat(logStorage.getFirstLogIndex(), is(0L));
assertThat(logStorage.getLastLogIndex(), is(lastIndexKept));
}
+
+ /**
+ * Checks that after truncating the prefix at the middle entry, the first and last log indices are reported correctly
+ * both before and after re-creating the log storage (with the segment file manager closed in between).
+ */
+ @ParameterizedTest
+ // Same entry counts as the suffix-truncation restart test above: a small and a large number of entries.
+ @ValueSource(ints = { 15, 100_000 })
+ public void firstAndLastLogIndexAfterPrefixTruncateAndRestart(int
numEntries) throws Exception {
+ logStorage.appendEntries(TestUtils.mockEntries(numEntries));
+
+ long firstIndexKept = numEntries / 2;
+
+ logStorage.truncatePrefix(firstIndexKept);
+
+ assertThat(logStorage.getFirstLogIndex(), is(firstIndexKept));
+ assertThat(logStorage.getLastLogIndex(), is((long) numEntries - 1));
+
+ logStorage.shutdown();
+ segmentFileManager.close();
+
+ logStorage = newLogStorage();
+ logStorage.init(newLogStorageOptions());
+
+ assertThat(logStorage.getFirstLogIndex(), is(firstIndexKept));
+ assertThat(logStorage.getLastLogIndex(), is((long) numEntries - 1));
+ }
}