This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 67875f7deec IGNITE-26279 Implement Checkpointer for the new log
storage (#6624)
67875f7deec is described below
commit 67875f7deec44629692c7f89638a71b52ae9be99
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Sep 22 16:59:22 2025 +0300
IGNITE-26279 Implement Checkpointer for the new log storage (#6624)
---
.../raft/storage/segstore/CheckpointQueue.java | 217 ++++++++++++++++++
.../raft/storage/segstore/RaftLogCheckpointer.java | 108 +++++++++
.../raft/storage/segstore/SegmentFile.java | 4 +
.../raft/storage/segstore/SegmentFileManager.java | 26 ++-
.../raft/storage/segstore/CheckpointQueueTest.java | 247 +++++++++++++++++++++
.../storage/segstore/RaftLogCheckpointerTest.java | 87 ++++++++
.../storage/segstore/SegmentFileManagerTest.java | 8 +-
.../storage/segstore/SegstoreLogStorageTest.java | 4 +-
8 files changed, 695 insertions(+), 6 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueue.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueue.java
new file mode 100644
index 00000000000..0207775249c
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueue.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A blocking queue implementation tailored to be used by the {@link RaftLogCheckpointer}.
+ *
+ * <p>Entries are kept in a doubly linked list whose size is bounded by the {@code maxSize} constructor argument.
+ *
+ * <p>This queue only supports the following access scenario:
+ *
+ * <ol>
+ * <li>One thread writes to the queue at a time, adding new entries via the {@link #add} call;</li>
+ * <li>One thread peeks at the head of the queue via the {@link #peek} method and, possibly after some time, removes the head
+ * using the {@link #removeHead} method. Since only one thread performs these actions, there's no need for additional synchronization
+ * between the {@code peek} and {@code removeHead} calls.</li>
+ * <li>Multiple threads can read from the queue using the {@link #tailIterator} method.</li>
+ * </ol>
+ */
+class CheckpointQueue {
+ /** Queue node: a segment file paired with its index memtable, linked to the neighbouring nodes of the queue. */
+ static class Entry {
+ private final SegmentFile segmentFile;
+
+ private final IndexMemTable memTable;
+
+ /** Entry that was added right after this one, or {@code null} if this entry is the tail of the queue. */
+ @Nullable
+ private volatile Entry next;
+
+ /**
+ * Entry that was added right before this one, or {@code null} if this entry is the head of the queue.
+ *
+ * <p>This link is followed by {@link CheckpointQueue#tailIterator} without holding the queue lock.
+ */
+ @Nullable
+ private volatile Entry prev;
+
+ Entry(SegmentFile segmentFile, IndexMemTable memTable) {
+ this.segmentFile = segmentFile;
+ this.memTable = memTable;
+ }
+
+ SegmentFile segmentFile() {
+ return segmentFile;
+ }
+
+ IndexMemTable memTable() {
+ return memTable;
+ }
+ }
+
+ /** Maximum number of entries in the queue; {@link #add} blocks while the queue holds this many entries. */
+ private final int maxSize;
+
+ /** Lock guarding {@link #head} and {@link #size} and backing the two conditions below. */
+ private final Lock lock = new ReentrantLock();
+
+ /** Signalled by {@link #add} after an entry has been appended; awaited by {@link #peek}. */
+ private final Condition notEmpty = lock.newCondition();
+
+ /** Signalled by {@link #removeHead} after the head has been removed; awaited by {@link #add}. */
+ private final Condition notFull = lock.newCondition();
+
+ /**
+ * Pointer to the first entry (oldest in terms of being added) in the queue.
+ *
+ * <p>Multi-threaded access is guarded by {@link #lock}.
+ */
+ @Nullable
+ private Entry head;
+
+ /**
+ * Pointer to the last entry (newest in terms of being added) in the queue.
+ *
+ * <p>This field is intentionally left volatile to be used in {@link #tailIterator} method, which reads it without taking
+ * the {@link #lock}.
+ */
+ @Nullable
+ private volatile Entry tail;
+
+ /**
+ * Number of entries in the queue.
+ *
+ * <p>Multi-threaded access is guarded by {@link #lock}.
+ */
+ private int size;
+
+ CheckpointQueue(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Adds a new entry to the queue, blocking if the queue is full.
+ *
+ * @param segmentFile Segment file to store in the new entry.
+ * @param memTable Index memtable to store in the new entry.
+ * @throws InterruptedException If the calling thread is interrupted while waiting for free space in the queue.
+ */
+ void add(SegmentFile segmentFile, IndexMemTable memTable) throws
InterruptedException {
+ var newEntry = new Entry(segmentFile, memTable);
+
+ lock.lock();
+
+ try {
+ while (size == maxSize) {
+ notFull.await();
+ }
+
+ Entry curTail = tail;
+
+ if (curTail == null) {
+ // The queue was empty.
+ head = newEntry;
+ } else {
+ // Append the new entry to the tail of the queue.
+ curTail.next = newEntry;
+
+ newEntry.prev = curTail;
+ }
+
+ // The new entry is published as the tail only after its "prev" link has been set, so an iterator that starts from
+ // this tail can walk back to the older entries.
+ tail = newEntry;
+
+ size++;
+
+ notEmpty.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the head element of the queue (not removing it), blocking if the queue is empty.
+ *
+ * @return Oldest entry in the queue, never {@code null}.
+ * @throws InterruptedException If the calling thread is interrupted while waiting for an entry to appear.
+ */
+ Entry peek() throws InterruptedException {
+ lock.lock();
+
+ try {
+ while (size == 0) {
+ notEmpty.await();
+ }
+
+ Entry head = this.head;
+
+ assert head != null;
+
+ return head;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Removes the head element of the queue and wakes up threads blocked in {@link #add}.
+ *
+ * <p>This method must only be called after a successful call to {@link #peek}.
+ */
+ void removeHead() {
+ lock.lock();
+
+ try {
+ Entry head = this.head;
+
+ // This method must only be called in conjunction with peek(), so we expect the queue to be non-empty.
+ assert head != null;
+
+ Entry nextHead = head.next;
+
+ if (nextHead == null) {
+ // The queue is empty now.
+ tail = null;
+ } else {
+ // Cut the link to the removed entry: tail iterators stop as soon as they read a null "prev".
+ nextHead.prev = null;
+ }
+
+ this.head = nextHead;
+
+ size--;
+
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an iterator that iterates over the entries in the queue from the tail to the head (from newest entries to oldest).
+ *
+ * <p>The iterator starts from the tail observed at the moment of this call and follows the {@code prev} links without taking
+ * the {@link #lock}. It does not support removal.
+ */
+ Iterator<Entry> tailIterator() {
+ return new Iterator<>() {
+ @Nullable
+ private Entry curEntry = tail;
+
+ @Override
+ public boolean hasNext() {
+ return curEntry != null;
+ }
+
+ @Override
+ public Entry next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Entry result = curEntry;
+
+ curEntry = curEntry.prev;
+
+ return result;
+ }
+ };
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
new file mode 100644
index 00000000000..84751f1d743
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.thread.IgniteThread;
+
+/**
+ * Class responsible for running periodic checkpoint tasks.
+ *
+ * <p>A checkpoint is triggered by the {@link SegmentFileManager} when segment file rollover occurs. Upon rollover, the following happens:
+ *
+ * <ol>
+ * <li>{@code SegmentFileManager} notifies the Checkpointer by providing the segment file that got full and the {@link IndexMemTable}
+ * for this file;</li>
+ * <li>Checkpointer adds a task to the tail of the {@link CheckpointQueue}. If the queue is full, the caller thread is blocked until
+ * free space is available in the queue.</li>
+ * <li>Checkpoint thread polls the head of the queue and performs the following actions:
+ * <ol>
+ * <li>Syncs the segment file;</li>
+ * <li>Persists the memtable into an index file;</li>
+ * <li>Creates in-memory metadata structures for the index;</li>
+ * <li>Removes the mem table from the queue and therefore disposes of it.</li>
+ * </ol>
+ * </li>
+ * </ol>
+ *
+ * <p>Note: at the moment the checkpoint thread only syncs the segment file and removes the entry from the queue; persisting the
+ * memtable is still a TODO, see https://issues.apache.org/jira/browse/IGNITE-26473.
+ */
+class RaftLogCheckpointer {
+ // TODO: Move to configuration, see https://issues.apache.org/jira/browse/IGNITE-26476.
+ /** Capacity of the checkpoint queue: {@link #onRollover} blocks while this many entries are waiting to be checkpointed. */
+ static final int MEM_TABLE_QUEUE_SIZE = 10;
+
+ private final CheckpointQueue queue = new
CheckpointQueue(MEM_TABLE_QUEUE_SIZE);
+
+ private final Thread checkpointThread;
+
+ /**
+ * Creates the checkpointer and its (not yet started) checkpoint thread.
+ *
+ * @param nodeName Node name passed to the {@link IgniteThread} constructor.
+ */
+ RaftLogCheckpointer(String nodeName) {
+ checkpointThread = new IgniteThread(nodeName, "segstore-checkpoint",
new CheckpointTask(queue));
+ }
+
+ /** Starts the checkpoint thread. */
+ void start() {
+ checkpointThread.start();
+ }
+
+ /**
+ * Interrupts the checkpoint thread and waits for it to finish.
+ *
+ * @throws IgniteInternalException If the calling thread is interrupted while waiting; the interrupt flag is restored first.
+ */
+ void stop() {
+ checkpointThread.interrupt();
+
+ try {
+ checkpointThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ // NOTE(review): COMMON_ERR is statically imported from ErrorGroups.Marshalling - confirm this is the intended error code
+ // for an interruption.
+ throw new IgniteInternalException(COMMON_ERR, "Interrupted while
waiting for the checkpoint thread to finish.", e);
+ }
+ }
+
+ /**
+ * Adds the given segment file and its memtable to the checkpoint queue, blocking the caller while the queue is full.
+ *
+ * @param segmentFile Segment file to checkpoint.
+ * @param indexMemTable Index memtable queued together with the segment file.
+ * @throws IgniteInternalException If the calling thread is interrupted while waiting; the interrupt flag is restored first.
+ */
+ void onRollover(SegmentFile segmentFile, IndexMemTable indexMemTable) {
+ try {
+ queue.add(segmentFile, indexMemTable);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ // NOTE(review): same Marshalling.COMMON_ERR code as in stop() - confirm it is intended here.
+ throw new IgniteInternalException(COMMON_ERR, "Interrupted while
adding an entry to the checkpoint queue.", e);
+ }
+ }
+
+ /** Body of the checkpoint thread: processes queue entries one by one until interrupted. */
+ private static class CheckpointTask implements Runnable {
+ private final CheckpointQueue queue;
+
+ CheckpointTask(CheckpointQueue queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ // Blocks while the queue is empty. The entry stays in the queue until it is fully processed, so it remains
+ // reachable through the queue in the meantime.
+ CheckpointQueue.Entry entry = queue.peek();
+
+ entry.segmentFile().sync();
+
+ // TODO: Persist the memtable, see https://issues.apache.org/jira/browse/IGNITE-26473.
+
+ queue.removeHead();
+ } catch (InterruptedException e) {
+ // Interrupt is called on stop.
+ return;
+ }
+ }
+ }
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
index 86c240585b1..9e38df82c4b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
@@ -171,6 +171,10 @@ class SegmentFile implements ManuallyCloseable {
}
}
+ /**
+ * Syncs this segment file by calling {@code force()} on its buffer.
+ *
+ * <p>NOTE(review): the buffer is presumably memory-mapped, in which case this flushes its content to the storage device - confirm
+ * against the field declaration.
+ */
+ void sync() {
+ buffer.force();
+ }
+
private @Nullable ByteBuffer reserveBytes(int size) {
while (true) {
int pos = bufferPosition.get();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index ad3c4d66553..087638cbf07 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -92,12 +92,24 @@ class SegmentFileManager implements ManuallyCloseable {
/** Configured size of a segment file. */
private final long fileSize;
+ /** Number of stripes used by the index memtable. Should be equal to the
number of stripes in the Raft server's Disruptor. */
+ private final int stripes;
+
/**
* Current segment file. Can store {@code null} while a rollover is in
progress or if the file manager has been stopped.
*/
private final AtomicReference<SegmentFile> currentSegmentFile = new
AtomicReference<>();
- private final IndexMemTable memTable;
+ /**
+ * Index memtable of the current segment file.
+ *
+ * <p>Multi-threaded visibility is guaranteed by volatile reads or writes
to the {@link #currentSegmentFile} field.
+ */
+ // TODO: Multi-threaded visibility should probably be revised in
https://issues.apache.org/jira/browse/IGNITE-26282.
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private IndexMemTable memTable;
+
+ private final RaftLogCheckpointer checkpointer;
/** Lock used to block threads while a rollover is in progress. */
private final Object rolloverLock = new Object();
@@ -116,18 +128,22 @@ class SegmentFileManager implements ManuallyCloseable {
*/
private boolean isStopped;
- SegmentFileManager(Path baseDir, long fileSize, int stripes) {
+ SegmentFileManager(String nodeName, Path baseDir, long fileSize, int
stripes) {
if (fileSize <= HEADER_RECORD.length) {
throw new IllegalArgumentException("File size must be greater than
the header size: " + fileSize);
}
this.baseDir = baseDir;
this.fileSize = fileSize;
+ this.stripes = stripes;
memTable = new IndexMemTable(stripes);
+ checkpointer = new RaftLogCheckpointer(nodeName);
}
void start() throws IOException {
+ checkpointer.start();
+
// TODO: implement recovery, see
https://issues.apache.org/jira/browse/IGNITE-26283.
currentSegmentFile.set(allocateNewSegmentFile(0));
}
@@ -230,6 +246,10 @@ class SegmentFileManager implements ManuallyCloseable {
SegmentFile newFile = allocateNewSegmentFile(++curFileIndex);
+ checkpointer.onRollover(observedSegmentFile, memTable);
+
+ memTable = new IndexMemTable(stripes);
+
currentSegmentFile.set(newFile);
rolloverLock.notifyAll();
@@ -256,6 +276,8 @@ class SegmentFileManager implements ManuallyCloseable {
rolloverLock.notifyAll();
}
+
+ checkpointer.stop();
}
private static void writeHeader(SegmentFile segmentFile) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
new file mode 100644
index 00000000000..7bef444da5c
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.storage.segstore.CheckpointQueue.Entry;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link CheckpointQueue}: single-threaded semantics, blocking behaviour and concurrent access. */
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class CheckpointQueueTest extends BaseIgniteAbstractTest {
+ private static final int MAX_QUEUE_SIZE = 10;
+
+ private final CheckpointQueue queue = new CheckpointQueue(MAX_QUEUE_SIZE);
+
+ /** Checks that {@code peek} returns the oldest entry without removing it and that {@code removeHead} advances the head. */
+ @Test
+ void testAddPeekRemove(
+ @Mock SegmentFile segmentFile1,
+ @Mock SegmentFile segmentFile2,
+ @Mock IndexMemTable memTable1,
+ @Mock IndexMemTable memTable2
+ ) throws InterruptedException {
+ queue.add(segmentFile1, memTable1);
+ queue.add(segmentFile2, memTable2);
+
+ Entry entry = queue.peek();
+
+ assertThat(entry.segmentFile(), is(segmentFile1));
+ assertThat(entry.memTable(), is(memTable1));
+
+ // Head remains the same if we didn't remove it.
+ entry = queue.peek();
+
+ assertThat(entry.segmentFile(), is(segmentFile1));
+ assertThat(entry.memTable(), is(memTable1));
+
+ queue.removeHead();
+
+ entry = queue.peek();
+
+ assertThat(entry.segmentFile(), is(segmentFile2));
+ assertThat(entry.memTable(), is(memTable2));
+ }
+
+ /** Checks that {@code peek} on an empty queue does not complete until an entry is added. */
+ @Test
+ void testBlockingPeek(
+ @InjectExecutorService(threadCount = 1) ExecutorService executor,
+ @Mock SegmentFile segmentFile,
+ @Mock IndexMemTable memTable
+ ) throws InterruptedException {
+ CompletableFuture<Entry> peekFuture = supplyAsync(() -> {
+ try {
+ return queue.peek();
+ } catch (InterruptedException e) {
+ throw new CompletionException(e);
+ }
+ }, executor);
+
+ assertThat(peekFuture, willTimeoutFast());
+
+ queue.add(segmentFile, memTable);
+
+ assertThat(peekFuture, willCompleteSuccessfully());
+
+ assertThat(peekFuture.join().segmentFile(), is(segmentFile));
+ assertThat(peekFuture.join().memTable(), is(memTable));
+ }
+
+ /** Checks that {@code add} on a full queue does not complete until the head is removed. */
+ @Test
+ void testBlockingAdd(
+ @InjectExecutorService(threadCount = 1) ExecutorService executor,
+ @Mock SegmentFile segmentFile,
+ @Mock IndexMemTable memTable
+ ) throws InterruptedException {
+ // Fill the queue up to its capacity.
+ for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
+ queue.add(segmentFile, memTable);
+ }
+
+ CompletableFuture<Void> addFuture = runAsync(() -> {
+ try {
+ queue.add(segmentFile, memTable);
+ } catch (InterruptedException e) {
+ throw new CompletionException(e);
+ }
+ }, executor);
+
+ assertThat(addFuture, willTimeoutFast());
+
+ queue.removeHead();
+
+ assertThat(addFuture, willCompleteSuccessfully());
+ }
+
+ /** Checks that the tail iterator of an empty queue has no elements. */
+ @Test
+ void testEmptyQueueSearch() {
+ Iterator<Entry> iterator = queue.tailIterator();
+
+ assertThat(iterator.hasNext(), is(false));
+ assertThrows(NoSuchElementException.class, iterator::next);
+ }
+
+ /** Checks that the tail iterator returns entries from the newest to the oldest. */
+ @Test
+ void testSingleThreadedSearch(
+ @Mock SegmentFile segmentFile1,
+ @Mock SegmentFile segmentFile2,
+ @Mock SegmentFile segmentFile3,
+ @Mock IndexMemTable memTable1,
+ @Mock IndexMemTable memTable2,
+ @Mock IndexMemTable memTable3
+ ) throws InterruptedException {
+ queue.add(segmentFile1, memTable1);
+ queue.add(segmentFile2, memTable2);
+ queue.add(segmentFile3, memTable3);
+
+ Iterator<Entry> iterator = queue.tailIterator();
+
+ Entry entry3 = iterator.next();
+ assertThat(entry3.segmentFile(), is(segmentFile3));
+ assertThat(entry3.memTable(), is(memTable3));
+
+ Entry entry2 = iterator.next();
+ assertThat(entry2.segmentFile(), is(segmentFile2));
+ assertThat(entry2.memTable(), is(memTable2));
+
+ Entry entry1 = iterator.next();
+ assertThat(entry1.segmentFile(), is(segmentFile1));
+ assertThat(entry1.memTable(), is(memTable1));
+ }
+
+ /**
+ * Runs one producer and one consumer concurrently and checks that the consumer observes the entries in insertion order.
+ *
+ * <p>Each mocked memtable is stubbed to return its insertion index, which serves as the identity of the entry.
+ */
+ @Test
+ void testMultithreadedAddPeek(@Mock SegmentFile segmentFile) {
+ int numEntries = 10_000;
+
+ RunnableX producerTask = () -> {
+ for (int i = 0; i < numEntries; i++) {
+ IndexMemTable mockTable = mock(IndexMemTable.class);
+
+ when(mockTable.getSegmentFileOffset(anyLong(),
anyLong())).thenReturn(i);
+
+ queue.add(segmentFile, mockTable);
+ }
+ };
+
+ RunnableX consumerTask = () -> {
+ for (int i = 0; i < numEntries; i++) {
+ Entry entry = queue.peek();
+
+ assertThat(entry.memTable().getSegmentFileOffset(0, 0), is(i));
+
+ queue.removeHead();
+ }
+ };
+
+ runRace(producerTask, consumerTask);
+ }
+
+ /**
+ * Runs one producer, one consumer and several tail-iterating readers concurrently and checks that every traversal observes
+ * consecutive entries.
+ */
+ @Test
+ void testMultiThreadedSearch(@Mock SegmentFile segmentFile) {
+ var isDone = new AtomicBoolean(false);
+
+ int numEntries = 10_000;
+
+ RunnableX producerTask = () -> {
+ for (int i = 0; i < numEntries; i++) {
+ IndexMemTable mockTable = mock(IndexMemTable.class);
+
+ when(mockTable.getSegmentFileOffset(anyLong(),
anyLong())).thenReturn(i);
+
+ queue.add(segmentFile, mockTable);
+ }
+ };
+
+ RunnableX consumerTask = () -> {
+ for (int i = 0; i < numEntries; i++) {
+ Entry entry = queue.peek();
+
+ assertThat(entry.memTable().getSegmentFileOffset(0, 0), is(i));
+
+ queue.removeHead();
+ }
+
+ // NOTE(review): the flag is only set when the consumer succeeds; if an assertion above fails, the search tasks keep
+ // looping. Presumably runRace bounds this with a timeout - confirm.
+ isDone.set(true);
+ };
+
+ RunnableX searchTask = () -> {
+ while (!isDone.get()) {
+ Iterator<Entry> iterator = queue.tailIterator();
+
+ // Zero doubles as the "no previous entry yet" marker, see the check below.
+ int prevOffset = 0;
+
+ while (iterator.hasNext()) {
+ Entry entry = iterator.next();
+
+ int offset = entry.memTable().getSegmentFileOffset(0, 0);
+
+ // Offsets must be in sequential decreasing order.
+ if (prevOffset != 0) {
+ assertThat(offset, is(prevOffset - 1));
+ }
+
+ prevOffset = offset;
+ }
+ }
+ };
+
+ runRace(producerTask, consumerTask, searchTask, searchTask,
searchTask);
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
new file mode 100644
index 00000000000..b2786d8b67e
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static
org.apache.ignite.internal.raft.storage.segstore.RaftLogCheckpointer.MEM_TABLE_QUEUE_SIZE;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link RaftLogCheckpointer}. The checkpointer is started before and stopped after every test. */
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
+ private static final String NODE_NAME = "test";
+
+ private final RaftLogCheckpointer checkpointer = new
RaftLogCheckpointer(NODE_NAME);
+
+ @BeforeEach
+ void setUp() {
+ checkpointer.start();
+ }
+
+ @AfterEach
+ void tearDown() {
+ checkpointer.stop();
+ }
+
+ /** Checks that a rollover notification results in the segment file being synced by the checkpoint thread. */
+ @Test
+ void testOnRollover(@Mock SegmentFile segmentFile, @Mock IndexMemTable
memTable) {
+ checkpointer.onRollover(segmentFile, memTable);
+
+ // The sync happens on the checkpoint thread, so wait for it for up to 500 ms.
+ verify(segmentFile, timeout(500)).sync();
+ }
+
+ /** Checks that {@code onRollover} blocks once the checkpoint queue is full and resumes when the checkpoint thread makes progress. */
+ @Test
+ void testBlockOnRollover(
+ @Mock SegmentFile segmentFile,
+ @Mock IndexMemTable memTable,
+ @InjectExecutorService(threadCount = 1) ExecutorService executor
+ ) {
+ var blockFuture = new CompletableFuture<Void>();
+
+ // Make the checkpoint thread hang inside sync() so that no entry gets removed from the queue.
+ doAnswer(invocation -> blockFuture.join()).when(segmentFile).sync();
+
+ for (int i = 0; i < MEM_TABLE_QUEUE_SIZE; i++) {
+ checkpointer.onRollover(segmentFile, memTable);
+ }
+
+ CompletableFuture<Void> addFuture = runAsync(() ->
checkpointer.onRollover(segmentFile, memTable), executor);
+
+ assertThat(addFuture, willTimeoutFast());
+
+ // Unblock the checkpoint thread: it can now drain the queue and free space for the pending rollover.
+ blockFuture.complete(null);
+
+ assertThat(addFuture, willCompleteSuccessfully());
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 8b22e2c5ccb..219afe12e4a 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -75,11 +75,13 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
private static final int STRIPES = 10;
+ private static final String NODE_NAME = "test";
+
private SegmentFileManager fileManager;
@BeforeEach
void setUp() throws IOException {
- fileManager = new SegmentFileManager(workDir, FILE_SIZE, STRIPES);
+ fileManager = new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE,
STRIPES);
fileManager.start();
}
@@ -92,8 +94,8 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Test
void testConstructorInvariants() {
- assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(workDir, 0, 1));
- assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(workDir, 1, 1));
+ assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(NODE_NAME, workDir, 0, 1));
+ assertThrows(IllegalArgumentException.class, () -> new
SegmentFileManager(NODE_NAME, workDir, 1, 1));
}
@Test
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 69253b8af85..e3af23750ed 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -60,6 +60,8 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
private static final long GROUP_ID = 1000;
+ private static final String NODE_NAME = "test";
+
private SegstoreLogStorage logStorage;
private SegmentFileManager segmentFileManager;
@@ -69,7 +71,7 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
@BeforeEach
void setUp() throws IOException {
- segmentFileManager = new SegmentFileManager(workDir, SEGMENT_SIZE, 1);
+ segmentFileManager = new SegmentFileManager(NODE_NAME, workDir,
SEGMENT_SIZE, 1);
logStorage = new SegstoreLogStorage(GROUP_ID, segmentFileManager);