This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 71003ff51ab IGNITE-26720 Adapt SegstoreLogStorage to Jraft tests
(#6777)
71003ff51ab is described below
commit 71003ff51abc26b5d2b78d6b7470ccc8ea1bae43
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Oct 15 16:34:25 2025 +0300
IGNITE-26720 Adapt SegstoreLogStorage to Jraft tests (#6777)
---
.../raft/storage/segstore/GroupIndexMeta.java | 10 +-
.../raft/storage/segstore/IndexFileManager.java | 18 ++
.../raft/storage/segstore/IndexMemTable.java | 20 +-
.../raft/storage/segstore/RaftLogCheckpointer.java | 40 +++-
.../storage/segstore/ReadModeIndexMemTable.java | 6 +-
.../raft/storage/segstore/SegmentFileManager.java | 60 ++++-
.../raft/storage/segstore/SegstoreLogStorage.java | 8 +-
.../storage/segstore/WriteModeIndexMemTable.java | 7 +-
.../raft/storage/segstore/CheckpointQueueTest.java | 18 +-
.../raft/storage/segstore/IndexMemTableTest.java | 30 ++-
.../storage/segstore/RaftLogCheckpointerTest.java | 6 +-
.../storage/segstore/SegmentFileManagerTest.java | 25 ++
.../storage/segstore/SegstoreLogStorageTest.java | 260 +++------------------
13 files changed, 240 insertions(+), 268 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index bea36c49cc9..3227b418fc9 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -60,8 +60,16 @@ class GroupIndexMeta {
*/
@Nullable
IndexFileMeta indexMeta(long logIndex) {
+ return fileMetas.find(logIndex);
+ }
+
+ long firstLogIndex() {
+ return fileMetas.get(0).firstLogIndex();
+ }
+
+ long lastLogIndex() {
IndexFileMetaArray fileMetas = this.fileMetas;
- return fileMetas.find(logIndex);
+ return fileMetas.get(fileMetas.size() - 1).lastLogIndex();
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 86dd1f681f4..f2b5b36d086 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -189,6 +189,24 @@ class IndexFileManager {
}
}
+ /**
+ * Returns the lowest log index for the given group across all index files
or {@code -1} if no such index exists.
+ */
+ long firstLogIndex(long groupId) {
+ GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
+
+ return groupIndexMeta == null ? -1 : groupIndexMeta.firstLogIndex();
+ }
+
+ /**
+ * Returns the highest log index for the given group across all index
files or {@code -1} if no such index exists.
+ */
+ long lastLogIndex(long groupId) {
+ GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
+
+ return groupIndexMeta == null ? -1 : groupIndexMeta.lastLogIndex();
+ }
+
private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable
indexMemTable) {
int numGroups = indexMemTable.numGroups();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
index b929aa8c776..37dcfdc9801 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
@@ -44,16 +44,24 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
// File offset can be less than 0 (it's treated as an unsigned
integer) but never 0, because of the file header.
assert segmentFileOffset != 0 : String.format("Segment file offset
must not be 0 [groupId=%d]", groupId);
- SegmentInfo segmentInfo =
stripe(groupId).memTable.computeIfAbsent(groupId, id -> new
SegmentInfo(logIndex));
+ ConcurrentMap<Long, SegmentInfo> memTable = stripe(groupId).memTable;
- segmentInfo.addOffset(logIndex, segmentFileOffset);
+ SegmentInfo segmentInfo = memTable.get(groupId);
+
+ if (segmentInfo == null) {
+ segmentInfo = new SegmentInfo(logIndex);
+
+ segmentInfo.addOffset(logIndex, segmentFileOffset);
+
+ memTable.put(groupId, segmentInfo);
+ } else {
+ segmentInfo.addOffset(logIndex, segmentFileOffset);
+ }
}
@Override
- public int getSegmentFileOffset(long groupId, long logIndex) {
- SegmentInfo segmentInfo = stripe(groupId).memTable.get(groupId);
-
- return segmentInfo == null ? 0 : segmentInfo.getOffset(logIndex);
+ public SegmentInfo segmentInfo(long groupId) {
+ return stripe(groupId).memTable.get(groupId);
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index 784645d5c75..122aeb3e64c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -114,7 +114,9 @@ class RaftLogCheckpointer {
while (it.hasNext()) {
Entry e = it.next();
- int segmentPayloadOffset =
e.memTable().getSegmentFileOffset(groupId, logIndex);
+ SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId);
+
+ int segmentPayloadOffset = segmentInfo == null ? 0 :
segmentInfo.getOffset(logIndex);
if (segmentPayloadOffset != 0) {
return e.segmentFile().buffer().position(segmentPayloadOffset);
@@ -124,6 +126,42 @@ class RaftLogCheckpointer {
return null;
}
+ /**
+ * Returns the lowest log index for the given group present in the
checkpoint queue or {@code -1} if no such index exists.
+ */
+ long firstLogIndex(long groupId) {
+ Iterator<Entry> it = queue.tailIterator();
+
+ long firstIndex = -1;
+
+ while (it.hasNext()) {
+ SegmentInfo segmentInfo =
it.next().memTable().segmentInfo(groupId);
+
+ if (segmentInfo != null) {
+ firstIndex = segmentInfo.firstLogIndex();
+ }
+ }
+
+ return firstIndex;
+ }
+
+ /**
+ * Returns the highest log index for the given group present in the
checkpoint queue or {@code -1} if no such index exists.
+ */
+ long lastLogIndex(long groupId) {
+ Iterator<Entry> it = queue.tailIterator();
+
+ while (it.hasNext()) {
+ SegmentInfo segmentInfo =
it.next().memTable().segmentInfo(groupId);
+
+ if (segmentInfo != null) {
+ return segmentInfo.lastLogIndex();
+ }
+ }
+
+ return -1;
+ }
+
private class CheckpointTask implements Runnable {
@Override
public void run() {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
index 43f425dec18..ed5f5100d37 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/ReadModeIndexMemTable.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
import java.util.Iterator;
import java.util.Map.Entry;
+import org.jetbrains.annotations.Nullable;
/**
* Immutable version of an index memtable used by the {@link
RaftLogCheckpointer}.
@@ -27,10 +28,9 @@ import java.util.Map.Entry;
*/
interface ReadModeIndexMemTable {
/**
- * Returns the offset in the segment file where the log entry with the
given {@code logIndex} is stored or {@code 0} if the log entry
- * was not found in the memtable.
+ * Returns information about a segment file for the given group ID or
{@code null} if it is not present in this memtable.
*/
- int getSegmentFileOffset(long groupId, long logIndex);
+ @Nullable SegmentInfo segmentInfo(long groupId);
/**
* Returns an iterator over all {@code Group ID -> SegmentInfo} entries in
this memtable.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 0f2a14c5e1b..86a5a211b53 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -216,6 +216,62 @@ class SegmentFileManager implements ManuallyCloseable {
return readFromOtherSegmentFiles(groupId, logIndex);
}
+ /**
+ * Returns the lowest log index for the given group present in the storage
or {@code -1} if no such index exists.
+ */
+ long firstLogIndex(long groupId) {
+ long logIndexFromMemtable = firstLogIndexFromMemtable(groupId);
+
+ long logIndexFromCheckpointQueue = checkpointer.firstLogIndex(groupId);
+
+ long logIndexFromIndexFiles = indexFileManager.firstLogIndex(groupId);
+
+ if (logIndexFromIndexFiles >= 0) {
+ return logIndexFromIndexFiles;
+ }
+
+ if (logIndexFromCheckpointQueue >= 0) {
+ return logIndexFromCheckpointQueue;
+ }
+
+ return logIndexFromMemtable;
+ }
+
+ private long firstLogIndexFromMemtable(long groupId) {
+ SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
+
+ SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
+
+ return segmentInfo == null ? -1 : segmentInfo.firstLogIndex();
+ }
+
+ /**
+ * Returns the highest log index for the given group present in the
storage or {@code -1} if no such index exists.
+ */
+ long lastLogIndex(long groupId) {
+ long logIndexFromMemtable = lastLogIndexFromMemtable(groupId);
+
+ if (logIndexFromMemtable >= 0) {
+ return logIndexFromMemtable;
+ }
+
+ long logIndexFromCheckpointQueue = checkpointer.lastLogIndex(groupId);
+
+ if (logIndexFromCheckpointQueue >= 0) {
+ return logIndexFromCheckpointQueue;
+ }
+
+ return indexFileManager.lastLogIndex(groupId);
+ }
+
+ private long lastLogIndexFromMemtable(long groupId) {
+ SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
+
+ SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
+
+ return segmentInfo == null ? -1 : segmentInfo.lastLogIndex();
+ }
+
/**
* Returns the current segment file possibly waiting for an ongoing
rollover to complete.
*/
@@ -309,7 +365,9 @@ class SegmentFileManager implements ManuallyCloseable {
private @Nullable ByteBuffer readFromCurrentSegmentFile(long groupId, long
logIndex) {
SegmentFileWithMemtable currentSegmentFile =
this.currentSegmentFile.get();
- int segmentPayloadOffset =
currentSegmentFile.memtable().getSegmentFileOffset(groupId, logIndex);
+ SegmentInfo segmentInfo =
currentSegmentFile.memtable().segmentInfo(groupId);
+
+ int segmentPayloadOffset = segmentInfo == null ? 0 :
segmentInfo.getOffset(logIndex);
if (segmentPayloadOffset == 0) {
return null;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
index d15d6f4b0e5..d1ac329e082 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
@@ -83,12 +83,16 @@ class SegstoreLogStorage implements LogStorage {
@Override
public long getFirstLogIndex() {
- throw new UnsupportedOperationException();
+ long firstLogIndex = segmentFileManager.firstLogIndex(groupId);
+
+ return firstLogIndex >= 0 ? firstLogIndex : 1;
}
@Override
public long getLastLogIndex() {
- throw new UnsupportedOperationException();
+ long lastLogIndex = segmentFileManager.lastLogIndex(groupId);
+
+ return lastLogIndex >= 0 ? lastLogIndex : 0;
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
index 3a85db11a5a..1901d1d629c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import org.jetbrains.annotations.Nullable;
+
/**
* Mutable index memtable.
*
@@ -28,10 +30,9 @@ package org.apache.ignite.internal.raft.storage.segstore;
*/
interface WriteModeIndexMemTable {
/**
- * Returns the offset in the segment file where the log entry with the
given {@code logIndex} is stored or {@code 0} if the log entry
- * was not found in the memtable.
+ * Returns information about a segment file for the given group ID or
{@code null} if it is not present in this memtable.
*/
- int getSegmentFileOffset(long groupId, long logIndex);
+ @Nullable SegmentInfo segmentInfo(long groupId);
/**
* Appends a new segment file offset to the memtable.
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
index e4d0c7f7290..53a5dc11396 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
@@ -182,7 +182,11 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
for (int i = 0; i < numEntries; i++) {
ReadModeIndexMemTable mockTable =
mock(ReadModeIndexMemTable.class);
- when(mockTable.getSegmentFileOffset(anyLong(),
anyLong())).thenReturn(i);
+ var segmentInfo = new SegmentInfo(0);
+
+ segmentInfo.addOffset(0, i);
+
+ when(mockTable.segmentInfo(anyLong())).thenReturn(segmentInfo);
queue.add(segmentFile, mockTable);
}
@@ -192,7 +196,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
for (int i = 0; i < numEntries; i++) {
Entry entry = queue.peekHead();
- assertThat(entry.memTable().getSegmentFileOffset(0, 0), is(i));
+ assertThat(entry.memTable().segmentInfo(0).getOffset(0),
is(i));
queue.removeHead();
}
@@ -211,7 +215,11 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
for (int i = 0; i < numEntries; i++) {
ReadModeIndexMemTable mockTable =
mock(ReadModeIndexMemTable.class);
- when(mockTable.getSegmentFileOffset(anyLong(),
anyLong())).thenReturn(i);
+ var segmentInfo = new SegmentInfo(0);
+
+ segmentInfo.addOffset(0, i);
+
+ when(mockTable.segmentInfo(anyLong())).thenReturn(segmentInfo);
queue.add(segmentFile, mockTable);
}
@@ -221,7 +229,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
for (int i = 0; i < numEntries; i++) {
Entry entry = queue.peekHead();
- assertThat(entry.memTable().getSegmentFileOffset(0, 0), is(i));
+ assertThat(entry.memTable().segmentInfo(0).getOffset(0),
is(i));
queue.removeHead();
}
@@ -238,7 +246,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
while (iterator.hasNext()) {
Entry entry = iterator.next();
- int offset = entry.memTable().getSegmentFileOffset(0, 0);
+ int offset = entry.memTable().segmentInfo(0).getOffset(0);
// Offsets must be in sequential decreasing order.
if (prevOffset != 0) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
index d120c33ee36..89856a8cbd7 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import java.util.ArrayList;
import java.util.Iterator;
@@ -42,10 +43,10 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
memTable.appendSegmentFileOffset(1, 0, 3);
memTable.appendSegmentFileOffset(1, 1, 4);
- assertThat(memTable.getSegmentFileOffset(0, 0), is(1));
- assertThat(memTable.getSegmentFileOffset(0, 1), is(2));
- assertThat(memTable.getSegmentFileOffset(1, 0), is(3));
- assertThat(memTable.getSegmentFileOffset(1, 1), is(4));
+ assertThat(memTable.segmentInfo(0).getOffset(0), is(1));
+ assertThat(memTable.segmentInfo(0).getOffset(1), is(2));
+ assertThat(memTable.segmentInfo(1).getOffset(0), is(3));
+ assertThat(memTable.segmentInfo(1).getOffset(1), is(4));
assertThat(memTable.numGroups(), is(2));
}
@@ -54,9 +55,10 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
void testMissingValue() {
memTable.appendSegmentFileOffset(0, 5, 1);
- assertThat(memTable.getSegmentFileOffset(0, 1), is(0));
- assertThat(memTable.getSegmentFileOffset(0, 5), is(1));
- assertThat(memTable.getSegmentFileOffset(0, 6), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(1), is(0));
+ assertThat(memTable.segmentInfo(0).getOffset(5), is(1));
+ assertThat(memTable.segmentInfo(0).getOffset(6), is(0));
+ assertThat(memTable.segmentInfo(1), is(nullValue()));
}
@Test
@@ -99,9 +101,11 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
RunnableX reader = () -> {
for (int i = 0; i < numItems; i++) {
- int offset = memTable.getSegmentFileOffset(0, i);
+ SegmentInfo segmentInfo = memTable.segmentInfo(0);
- assertThat(offset, either(is(i + 1)).or(is(0)));
+ if (segmentInfo != null) {
+ assertThat(segmentInfo.getOffset(i), either(is(i +
1)).or(is(0)));
+ }
}
};
@@ -129,9 +133,11 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
actions.add(() -> {
for (int j = 0; j < itemsPerGroup; j++) {
- int offset = memTable.getSegmentFileOffset(groupId, j);
+ SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
- assertThat(offset, either(is(j + 1)).or(is(0)));
+ if (segmentInfo != null) {
+ assertThat(segmentInfo.getOffset(j), either(is(j +
1)).or(is(0)));
+ }
}
});
}
@@ -143,7 +149,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
for (int groupId = 0; groupId < STRIPES; groupId++) {
for (int j = 0; j < itemsPerGroup; j++) {
- assertThat(memTable.getSegmentFileOffset(groupId, j), is(j +
1));
+ assertThat(memTable.segmentInfo(groupId).getOffset(j), is(j +
1));
}
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index aeaa61d95c9..4ef20956321 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -126,7 +126,11 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
IndexMemTable mockMemTable = mock(IndexMemTable.class);
- lenient().when(mockMemTable.getSegmentFileOffset(i,
i)).thenReturn(1);
+ var segmentInfo = new SegmentInfo(i);
+
+ segmentInfo.addOffset(i, 1);
+
+
lenient().when(mockMemTable.segmentInfo(i)).thenReturn(segmentInfo);
checkpointer.onRollover(mockFile, mockMemTable);
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 9b686e7d974..fe3f9e7d749 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -57,6 +57,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.IntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -342,6 +343,30 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
}
}
+ @Test
+ void testFirstAndLastIndexOnAppend() {
+ int batchSize = FILE_SIZE / 10;
+
+ List<byte[]> batches = randomData(batchSize, 100);
+
+ IntFunction<RunnableX> writerTaskFactory = groupId -> () -> {
+ assertThat(fileManager.firstLogIndex(groupId), is(-1L));
+ assertThat(fileManager.lastLogIndex(groupId), is(-1L));
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(groupId, batches.get(i), i);
+
+ assertThat(fileManager.firstLogIndex(groupId), is(0L));
+ assertThat(fileManager.lastLogIndex(groupId), is((long) i));
+ }
+
+ assertThat(fileManager.firstLogIndex(groupId), is(0L));
+ assertThat(fileManager.lastLogIndex(groupId), is((long)
(batches.size() - 1)));
+ };
+
+ runRace(writerTaskFactory.apply(0), writerTaskFactory.apply(1));
+ }
+
private Path findSoleSegmentFile() throws IOException {
List<Path> segmentFiles = segmentFiles();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 7ca0f32432b..1187a1113bb 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -17,265 +17,59 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
import org.apache.ignite.internal.failure.NoOpFailureManager;
-import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.raft.jraft.entity.LogEntry;
-import org.apache.ignite.raft.jraft.entity.LogId;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
-import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.BaseLogStorageTest;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.junit.jupiter.api.Disabled;
-@ExtendWith(MockitoExtension.class)
-class SegstoreLogStorageTest extends IgniteAbstractTest {
- private static final int SEGMENT_SIZE = 1024;
+class SegstoreLogStorageTest extends BaseLogStorageTest {
+ private static final int SEGMENT_SIZE = 512 * 1024; // Same as in JRaft
tests.
private static final long GROUP_ID = 1000;
private static final String NODE_NAME = "test";
- private SegstoreLogStorage logStorage;
-
private SegmentFileManager segmentFileManager;
- @Mock
- private LogEntryEncoder encoder;
-
- @Mock
- private LogEntryDecoder decoder;
-
- @BeforeEach
- void setUp() throws IOException {
- segmentFileManager = new SegmentFileManager(NODE_NAME, workDir,
SEGMENT_SIZE, 1, new NoOpFailureManager());
-
- logStorage = new SegstoreLogStorage(GROUP_ID, segmentFileManager);
-
- var opts = new LogStorageOptions();
-
- opts.setLogEntryCodecFactory(new LogEntryCodecFactory() {
- @Override
- public LogEntryEncoder encoder() {
- return encoder;
- }
-
- @Override
- public LogEntryDecoder decoder() {
- return decoder;
- }
- });
-
- segmentFileManager.start();
-
- logStorage.init(opts);
- }
-
@AfterEach
void tearDown() throws Exception {
- closeAllManually(
- logStorage == null ? null : logStorage::shutdown,
- segmentFileManager
- );
- }
-
- @Test
- void testAppendEntry() throws IOException {
- byte[] payload = {1, 2, 3, 4, 5};
-
- doAnswer(invocation -> {
- ByteBuffer buffer = invocation.getArgument(0);
-
- buffer.put(payload);
-
- return null;
- }).when(encoder).encode(any(), any());
-
- when(encoder.size(any())).thenAnswer(invocation -> payload.length);
-
- logStorage.appendEntry(new LogEntry());
-
- List<Path> segmentFiles = segmentFiles();
-
- assertThat(segmentFiles, hasSize(1));
-
- try (SeekableByteChannel channel =
Files.newByteChannel(segmentFiles.get(0))) {
- // Skip header.
- channel.position(channel.position() +
SegmentFileManager.HEADER_RECORD.length);
-
- DeserializedSegmentPayload entry =
DeserializedSegmentPayload.fromByteChannel(channel);
-
- assertThat(entry, is(notNullValue()));
- assertThat(entry.groupId(), is(GROUP_ID));
- assertThat(entry.payload(), is(payload));
- }
+ closeAllManually(segmentFileManager);
}
- @Test
- void testAppendEntries() throws IOException {
- List<byte[]> payloads = generateRandomData();
-
- var iteratorEncoder = new LogEntryEncoder() {
- private final Iterator<byte[]> payloadsIterator =
payloads.iterator();
-
- private byte[] nextPayload;
-
- @Override
- public byte[] encode(LogEntry log) {
- return fail("Should not be called.");
- }
-
- @Override
- public void encode(ByteBuffer buffer, LogEntry log) {
- buffer.put(nextPayload);
- }
-
- @Override
- public int size(LogEntry logEntry) {
- nextPayload = payloadsIterator.next();
-
- return nextPayload.length;
- }
- };
-
- doAnswer(invocation -> {
- iteratorEncoder.encode(invocation.getArgument(0),
invocation.getArgument(1));
-
- return null;
- }).when(encoder).encode(any(), any());
-
- when(encoder.size(any())).thenAnswer(invocation ->
iteratorEncoder.size(invocation.getArgument(0)));
+ @Override
+ protected LogStorage newLogStorage() {
+ segmentFileManager = new SegmentFileManager(NODE_NAME, path,
SEGMENT_SIZE, 1, new NoOpFailureManager());
- List<LogEntry> entries = IntStream.range(0, payloads.size())
- .mapToObj(i -> {
- var entry = new LogEntry();
-
- entry.setId(new LogId(i, 0));
-
- return entry;
- })
- .collect(toList());
-
- assertThat(logStorage.appendEntries(entries), is(payloads.size()));
-
- var actualEntries = new
ArrayList<DeserializedSegmentPayload>(payloads.size());
-
- for (Path segmentFile : segmentFiles()) {
- try (SeekableByteChannel channel =
Files.newByteChannel(segmentFile)) {
- // Skip header.
- channel.position(channel.position() +
SegmentFileManager.HEADER_RECORD.length);
-
- long bytesRead = SegmentFileManager.HEADER_RECORD.length;
-
- while (bytesRead < SEGMENT_SIZE -
SWITCH_SEGMENT_RECORD.length) {
- DeserializedSegmentPayload entry =
DeserializedSegmentPayload.fromByteChannel(channel);
-
- if (entry == null) {
- // EOF reached.
- break;
- }
-
- actualEntries.add(entry);
+ logStorage = new SegstoreLogStorage(GROUP_ID, segmentFileManager);
- bytesRead += entry.size();
- }
- }
+ try {
+ segmentFileManager.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- for (int i = 0; i < actualEntries.size(); i++) {
- assertThat(actualEntries.get(i).groupId(), is(GROUP_ID));
- assertThat(actualEntries.get(i).payload(), is(payloads.get(i)));
- }
+ return logStorage;
}
- @Test
- void testGetEntry() {
- byte[] payload = {1, 2, 3, 4, 5};
-
- var expectedEntry = new LogEntry();
-
- doAnswer(invocation -> {
- ByteBuffer buffer = invocation.getArgument(0);
-
- buffer.put(payload);
-
- return null;
- }).when(encoder).encode(any(), eq(expectedEntry));
-
- when(encoder.size(any())).thenAnswer(invocation -> payload.length);
-
- when(decoder.decode(payload)).thenReturn(expectedEntry);
-
- logStorage.appendEntry(expectedEntry);
-
- LogEntry actualEntry =
logStorage.getEntry(expectedEntry.getId().getIndex());
-
- assertThat(actualEntry, is(expectedEntry));
-
- LogEntry nonExistingEntry =
logStorage.getEntry(expectedEntry.getId().getIndex() + 1);
-
- assertThat(nonExistingEntry, is(nullValue()));
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26286")
+ @Override
+ public void testReset() {
+ super.testReset();
}
- private List<Path> segmentFiles() throws IOException {
- try (Stream<Path> files = Files.list(workDir)) {
- return files
- .filter(p ->
p.getFileName().toString().startsWith("segment"))
- .sorted()
- .collect(toList());
- }
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26285")
+ @Override
+ public void testTruncatePrefix() {
+ super.testTruncatePrefix();
}
- private static List<byte[]> generateRandomData() {
- int bytesToGenerate = SEGMENT_SIZE * 3;
-
- int maxPayloadSize = 100;
-
- var payloads = new ArrayList<byte[]>();
-
- ThreadLocalRandom random = ThreadLocalRandom.current();
-
- while (bytesToGenerate > 0) {
- int payloadSize = random.nextInt(maxPayloadSize);
-
- var payload = new byte[payloadSize];
-
- random.nextBytes(payload);
-
- payloads.add(payload);
-
- bytesToGenerate -= payloadSize;
- }
-
- return payloads;
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26284")
+ @Override
+ public void testTruncateSuffix() {
+ super.testTruncateSuffix();
}
}