This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 67875f7deec IGNITE-26279 Implement Checkpointer for the new log 
storage (#6624)
67875f7deec is described below

commit 67875f7deec44629692c7f89638a71b52ae9be99
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Sep 22 16:59:22 2025 +0300

    IGNITE-26279 Implement Checkpointer for the new log storage (#6624)
---
 .../raft/storage/segstore/CheckpointQueue.java     | 217 ++++++++++++++++++
 .../raft/storage/segstore/RaftLogCheckpointer.java | 108 +++++++++
 .../raft/storage/segstore/SegmentFile.java         |   4 +
 .../raft/storage/segstore/SegmentFileManager.java  |  26 ++-
 .../raft/storage/segstore/CheckpointQueueTest.java | 247 +++++++++++++++++++++
 .../storage/segstore/RaftLogCheckpointerTest.java  |  87 ++++++++
 .../storage/segstore/SegmentFileManagerTest.java   |   8 +-
 .../storage/segstore/SegstoreLogStorageTest.java   |   4 +-
 8 files changed, 695 insertions(+), 6 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueue.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueue.java
new file mode 100644
index 00000000000..0207775249c
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueue.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A blocking queue implementation tailored to be used by the {@link 
RaftLogCheckpointer}.
+ *
+ * <p>This queue only supports the following access scenario:
+ *
+ * <ol>
+ *     <li>One thread writes to the queue at a time, adding new entries via 
the {@link #add} call;</li>
+ *     <li>One thread peeks at the head of the queue via the {@link #peek} 
method and, possibly after some time, removes the head
+ *     using the {@link #removeHead} method. Since only one thread performs 
these actions, there's no need for additional synchronization
+ *     between the {@code peek} and {@code removeHead} calls.</li>
+ *     <li>Multiple threads can read from the queue using the {@link 
#tailIterator} method.</li>
+ * </ol>
+ */
+class CheckpointQueue {
+    static class Entry {
+        private final SegmentFile segmentFile;
+
+        private final IndexMemTable memTable;
+
+        @Nullable
+        private volatile Entry next;
+
+        @Nullable
+        private volatile Entry prev;
+
+        Entry(SegmentFile segmentFile, IndexMemTable memTable) {
+            this.segmentFile = segmentFile;
+            this.memTable = memTable;
+        }
+
+        SegmentFile segmentFile() {
+            return segmentFile;
+        }
+
+        IndexMemTable memTable() {
+            return memTable;
+        }
+    }
+
+    private final int maxSize;
+
+    private final Lock lock = new ReentrantLock();
+
+    private final Condition notEmpty = lock.newCondition();
+
+    private final Condition notFull = lock.newCondition();
+
+    /**
+     * Pointer to the first entry (oldest in terms of being added) in the 
queue.
+     *
+     * <p>Multi-threaded access is guarded by {@link #lock}.
+     */
+    @Nullable
+    private Entry head;
+
+    /**
+     * Pointer to the last entry (newest in terms of being added) in the queue.
+     *
+     * <p>This field is intentionally left volatile to be used in {@link 
#tailIterator} method.
+     */
+    @Nullable
+    private volatile Entry tail;
+
+    /**
+     * Number of entries in the queue.
+     *
+     * <p>Multi-threaded access is guarded by {@link #lock}.
+     */
+    private int size;
+
+    CheckpointQueue(int maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Adds a new entry to the queue, blocking if the queue is full.
+     */
+    void add(SegmentFile segmentFile, IndexMemTable memTable) throws 
InterruptedException {
+        var newEntry = new Entry(segmentFile, memTable);
+
+        lock.lock();
+
+        try {
+            while (size == maxSize) {
+                notFull.await();
+            }
+
+            Entry curTail = tail;
+
+            if (curTail == null) {
+                // The queue was empty.
+                head = newEntry;
+            } else {
+                // Append the new entry to the tail of the queue.
+                curTail.next = newEntry;
+
+                newEntry.prev = curTail;
+            }
+
+            tail = newEntry;
+
+            size++;
+
+            notEmpty.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the head element of the queue (not removing it), blocking if 
the queue is empty.
+     */
+    Entry peek() throws InterruptedException {
+        lock.lock();
+
+        try {
+            while (size == 0) {
+                notEmpty.await();
+            }
+
+            Entry head = this.head;
+
+            assert head != null;
+
+            return head;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes the head element of the queue.
+     *
+     * <p>This method must only be called after a successful call to {@link 
#peek}.
+     */
+    void removeHead() {
+        lock.lock();
+
+        try {
+            Entry head = this.head;
+
+            // This method must only be called in conjunction with peek(), so 
we expect the queue to be non-empty.
+            assert head != null;
+
+            Entry nextHead = head.next;
+
+            if (nextHead == null) {
+                // The queue is empty now.
+                tail = null;
+            } else {
+                nextHead.prev = null;
+            }
+
+            this.head = nextHead;
+
+            size--;
+
+            notFull.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator that iterates over the entries in the queue from 
the tail to the head (from newest entries to oldest).
+     */
+    Iterator<Entry> tailIterator() {
+        return new Iterator<>() {
+            @Nullable
+            private Entry curEntry = tail;
+
+            @Override
+            public boolean hasNext() {
+                return curEntry != null;
+            }
+
+            @Override
+            public Entry next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                Entry result = curEntry;
+
+                curEntry = curEntry.prev;
+
+                return result;
+            }
+        };
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
new file mode 100644
index 00000000000..84751f1d743
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.thread.IgniteThread;
+
+/**
+ * Class responsible for running periodic checkpoint tasks.
+ *
+ * <p>A checkpoint is triggered by the {@link SegmentFileManager} when segment 
file rollover occurs. Upon rollover, the following happens:
+ *
+ * <ol>
+ *     <li>{@code SegmentFileManager} notifies the Checkpointer by providing 
the segment file that got full and the {@link IndexMemTable}
+ *     for this file;</li>
+ *     <li>Checkpointer adds a task to the tail of the {@link 
CheckpointQueue}. If the queue is full, the caller thread is blocked until
+ *     free space is available in the queue.</li>
+ *     <li>Checkpoint thread polls the head of the queue and performs the 
following actions:
+ *          <ol>
+ *              <li>Syncs the segment file;</li>
+ *              <li>Persists the memtable into an index file;</li>
+ *              <li>Creates in-memory metadata structures for the index;</li>
+ *              <li>Removes the mem table from the queue and therefore 
disposes of it.</li>
+ *          </ol>
+ *     </li>
+ * </ol>
+ */
+class RaftLogCheckpointer {
+    // TODO: Move to configuration, see 
https://issues.apache.org/jira/browse/IGNITE-26476.
+    static final int MEM_TABLE_QUEUE_SIZE = 10;
+
+    private final CheckpointQueue queue = new 
CheckpointQueue(MEM_TABLE_QUEUE_SIZE);
+
+    private final Thread checkpointThread;
+
+    RaftLogCheckpointer(String nodeName) {
+        checkpointThread = new IgniteThread(nodeName, "segstore-checkpoint", 
new CheckpointTask(queue));
+    }
+
+    void start() {
+        checkpointThread.start();
+    }
+
+    void stop() {
+        checkpointThread.interrupt();
+
+        try {
+            checkpointThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException(COMMON_ERR, "Interrupted while 
waiting for the checkpoint thread to finish.", e);
+        }
+    }
+
+    void onRollover(SegmentFile segmentFile, IndexMemTable indexMemTable) {
+        try {
+            queue.add(segmentFile, indexMemTable);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException(COMMON_ERR, "Interrupted while 
adding an entry to the checkpoint queue.", e);
+        }
+    }
+
+    private static class CheckpointTask implements Runnable {
+        private final CheckpointQueue queue;
+
+        CheckpointTask(CheckpointQueue queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    CheckpointQueue.Entry entry = queue.peek();
+
+                    entry.segmentFile().sync();
+
+                    // TODO: Persist the memtable, see 
https://issues.apache.org/jira/browse/IGNITE-26473.
+
+                    queue.removeHead();
+                } catch (InterruptedException e) {
+                    // Interrupt is called on stop.
+                    return;
+                }
+            }
+        }
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
index 86c240585b1..9e38df82c4b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
@@ -171,6 +171,10 @@ class SegmentFile implements ManuallyCloseable {
         }
     }
 
+    void sync() {
+        buffer.force();
+    }
+
     private @Nullable ByteBuffer reserveBytes(int size) {
         while (true) {
             int pos = bufferPosition.get();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index ad3c4d66553..087638cbf07 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -92,12 +92,24 @@ class SegmentFileManager implements ManuallyCloseable {
     /** Configured size of a segment file. */
     private final long fileSize;
 
+    /** Number of stripes used by the index memtable. Should be equal to the 
number of stripes in the Raft server's Disruptor. */
+    private final int stripes;
+
     /**
      * Current segment file. Can store {@code null} while a rollover is in 
progress or if the file manager has been stopped.
      */
     private final AtomicReference<SegmentFile> currentSegmentFile = new 
AtomicReference<>();
 
-    private final IndexMemTable memTable;
+    /**
+     * Index memtable of the current segment file.
+     *
+     * <p>Multi-threaded visibility is guaranteed by volatile reads or writes 
to the {@link #currentSegmentFile} field.
+     */
+    // TODO: Multi-threaded visibility should probably be revised in 
https://issues.apache.org/jira/browse/IGNITE-26282.
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private IndexMemTable memTable;
+
+    private final RaftLogCheckpointer checkpointer;
 
     /** Lock used to block threads while a rollover is in progress. */
     private final Object rolloverLock = new Object();
@@ -116,18 +128,22 @@ class SegmentFileManager implements ManuallyCloseable {
      */
     private boolean isStopped;
 
-    SegmentFileManager(Path baseDir, long fileSize, int stripes) {
+    SegmentFileManager(String nodeName, Path baseDir, long fileSize, int 
stripes) {
         if (fileSize <= HEADER_RECORD.length) {
             throw new IllegalArgumentException("File size must be greater than 
the header size: " + fileSize);
         }
 
         this.baseDir = baseDir;
         this.fileSize = fileSize;
+        this.stripes = stripes;
 
         memTable = new IndexMemTable(stripes);
+        checkpointer = new RaftLogCheckpointer(nodeName);
     }
 
     void start() throws IOException {
+        checkpointer.start();
+
         // TODO: implement recovery, see 
https://issues.apache.org/jira/browse/IGNITE-26283.
         currentSegmentFile.set(allocateNewSegmentFile(0));
     }
@@ -230,6 +246,10 @@ class SegmentFileManager implements ManuallyCloseable {
 
             SegmentFile newFile = allocateNewSegmentFile(++curFileIndex);
 
+            checkpointer.onRollover(observedSegmentFile, memTable);
+
+            memTable = new IndexMemTable(stripes);
+
             currentSegmentFile.set(newFile);
 
             rolloverLock.notifyAll();
@@ -256,6 +276,8 @@ class SegmentFileManager implements ManuallyCloseable {
 
             rolloverLock.notifyAll();
         }
+
+        checkpointer.stop();
     }
 
     private static void writeHeader(SegmentFile segmentFile) {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
new file mode 100644
index 00000000000..7bef444da5c
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.storage.segstore.CheckpointQueue.Entry;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class CheckpointQueueTest extends BaseIgniteAbstractTest {
+    private static final int MAX_QUEUE_SIZE = 10;
+
+    private final CheckpointQueue queue = new CheckpointQueue(MAX_QUEUE_SIZE);
+
+    @Test
+    void testAddPeekRemove(
+            @Mock SegmentFile segmentFile1,
+            @Mock SegmentFile segmentFile2,
+            @Mock IndexMemTable memTable1,
+            @Mock IndexMemTable memTable2
+    ) throws InterruptedException {
+        queue.add(segmentFile1, memTable1);
+        queue.add(segmentFile2, memTable2);
+
+        Entry entry = queue.peek();
+
+        assertThat(entry.segmentFile(), is(segmentFile1));
+        assertThat(entry.memTable(), is(memTable1));
+
+        // Head remains the same if we didn't remove it.
+        entry = queue.peek();
+
+        assertThat(entry.segmentFile(), is(segmentFile1));
+        assertThat(entry.memTable(), is(memTable1));
+
+        queue.removeHead();
+
+        entry = queue.peek();
+
+        assertThat(entry.segmentFile(), is(segmentFile2));
+        assertThat(entry.memTable(), is(memTable2));
+    }
+
+    @Test
+    void testBlockingPeek(
+            @InjectExecutorService(threadCount = 1) ExecutorService executor,
+            @Mock SegmentFile segmentFile,
+            @Mock IndexMemTable memTable
+    ) throws InterruptedException {
+        CompletableFuture<Entry> peekFuture = supplyAsync(() -> {
+            try {
+                return queue.peek();
+            } catch (InterruptedException e) {
+                throw new CompletionException(e);
+            }
+        }, executor);
+
+        assertThat(peekFuture, willTimeoutFast());
+
+        queue.add(segmentFile, memTable);
+
+        assertThat(peekFuture, willCompleteSuccessfully());
+
+        assertThat(peekFuture.join().segmentFile(), is(segmentFile));
+        assertThat(peekFuture.join().memTable(), is(memTable));
+    }
+
+    @Test
+    void testBlockingAdd(
+            @InjectExecutorService(threadCount = 1) ExecutorService executor,
+            @Mock SegmentFile segmentFile,
+            @Mock IndexMemTable memTable
+    ) throws InterruptedException {
+        for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
+            queue.add(segmentFile, memTable);
+        }
+
+        CompletableFuture<Void> addFuture = runAsync(() -> {
+            try {
+                queue.add(segmentFile, memTable);
+            } catch (InterruptedException e) {
+                throw new CompletionException(e);
+            }
+        }, executor);
+
+        assertThat(addFuture, willTimeoutFast());
+
+        queue.removeHead();
+
+        assertThat(addFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testEmptyQueueSearch() {
+        Iterator<Entry> iterator = queue.tailIterator();
+
+        assertThat(iterator.hasNext(), is(false));
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    @Test
+    void testSingleThreadedSearch(
+            @Mock SegmentFile segmentFile1,
+            @Mock SegmentFile segmentFile2,
+            @Mock SegmentFile segmentFile3,
+            @Mock IndexMemTable memTable1,
+            @Mock IndexMemTable memTable2,
+            @Mock IndexMemTable memTable3
+    ) throws InterruptedException {
+        queue.add(segmentFile1, memTable1);
+        queue.add(segmentFile2, memTable2);
+        queue.add(segmentFile3, memTable3);
+
+        Iterator<Entry> iterator = queue.tailIterator();
+
+        Entry entry3 = iterator.next();
+        assertThat(entry3.segmentFile(), is(segmentFile3));
+        assertThat(entry3.memTable(), is(memTable3));
+
+        Entry entry2 = iterator.next();
+        assertThat(entry2.segmentFile(), is(segmentFile2));
+        assertThat(entry2.memTable(), is(memTable2));
+
+        Entry entry1 = iterator.next();
+        assertThat(entry1.segmentFile(), is(segmentFile1));
+        assertThat(entry1.memTable(), is(memTable1));
+    }
+
+    @Test
+    void testMultithreadedAddPeek(@Mock SegmentFile segmentFile) {
+        int numEntries = 10_000;
+
+        RunnableX producerTask = () -> {
+            for (int i = 0; i < numEntries; i++) {
+                IndexMemTable mockTable = mock(IndexMemTable.class);
+
+                when(mockTable.getSegmentFileOffset(anyLong(), 
anyLong())).thenReturn(i);
+
+                queue.add(segmentFile, mockTable);
+            }
+        };
+
+        RunnableX consumerTask = () -> {
+            for (int i = 0; i < numEntries; i++) {
+                Entry entry = queue.peek();
+
+                assertThat(entry.memTable().getSegmentFileOffset(0, 0), is(i));
+
+                queue.removeHead();
+            }
+        };
+
+        runRace(producerTask, consumerTask);
+    }
+
+    @Test
+    void testMultiThreadedSearch(@Mock SegmentFile segmentFile) {
+        var isDone = new AtomicBoolean(false);
+
+        int numEntries = 10_000;
+
+        RunnableX producerTask = () -> {
+            for (int i = 0; i < numEntries; i++) {
+                IndexMemTable mockTable = mock(IndexMemTable.class);
+
+                when(mockTable.getSegmentFileOffset(anyLong(), 
anyLong())).thenReturn(i);
+
+                queue.add(segmentFile, mockTable);
+            }
+        };
+
+        RunnableX consumerTask = () -> {
+            for (int i = 0; i < numEntries; i++) {
+                Entry entry = queue.peek();
+
+                assertThat(entry.memTable().getSegmentFileOffset(0, 0), is(i));
+
+                queue.removeHead();
+            }
+
+            isDone.set(true);
+        };
+
+        RunnableX searchTask = () -> {
+            while (!isDone.get()) {
+                Iterator<Entry> iterator = queue.tailIterator();
+
+                int prevOffset = 0;
+
+                while (iterator.hasNext()) {
+                    Entry entry = iterator.next();
+
+                    int offset = entry.memTable().getSegmentFileOffset(0, 0);
+
+                    // Offsets must be in sequential decreasing order.
+                    if (prevOffset != 0) {
+                        assertThat(offset, is(prevOffset - 1));
+                    }
+
+                    prevOffset = offset;
+                }
+            }
+        };
+
+        runRace(producerTask, consumerTask, searchTask, searchTask, 
searchTask);
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
new file mode 100644
index 00000000000..b2786d8b67e
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.ignite.internal.raft.storage.segstore.RaftLogCheckpointer.MEM_TABLE_QUEUE_SIZE;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
+    private static final String NODE_NAME = "test";
+
+    private final RaftLogCheckpointer checkpointer = new 
RaftLogCheckpointer(NODE_NAME);
+
+    @BeforeEach
+    void setUp() {
+        checkpointer.start();
+    }
+
+    @AfterEach
+    void tearDown() {
+        checkpointer.stop();
+    }
+
+    @Test
+    void testOnRollover(@Mock SegmentFile segmentFile, @Mock IndexMemTable 
memTable) {
+        checkpointer.onRollover(segmentFile, memTable);
+
+        verify(segmentFile, timeout(500)).sync();
+    }
+
+    @Test
+    void testBlockOnRollover(
+            @Mock SegmentFile segmentFile,
+            @Mock IndexMemTable memTable,
+            @InjectExecutorService(threadCount = 1) ExecutorService executor
+    ) {
+        var blockFuture = new CompletableFuture<Void>();
+
+        doAnswer(invocation -> blockFuture.join()).when(segmentFile).sync();
+
+        for (int i = 0; i < MEM_TABLE_QUEUE_SIZE; i++) {
+            checkpointer.onRollover(segmentFile, memTable);
+        }
+
+        CompletableFuture<Void> addFuture = runAsync(() -> 
checkpointer.onRollover(segmentFile, memTable), executor);
+
+        assertThat(addFuture, willTimeoutFast());
+
+        blockFuture.complete(null);
+
+        assertThat(addFuture, willCompleteSuccessfully());
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 8b22e2c5ccb..219afe12e4a 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -75,11 +75,13 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
 
     private static final int STRIPES = 10;
 
+    private static final String NODE_NAME = "test";
+
     private SegmentFileManager fileManager;
 
     @BeforeEach
     void setUp() throws IOException {
-        fileManager = new SegmentFileManager(workDir, FILE_SIZE, STRIPES);
+        fileManager = new SegmentFileManager(NODE_NAME, workDir, FILE_SIZE, 
STRIPES);
 
         fileManager.start();
     }
@@ -92,8 +94,8 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
     @SuppressWarnings("ResultOfObjectAllocationIgnored")
     @Test
     void testConstructorInvariants() {
-        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(workDir, 0, 1));
-        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(workDir, 1, 1));
+        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(NODE_NAME, workDir, 0, 1));
+        assertThrows(IllegalArgumentException.class, () -> new 
SegmentFileManager(NODE_NAME, workDir, 1, 1));
     }
 
     @Test
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 69253b8af85..e3af23750ed 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -60,6 +60,8 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
 
     private static final long GROUP_ID = 1000;
 
+    private static final String NODE_NAME = "test";
+
     private SegstoreLogStorage logStorage;
 
     private SegmentFileManager segmentFileManager;
@@ -69,7 +71,7 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
 
     @BeforeEach
     void setUp() throws IOException {
-        segmentFileManager = new SegmentFileManager(workDir, SEGMENT_SIZE, 1);
+        segmentFileManager = new SegmentFileManager(NODE_NAME, workDir, 
SEGMENT_SIZE, 1);
 
         logStorage = new SegstoreLogStorage(GROUP_ID, segmentFileManager);
 

Reply via email to