This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 41a5034b523 IGNITE-26292 Add sync mode to the log storage (#7299)
41a5034b523 is described below

commit 41a5034b523a0fc9bf14eed23ce88d97a782590d
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Dec 24 11:29:41 2025 +0200

    IGNITE-26292 Add sync mode to the log storage (#7299)
---
 .../storage/segstore/MappedByteBufferSyncer.java   | 125 ++++++++++++++++++++
 .../raft/storage/segstore/SegmentFile.java         |  80 ++++++++++++-
 .../raft/storage/segstore/SegmentFileManager.java  |  20 ++--
 .../segstore/SegmentFileManagerGetEntryTest.java   |   3 +
 .../storage/segstore/SegmentFileManagerTest.java   |   5 +
 .../raft/storage/segstore/SegmentFileTest.java     |  79 ++++++++++++-
 .../SegstoreLogStorageConcurrencyTest.java         |   3 +
 .../storage/segstore/SegstoreLogStorageTest.java   |   5 +
 .../raft/storage/segstore/SyncSegmentFileTest.java | 128 +++++++++++++++++++++
 9 files changed, 429 insertions(+), 19 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MappedByteBufferSyncer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MappedByteBufferSyncer.java
new file mode 100644
index 00000000000..3b2d417644d
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MappedByteBufferSyncer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.apache.ignite.internal.util.IgniteUtils.jdkVersion;
+import static org.apache.ignite.internal.util.IgniteUtils.majorJavaVersion;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Class that encapsulates different strategies of syncing a region of a 
memory-mapped byte buffer to the underlying storage for
+ * different JDK versions.
+ */
+abstract class MappedByteBufferSyncer {
+    /**
+     * Forces any changes made to a region of given buffer's content to be 
written to the storage device containing the mapped file. The
+     * region starts at the given {@code index} in this buffer and is {@code 
length} bytes.
+     *
+     * @param buf mmap-ed byte buffer whose content should be synced.
+     * @param index The index of the first byte in the buffer region that is 
to be written back to storage; must be non-negative and
+     *         less than {@code capacity()}.
+     * @param length The length of the region in bytes; must be non-negative 
and no larger than {@code capacity() - index}.
+     */
+    abstract void force(MappedByteBuffer buf, int index, int length);
+
+    static MappedByteBufferSyncer createSyncer() {
+        return majorJavaVersion(jdkVersion()) >= 13 ? new Jdk13Syncer() : new 
LegacySyncer();
+    }
+
+    private static class Jdk13Syncer extends MappedByteBufferSyncer {
+        /** {@code MappedByteBuffer#force(int, int)}. */
+        private static final Method force = findMethod("force", int.class, 
int.class);
+
+        @Override
+        public void force(MappedByteBuffer buf, int index, int length) {
+            try {
+                force.invoke(buf, index, length);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                throw new IgniteInternalException(INTERNAL_ERR, e);
+            }
+        }
+    }
+
+    private static class LegacySyncer extends MappedByteBufferSyncer {
+        /** {@code MappedByteBuffer#fd}. */
+        private static final Field fd = findField("fd");
+
+        /** {@code MappedByteBuffer#force0(FileDescriptor, long, long)}. */
+        private static final Method force0 = findMethod("force0", 
FileDescriptor.class, long.class, long.class);
+
+        /** {@code MappedByteBuffer#mappingOffset()}. */
+        private static final Method mappingOffset = 
findMethod("mappingOffset");
+
+        /** {@code MappedByteBuffer#mappingAddress(long)}. */
+        private static final Method mappingAddress = 
findMethod("mappingAddress", long.class);
+
+        @Override
+        public void force(MappedByteBuffer buf, int index, int length) {
+            try {
+                long mappingOffset = (Long) 
LegacySyncer.mappingOffset.invoke(buf);
+
+                assert mappingOffset == 0 : mappingOffset;
+
+                long mappingAddress = (Long) 
LegacySyncer.mappingAddress.invoke(buf, mappingOffset);
+
+                long alignmentDelta = (mappingAddress + index) % 
GridUnsafe.pageSize();
+
+                // Given an alignment delta calculate the largest page aligned 
address
+                // of the mapping less than or equal to the address of the 
buffer
+                // element identified by the index.
+                long alignedAddress = (mappingAddress + index) - 
alignmentDelta;
+
+                force0.invoke(buf, fd.get(buf), alignedAddress, length + 
alignmentDelta);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                throw new IgniteInternalException(INTERNAL_ERR, e);
+            }
+        }
+    }
+
+    private static Method findMethod(String name, Class<?>... paramTypes) {
+        try {
+            Method mtd = MappedByteBuffer.class.getDeclaredMethod(name, 
paramTypes);
+
+            mtd.setAccessible(true);
+
+            return mtd;
+        } catch (NoSuchMethodException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        }
+    }
+
+    private static Field findField(String name) {
+        try {
+            Field field = MappedByteBuffer.class.getDeclaredField(name);
+
+            field.setAccessible(true);
+
+            return field;
+        } catch (NoSuchFieldException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        }
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
index 09bae8ddffb..535a54f7eab 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
@@ -45,18 +45,36 @@ class SegmentFile implements ManuallyCloseable {
      */
     private static final int CLOSED_POS_MARKER = -1;
 
+    private static final MappedByteBufferSyncer SYNCER = 
MappedByteBufferSyncer.createSyncer();
+
     private final MappedByteBuffer buffer;
 
+    /** Flag indicating if an fsync call should follow every write to the 
buffer. */
+    private final boolean isSync;
+
+    /** Position of the first non-reserved byte in the buffer. */
     private final AtomicInteger bufferPosition = new AtomicInteger();
 
+    /** Number of concurrent writers to the buffer. */
     private final AtomicInteger numWriters = new AtomicInteger();
 
-    private SegmentFile(RandomAccessFile file) throws IOException {
+    /** Position in the buffer <b>up to which</b> some data has been written, 
but not necessarily synced to the underlying storage. */
+    private volatile int lastWritePosition;
+
+    /** Position in the buffer <b>up to which</b> all written bytes have been 
synced. */
+    private volatile int syncPosition;
+
+    /** Lock used to atomically execute fsync. */
+    private final Object syncLock = new Object();
+
+    private SegmentFile(RandomAccessFile file, boolean isSync) throws 
IOException {
         //noinspection ChannelOpenedButNotSafelyClosed
         buffer = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
+
+        this.isSync = isSync;
     }
 
-    static SegmentFile createNew(Path path, long fileSize) throws IOException {
+    static SegmentFile createNew(Path path, long fileSize, boolean isSync) 
throws IOException {
         if (fileSize < 0) {
             throw new IllegalArgumentException("File size is negative: " + 
fileSize);
         }
@@ -70,17 +88,17 @@ class SegmentFile implements ManuallyCloseable {
         try (var file = new RandomAccessFile(path.toFile(), "rw")) {
             file.setLength(fileSize);
 
-            return new SegmentFile(file);
+            return new SegmentFile(file, isSync);
         }
     }
 
-    static SegmentFile openExisting(Path path) throws IOException {
+    static SegmentFile openExisting(Path path, boolean isSync) throws 
IOException {
         if (!Files.exists(path)) {
             throw new IllegalArgumentException("File does not exist: " + path);
         }
 
         try (var file = new RandomAccessFile(path.toFile(), "rw")) {
-            return new SegmentFile(file);
+            return new SegmentFile(file, isSync);
         }
     }
 
@@ -91,8 +109,11 @@ class SegmentFile implements ManuallyCloseable {
     class WriteBuffer implements AutoCloseable {
         private final ByteBuffer slice;
 
+        private final int pos;
+
         WriteBuffer(ByteBuffer slice) {
             this.slice = slice;
+            this.pos = slice.position();
         }
 
         ByteBuffer buffer() {
@@ -101,6 +122,17 @@ class SegmentFile implements ManuallyCloseable {
 
         @Override
         public void close() {
+            if (isSync) {
+                // Wait for all previous writes to complete.
+                while (lastWritePosition != pos) {
+                    Thread.onSpinWait();
+                }
+
+                lastWritePosition = slice.limit();
+
+                sync(slice.limit());
+            }
+
             numWriters.decrementAndGet();
         }
     }
@@ -115,6 +147,11 @@ class SegmentFile implements ManuallyCloseable {
         close(bytesToWrite);
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Does not execute {@link #sync} on its own; it should be performed 
manually by the caller.
+     */
     @Override
     public void close() {
         close(null);
@@ -133,6 +170,10 @@ class SegmentFile implements ManuallyCloseable {
 
         if (bytesToWrite != null && pos + bytesToWrite.length <= 
buffer.limit()) {
             slice(pos, bytesToWrite.length).put(bytesToWrite);
+
+            lastWritePosition = pos + bytesToWrite.length;
+        } else {
+            lastWritePosition = pos;
         }
     }
 
@@ -163,8 +204,35 @@ class SegmentFile implements ManuallyCloseable {
         }
     }
 
+    int lastWritePosition() {
+        return lastWritePosition;
+    }
+
+    int syncPosition() {
+        return syncPosition;
+    }
+
     void sync() {
-        buffer.force();
+        sync(lastWritePosition);
+    }
+
+    private void sync(int upToPosition) {
+        if (upToPosition <= syncPosition) {
+            return;
+        }
+
+        synchronized (syncLock) {
+            int syncPosition = this.syncPosition;
+
+            if (upToPosition <= syncPosition) {
+                return;
+            }
+
+            //noinspection AccessToStaticFieldLockedOnInstance
+            SYNCER.force(buffer, syncPosition, upToPosition - syncPosition);
+
+            this.syncPosition = upToPosition;
+        }
     }
 
     private @Nullable ByteBuffer reserveBytes(int size) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 04dbb0701fa..c84ab41e3ac 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
 import org.apache.ignite.internal.raft.configuration.LogStorageView;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import 
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
 import 
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -139,6 +140,8 @@ class SegmentFileManager implements ManuallyCloseable {
     /** Configured maximum log entry size. */
     private final int maxLogEntrySize;
 
+    private final boolean isSync;
+
     /** Lock used to block threads while a rollover is in progress. */
     private final Object rolloverLock = new Object();
 
@@ -159,14 +162,15 @@ class SegmentFileManager implements ManuallyCloseable {
             Path baseDir,
             int stripes,
             FailureProcessor failureProcessor,
+            RaftConfiguration raftConfiguration,
             LogStorageConfiguration storageConfiguration
     ) throws IOException {
         this.segmentFilesDir = baseDir.resolve("segments");
+        this.stripes = stripes;
+        this.isSync = raftConfiguration.fsync().value();
 
         Files.createDirectories(segmentFilesDir);
 
-        this.stripes = stripes;
-
         LogStorageView logStorageView = storageConfiguration.value();
 
         segmentFileSize = toIntExact(logStorageView.segmentFileSizeBytes());
@@ -248,7 +252,7 @@ class SegmentFileManager implements ManuallyCloseable {
     private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal) 
throws IOException {
         Path path = segmentFilesDir.resolve(segmentFileName(fileOrdinal, 0));
 
-        SegmentFile segmentFile = SegmentFile.createNew(path, segmentFileSize);
+        SegmentFile segmentFile = SegmentFile.createNew(path, segmentFileSize, 
isSync);
 
         writeHeader(segmentFile);
 
@@ -260,10 +264,10 @@ class SegmentFileManager implements ManuallyCloseable {
      * "complete" segment files (i.e. those that have experienced a rollover) 
this method is expected to be called on the most recent,
      * possibly incomplete segment file.
      */
-    private static SegmentFileWithMemtable recoverLatestSegmentFile(
+    private SegmentFileWithMemtable recoverLatestSegmentFile(
             Path segmentFilePath, SegmentPayloadParser payloadParser
     ) throws IOException {
-        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath);
+        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, 
isSync);
 
         WriteModeIndexMemTable memTable = 
payloadParser.recoverMemtable(segmentFile, segmentFilePath, true);
 
@@ -277,10 +281,10 @@ class SegmentFileManager implements ManuallyCloseable {
      * <p>This method skips CRC validation, because it is used to identify the 
end of incomplete segment files (and, by definition, this can
      * never happen during this method's invocation), not to validate storage 
integrity.
      */
-    private static SegmentFileWithMemtable recoverSegmentFile(
+    private SegmentFileWithMemtable recoverSegmentFile(
             Path segmentFilePath, SegmentPayloadParser payloadParser
     ) throws IOException {
-        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath);
+        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, 
isSync);
 
         WriteModeIndexMemTable memTable = 
payloadParser.recoverMemtable(segmentFile, segmentFilePath, false);
 
@@ -555,7 +559,7 @@ class SegmentFileManager implements ManuallyCloseable {
         Path path = 
segmentFilesDir.resolve(segmentFileName(segmentFilePointer.fileOrdinal(), 0));
 
         // TODO: Add a cache for recently accessed segment files, see 
https://issues.apache.org/jira/browse/IGNITE-26622.
-        SegmentFile segmentFile = SegmentFile.openExisting(path);
+        SegmentFile segmentFile = SegmentFile.openExisting(path, isSync);
 
         ByteBuffer buffer = 
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
index dbe22480cff..3debc97b513 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
 import org.apache.ignite.raft.jraft.entity.LogId;
@@ -77,6 +78,7 @@ class SegmentFileManagerGetEntryTest extends 
IgniteAbstractTest {
 
     @BeforeEach
     void setUp(
+            @InjectConfiguration RaftConfiguration raftConfiguration,
             @InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
             LogStorageConfiguration storageConfiguration
     ) throws IOException {
@@ -85,6 +87,7 @@ class SegmentFileManagerGetEntryTest extends 
IgniteAbstractTest {
                 workDir,
                 STRIPES,
                 new NoOpFailureManager(),
+                raftConfiguration,
                 storageConfiguration
         );
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index b5cb4f495bc..f8e35fc3992 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -96,6 +97,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
 
     private final FailureManager failureManager = new NoOpFailureManager();
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     @InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
     private LogStorageConfiguration storageConfiguration;
 
@@ -114,6 +118,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
                 workDir,
                 STRIPES,
                 failureManager,
+                raftConfiguration,
                 storageConfiguration
         );
     }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
index 2c0103ee6cc..1b9b5ad8007 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
@@ -106,17 +106,17 @@ class SegmentFileTest extends IgniteAbstractTest {
 
     @Test
     void testCreateNewConstructorInvariants() {
-        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.createNew(path, -1));
-        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.createNew(path, Integer.MAX_VALUE + 1L));
+        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.createNew(path, -1, false));
+        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.createNew(path, Integer.MAX_VALUE + 1L, false));
     }
 
     @Test
     void testOpenExistingConstructorInvariants() throws IOException {
-        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.openExisting(path));
+        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.openExisting(path, false));
 
         createSegmentFile(1);
 
-        assertDoesNotThrow(() -> SegmentFile.openExisting(path));
+        assertDoesNotThrow(() -> SegmentFile.openExisting(path, false));
     }
 
     /**
@@ -385,8 +385,77 @@ class SegmentFileTest extends IgniteAbstractTest {
         assertThat(readFileContent(offset, bytesForRollover.length), 
is(bytesForRollover));
     }
 
+    @Test
+    void testSync() throws IOException {
+        createSegmentFile(100);
+
+        assertThat(file.syncPosition(), is(0));
+
+        try (WriteBuffer ignored = file.reserve(5)) {
+            // No-op.
+        }
+
+        try (WriteBuffer ignored = file.reserve(7)) {
+            // No-op.
+        }
+
+        assertThat(file.syncPosition(), is(0));
+
+        file.close();
+
+        file.sync();
+
+        assertThat(file.syncPosition(), is(12));
+    }
+
+    @Test
+    void testSyncWithRollover() throws IOException {
+        createSegmentFile(100);
+
+        assertThat(file.syncPosition(), is(0));
+
+        try (WriteBuffer ignored = file.reserve(5)) {
+            // No-op.
+        }
+
+        try (WriteBuffer ignored = file.reserve(7)) {
+            // No-op.
+        }
+
+        assertThat(file.syncPosition(), is(0));
+
+        file.closeForRollover(new byte[] {1, 2, 3});
+
+        file.sync();
+
+        assertThat(file.syncPosition(), is(15));
+    }
+
+    @Test
+    void testSyncWithNotEnoughSpaceForRollover() throws IOException {
+        createSegmentFile(13);
+
+        assertThat(file.syncPosition(), is(0));
+
+        try (WriteBuffer ignored = file.reserve(5)) {
+            // No-op.
+        }
+
+        try (WriteBuffer ignored = file.reserve(7)) {
+            // No-op.
+        }
+
+        assertThat(file.syncPosition(), is(0));
+
+        file.closeForRollover(new byte[] {1, 2, 3});
+
+        file.sync();
+
+        assertThat(file.syncPosition(), is(12));
+    }
+
     private void createSegmentFile(int size) throws IOException {
-        file = SegmentFile.createNew(path, size);
+        file = SegmentFile.createNew(path, size, false);
     }
 
     private boolean writeToSegmentFile(byte[] bytes) {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index e6da6d236df..98053c13787 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -51,6 +52,7 @@ class SegstoreLogStorageConcurrencyTest extends 
IgniteAbstractTest {
 
     @BeforeEach
     void setUp(
+            @InjectConfiguration RaftConfiguration raftConfiguration,
             @InjectConfiguration("mock.segmentFileSizeBytes=" + SEGMENT_SIZE)
             LogStorageConfiguration storageConfiguration
     ) throws IOException {
@@ -59,6 +61,7 @@ class SegstoreLogStorageConcurrencyTest extends 
IgniteAbstractTest {
                 workDir,
                 1,
                 new NoOpFailureManager(),
+                raftConfiguration,
                 storageConfiguration
         );
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index b48f14ad667..c058d46078d 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.storage.impl.BaseLogStorageTest;
 import org.apache.ignite.raft.jraft.test.TestUtils;
@@ -44,6 +45,9 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
 
     private SegmentFileManager segmentFileManager;
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     @InjectConfiguration("mock.segmentFileSizeBytes=" + SEGMENT_SIZE)
     private LogStorageConfiguration storageConfiguration;
 
@@ -60,6 +64,7 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
                     path,
                     1,
                     new NoOpFailureManager(),
+                    raftConfiguration,
                     storageConfiguration
             );
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SyncSegmentFileTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SyncSegmentFileTest.java
new file mode 100644
index 00000000000..ebd02ae7146
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SyncSegmentFileTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.util.Collections.nCopies;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.RunnableX;
+import 
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class SyncSegmentFileTest extends IgniteAbstractTest {
+    private static final String FILE_NAME = "test.bin";
+
+    private static final int size = 1024;
+
+    private SegmentFile file;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        Path path = workDir.resolve(FILE_NAME);
+
+        file = SegmentFile.createNew(path, size, true);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closeAllManually(file);
+    }
+
+    @Test
+    void testLastWrittenIncreaseOrder() {
+        assertThat(file.lastWritePosition(), is(0));
+        assertThat(file.syncPosition(), is(0));
+
+        try (WriteBuffer ignored = file.reserve(10)) {
+            assertThat(file.lastWritePosition(), is(0));
+            assertThat(file.syncPosition(), is(0));
+        }
+
+        assertThat(file.lastWritePosition(), is(10));
+        assertThat(file.syncPosition(), is(10));
+    }
+
+    @Test
+    void testLastWrittenMultithreadedIncreaseOrder() {
+        var task1ReservedBuffer = new CountDownLatch(1);
+        var task1ReleasedBuffer = new CountDownLatch(1);
+        var task2ReservedBuffer = new CountDownLatch(1);
+
+        RunnableX task1 = () -> {
+            try (WriteBuffer ignored = file.reserve(10)) {
+                task1ReservedBuffer.countDown();
+                task2ReservedBuffer.await(1, TimeUnit.SECONDS);
+            }
+
+            assertThat(file.lastWritePosition(), is(10));
+            assertThat(file.syncPosition(), is(10));
+
+            task1ReleasedBuffer.countDown();
+        };
+
+        RunnableX task2 = () -> {
+            task1ReservedBuffer.await(1, TimeUnit.SECONDS);
+
+            try (WriteBuffer ignored = file.reserve(10)) {
+                task2ReservedBuffer.countDown();
+                task1ReleasedBuffer.await(1, TimeUnit.SECONDS);
+            }
+
+            assertThat(file.lastWritePosition(), is(20));
+            assertThat(file.syncPosition(), is(20));
+        };
+
+        runRace(task1, task2);
+    }
+
+    @RepeatedTest(10)
+    void testSyncMultithreadedTest() {
+        int chunkSize = 32;
+
+        int numThreads = 4;
+
+        int numChunksPerThread = size / (chunkSize * numThreads);
+
+        RunnableX writer = () -> {
+            for (int i = 0; i < numChunksPerThread; i++) {
+                try (WriteBuffer ignored = file.reserve(chunkSize)) {
+                    // No-op.
+                }
+            }
+        };
+
+        runRace(nCopies(numThreads, writer).toArray(RunnableX[]::new));
+
+        assertThat(file.lastWritePosition(), is(size));
+        assertThat(file.syncPosition(), is(size));
+    }
+}

Reply via email to