This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 41a5034b523 IGNITE-26292 Add sync mode to the log storage (#7299)
41a5034b523 is described below
commit 41a5034b523a0fc9bf14eed23ce88d97a782590d
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Dec 24 11:29:41 2025 +0200
IGNITE-26292 Add sync mode to the log storage (#7299)
---
.../storage/segstore/MappedByteBufferSyncer.java | 125 ++++++++++++++++++++
.../raft/storage/segstore/SegmentFile.java | 80 ++++++++++++-
.../raft/storage/segstore/SegmentFileManager.java | 20 ++--
.../segstore/SegmentFileManagerGetEntryTest.java | 3 +
.../storage/segstore/SegmentFileManagerTest.java | 5 +
.../raft/storage/segstore/SegmentFileTest.java | 79 ++++++++++++-
.../SegstoreLogStorageConcurrencyTest.java | 3 +
.../storage/segstore/SegstoreLogStorageTest.java | 5 +
.../raft/storage/segstore/SyncSegmentFileTest.java | 128 +++++++++++++++++++++
9 files changed, 429 insertions(+), 19 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MappedByteBufferSyncer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MappedByteBufferSyncer.java
new file mode 100644
index 00000000000..3b2d417644d
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MappedByteBufferSyncer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.apache.ignite.internal.util.IgniteUtils.jdkVersion;
+import static org.apache.ignite.internal.util.IgniteUtils.majorJavaVersion;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Class that encapsulates different strategies of syncing a region of a
memory-mapped byte buffer to the underlying storage for
+ * different JDK versions.
+ */
+abstract class MappedByteBufferSyncer {
+ /**
+ * Forces any changes made to a region of given buffer's content to be
written to the storage device containing the mapped file. The
+ * region starts at the given {@code index} in this buffer and is {@code
length} bytes.
+ *
+ * @param buf mmap-ed byte buffer which content should be synced.
+ * @param index The index of the first byte in the buffer region that is
to be written back to storage; must be non-negative and
+ * less than {@code capacity()}.
+ * @param length The length of the region in bytes; must be non-negative
and no larger than {@code capacity() - index}.
+ */
+ abstract void force(MappedByteBuffer buf, int index, int length);
+
+ static MappedByteBufferSyncer createSyncer() {
+ return majorJavaVersion(jdkVersion()) >= 13 ? new Jdk13Syncer() : new
LegacySyncer();
+ }
+
+ private static class Jdk13Syncer extends MappedByteBufferSyncer {
+ /** {@code MappedByteBuffer#force(int, int)}. */
+ private static final Method force = findMethod("force", int.class,
int.class);
+
+ @Override
+ public void force(MappedByteBuffer buf, int index, int length) {
+ try {
+ force.invoke(buf, index, length);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+ }
+ }
+
+ private static class LegacySyncer extends MappedByteBufferSyncer {
+ /** {@code MappedByteBuffer#fd}. */
+ private static final Field fd = findField("fd");
+
+ /** {@code MappedByteBuffer#force0(FileDescriptor, long, long)}. */
+ private static final Method force0 = findMethod("force0",
FileDescriptor.class, long.class, long.class);
+
+ /** {@code MappedByteBuffer#mappingOffset()}. */
+ private static final Method mappingOffset =
findMethod("mappingOffset");
+
+ /** {@code MappedByteBuffer#mappingAddress(long)}. */
+ private static final Method mappingAddress =
findMethod("mappingAddress", long.class);
+
+ @Override
+ public void force(MappedByteBuffer buf, int index, int length) {
+ try {
+ long mappingOffset = (Long)
LegacySyncer.mappingOffset.invoke(buf);
+
+ assert mappingOffset == 0 : mappingOffset;
+
+ long mappingAddress = (Long)
LegacySyncer.mappingAddress.invoke(buf, mappingOffset);
+
+ long alignmentDelta = (mappingAddress + index) %
GridUnsafe.pageSize();
+
+ // Given an alignment delta calculate the largest page aligned
address
+ // of the mapping less than or equal to the address of the
buffer
+ // element identified by the index.
+ long alignedAddress = (mappingAddress + index) -
alignmentDelta;
+
+ force0.invoke(buf, fd.get(buf), alignedAddress, length +
alignmentDelta);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+ }
+ }
+
+ private static Method findMethod(String name, Class<?>... paramTypes) {
+ try {
+ Method mtd = MappedByteBuffer.class.getDeclaredMethod(name,
paramTypes);
+
+ mtd.setAccessible(true);
+
+ return mtd;
+ } catch (NoSuchMethodException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+ }
+
+ private static Field findField(String name) {
+ try {
+ Field field = MappedByteBuffer.class.getDeclaredField(name);
+
+ field.setAccessible(true);
+
+ return field;
+ } catch (NoSuchFieldException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
index 09bae8ddffb..535a54f7eab 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
@@ -45,18 +45,36 @@ class SegmentFile implements ManuallyCloseable {
*/
private static final int CLOSED_POS_MARKER = -1;
+ private static final MappedByteBufferSyncer SYNCER =
MappedByteBufferSyncer.createSyncer();
+
private final MappedByteBuffer buffer;
+ /** Flag indicating if an fsync call should follow every write to the
buffer. */
+ private final boolean isSync;
+
+ /** Position of the first non-reserved byte in the buffer. */
private final AtomicInteger bufferPosition = new AtomicInteger();
+ /** Number of concurrent writers to the buffer. */
private final AtomicInteger numWriters = new AtomicInteger();
- private SegmentFile(RandomAccessFile file) throws IOException {
+ /** Position in the buffer <b>up to which</b> some data has been written,
but not necessarily synced to the underlying storage. */
+ private volatile int lastWritePosition;
+
+ /** Position in the buffer <b>up to which</b> all written bytes have been
synced. */
+ private volatile int syncPosition;
+
+ /** Lock used to atomically execute fsync. */
+ private final Object syncLock = new Object();
+
+ private SegmentFile(RandomAccessFile file, boolean isSync) throws
IOException {
//noinspection ChannelOpenedButNotSafelyClosed
buffer = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
+
+ this.isSync = isSync;
}
- static SegmentFile createNew(Path path, long fileSize) throws IOException {
+ static SegmentFile createNew(Path path, long fileSize, boolean isSync)
throws IOException {
if (fileSize < 0) {
throw new IllegalArgumentException("File size is negative: " +
fileSize);
}
@@ -70,17 +88,17 @@ class SegmentFile implements ManuallyCloseable {
try (var file = new RandomAccessFile(path.toFile(), "rw")) {
file.setLength(fileSize);
- return new SegmentFile(file);
+ return new SegmentFile(file, isSync);
}
}
- static SegmentFile openExisting(Path path) throws IOException {
+ static SegmentFile openExisting(Path path, boolean isSync) throws
IOException {
if (!Files.exists(path)) {
throw new IllegalArgumentException("File does not exist: " + path);
}
try (var file = new RandomAccessFile(path.toFile(), "rw")) {
- return new SegmentFile(file);
+ return new SegmentFile(file, isSync);
}
}
@@ -91,8 +109,11 @@ class SegmentFile implements ManuallyCloseable {
class WriteBuffer implements AutoCloseable {
private final ByteBuffer slice;
+ private final int pos;
+
WriteBuffer(ByteBuffer slice) {
this.slice = slice;
+ this.pos = slice.position();
}
ByteBuffer buffer() {
@@ -101,6 +122,17 @@ class SegmentFile implements ManuallyCloseable {
@Override
public void close() {
+ if (isSync) {
+ // Wait for all previous writes to complete.
+ while (lastWritePosition != pos) {
+ Thread.onSpinWait();
+ }
+
+ lastWritePosition = slice.limit();
+
+ sync(slice.limit());
+ }
+
numWriters.decrementAndGet();
}
}
@@ -115,6 +147,11 @@ class SegmentFile implements ManuallyCloseable {
close(bytesToWrite);
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Does not execute {@link #sync} on its own, it should be performed
manually by the caller.
+ */
@Override
public void close() {
close(null);
@@ -133,6 +170,10 @@ class SegmentFile implements ManuallyCloseable {
if (bytesToWrite != null && pos + bytesToWrite.length <=
buffer.limit()) {
slice(pos, bytesToWrite.length).put(bytesToWrite);
+
+ lastWritePosition = pos + bytesToWrite.length;
+ } else {
+ lastWritePosition = pos;
}
}
@@ -163,8 +204,35 @@ class SegmentFile implements ManuallyCloseable {
}
}
+ int lastWritePosition() {
+ return lastWritePosition;
+ }
+
+ int syncPosition() {
+ return syncPosition;
+ }
+
void sync() {
- buffer.force();
+ sync(lastWritePosition);
+ }
+
+ private void sync(int upToPosition) {
+ if (upToPosition <= syncPosition) {
+ return;
+ }
+
+ synchronized (syncLock) {
+ int syncPosition = this.syncPosition;
+
+ if (upToPosition <= syncPosition) {
+ return;
+ }
+
+ //noinspection AccessToStaticFieldLockedOnInstance
+ SYNCER.force(buffer, syncPosition, upToPosition - syncPosition);
+
+ this.syncPosition = upToPosition;
+ }
}
private @Nullable ByteBuffer reserveBytes(int size) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 04dbb0701fa..c84ab41e3ac 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
import org.apache.ignite.internal.raft.configuration.LogStorageView;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
import
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -139,6 +140,8 @@ class SegmentFileManager implements ManuallyCloseable {
/** Configured maximum log entry size. */
private final int maxLogEntrySize;
+ private final boolean isSync;
+
/** Lock used to block threads while a rollover is in progress. */
private final Object rolloverLock = new Object();
@@ -159,14 +162,15 @@ class SegmentFileManager implements ManuallyCloseable {
Path baseDir,
int stripes,
FailureProcessor failureProcessor,
+ RaftConfiguration raftConfiguration,
LogStorageConfiguration storageConfiguration
) throws IOException {
this.segmentFilesDir = baseDir.resolve("segments");
+ this.stripes = stripes;
+ this.isSync = raftConfiguration.fsync().value();
Files.createDirectories(segmentFilesDir);
- this.stripes = stripes;
-
LogStorageView logStorageView = storageConfiguration.value();
segmentFileSize = toIntExact(logStorageView.segmentFileSizeBytes());
@@ -248,7 +252,7 @@ class SegmentFileManager implements ManuallyCloseable {
private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal)
throws IOException {
Path path = segmentFilesDir.resolve(segmentFileName(fileOrdinal, 0));
- SegmentFile segmentFile = SegmentFile.createNew(path, segmentFileSize);
+ SegmentFile segmentFile = SegmentFile.createNew(path, segmentFileSize,
isSync);
writeHeader(segmentFile);
@@ -260,10 +264,10 @@ class SegmentFileManager implements ManuallyCloseable {
* "complete" segment files (i.e. those that have experienced a rollover)
this method is expected to be called on the most recent,
* possibly incomplete segment file.
*/
- private static SegmentFileWithMemtable recoverLatestSegmentFile(
+ private SegmentFileWithMemtable recoverLatestSegmentFile(
Path segmentFilePath, SegmentPayloadParser payloadParser
) throws IOException {
- SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath);
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
isSync);
WriteModeIndexMemTable memTable =
payloadParser.recoverMemtable(segmentFile, segmentFilePath, true);
@@ -277,10 +281,10 @@ class SegmentFileManager implements ManuallyCloseable {
* <p>This method skips CRC validation, because it is used to identify the
end of incomplete segment files (and, by definition, this can
* never happen during this method's invocation), not to validate storage
integrity.
*/
- private static SegmentFileWithMemtable recoverSegmentFile(
+ private SegmentFileWithMemtable recoverSegmentFile(
Path segmentFilePath, SegmentPayloadParser payloadParser
) throws IOException {
- SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath);
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
isSync);
WriteModeIndexMemTable memTable =
payloadParser.recoverMemtable(segmentFile, segmentFilePath, false);
@@ -555,7 +559,7 @@ class SegmentFileManager implements ManuallyCloseable {
Path path =
segmentFilesDir.resolve(segmentFileName(segmentFilePointer.fileOrdinal(), 0));
// TODO: Add a cache for recently accessed segment files, see
https://issues.apache.org/jira/browse/IGNITE-26622.
- SegmentFile segmentFile = SegmentFile.openExisting(path);
+ SegmentFile segmentFile = SegmentFile.openExisting(path, isSync);
ByteBuffer buffer =
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
index dbe22480cff..3debc97b513 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
@@ -77,6 +78,7 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
@BeforeEach
void setUp(
+ @InjectConfiguration RaftConfiguration raftConfiguration,
@InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
LogStorageConfiguration storageConfiguration
) throws IOException {
@@ -85,6 +87,7 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
workDir,
STRIPES,
new NoOpFailureManager(),
+ raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index b5cb4f495bc..f8e35fc3992 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -96,6 +97,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
private final FailureManager failureManager = new NoOpFailureManager();
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
@InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
private LogStorageConfiguration storageConfiguration;
@@ -114,6 +118,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
workDir,
STRIPES,
failureManager,
+ raftConfiguration,
storageConfiguration
);
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
index 2c0103ee6cc..1b9b5ad8007 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
@@ -106,17 +106,17 @@ class SegmentFileTest extends IgniteAbstractTest {
@Test
void testCreateNewConstructorInvariants() {
- assertThrows(IllegalArgumentException.class, () ->
SegmentFile.createNew(path, -1));
- assertThrows(IllegalArgumentException.class, () ->
SegmentFile.createNew(path, Integer.MAX_VALUE + 1L));
+ assertThrows(IllegalArgumentException.class, () ->
SegmentFile.createNew(path, -1, false));
+ assertThrows(IllegalArgumentException.class, () ->
SegmentFile.createNew(path, Integer.MAX_VALUE + 1L, false));
}
@Test
void testOpenExistingConstructorInvariants() throws IOException {
- assertThrows(IllegalArgumentException.class, () ->
SegmentFile.openExisting(path));
+ assertThrows(IllegalArgumentException.class, () ->
SegmentFile.openExisting(path, false));
createSegmentFile(1);
- assertDoesNotThrow(() -> SegmentFile.openExisting(path));
+ assertDoesNotThrow(() -> SegmentFile.openExisting(path, false));
}
/**
@@ -385,8 +385,77 @@ class SegmentFileTest extends IgniteAbstractTest {
assertThat(readFileContent(offset, bytesForRollover.length),
is(bytesForRollover));
}
+ @Test
+ void testSync() throws IOException {
+ createSegmentFile(100);
+
+ assertThat(file.syncPosition(), is(0));
+
+ try (WriteBuffer ignored = file.reserve(5)) {
+ // No-op.
+ }
+
+ try (WriteBuffer ignored = file.reserve(7)) {
+ // No-op.
+ }
+
+ assertThat(file.syncPosition(), is(0));
+
+ file.close();
+
+ file.sync();
+
+ assertThat(file.syncPosition(), is(12));
+ }
+
+ @Test
+ void testSyncWithRollover() throws IOException {
+ createSegmentFile(100);
+
+ assertThat(file.syncPosition(), is(0));
+
+ try (WriteBuffer ignored = file.reserve(5)) {
+ // No-op.
+ }
+
+ try (WriteBuffer ignored = file.reserve(7)) {
+ // No-op.
+ }
+
+ assertThat(file.syncPosition(), is(0));
+
+ file.closeForRollover(new byte[] {1, 2, 3});
+
+ file.sync();
+
+ assertThat(file.syncPosition(), is(15));
+ }
+
+ @Test
+ void testSyncWithNotEnoughSpaceForRollover() throws IOException {
+ createSegmentFile(13);
+
+ assertThat(file.syncPosition(), is(0));
+
+ try (WriteBuffer ignored = file.reserve(5)) {
+ // No-op.
+ }
+
+ try (WriteBuffer ignored = file.reserve(7)) {
+ // No-op.
+ }
+
+ assertThat(file.syncPosition(), is(0));
+
+ file.closeForRollover(new byte[] {1, 2, 3});
+
+ file.sync();
+
+ assertThat(file.syncPosition(), is(12));
+ }
+
private void createSegmentFile(int size) throws IOException {
- file = SegmentFile.createNew(path, size);
+ file = SegmentFile.createNew(path, size, false);
}
private boolean writeToSegmentFile(byte[] bytes) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index e6da6d236df..98053c13787 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -51,6 +52,7 @@ class SegstoreLogStorageConcurrencyTest extends
IgniteAbstractTest {
@BeforeEach
void setUp(
+ @InjectConfiguration RaftConfiguration raftConfiguration,
@InjectConfiguration("mock.segmentFileSizeBytes=" + SEGMENT_SIZE)
LogStorageConfiguration storageConfiguration
) throws IOException {
@@ -59,6 +61,7 @@ class SegstoreLogStorageConcurrencyTest extends
IgniteAbstractTest {
workDir,
1,
new NoOpFailureManager(),
+ raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index b48f14ad667..c058d46078d 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.storage.impl.BaseLogStorageTest;
import org.apache.ignite.raft.jraft.test.TestUtils;
@@ -44,6 +45,9 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
private SegmentFileManager segmentFileManager;
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
@InjectConfiguration("mock.segmentFileSizeBytes=" + SEGMENT_SIZE)
private LogStorageConfiguration storageConfiguration;
@@ -60,6 +64,7 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
path,
1,
new NoOpFailureManager(),
+ raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SyncSegmentFileTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SyncSegmentFileTest.java
new file mode 100644
index 00000000000..ebd02ae7146
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SyncSegmentFileTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.util.Collections.nCopies;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.RunnableX;
+import
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class SyncSegmentFileTest extends IgniteAbstractTest {
+ private static final String FILE_NAME = "test.bin";
+
+ private static final int size = 1024;
+
+ private SegmentFile file;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ Path path = workDir.resolve(FILE_NAME);
+
+ file = SegmentFile.createNew(path, size, true);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ closeAllManually(file);
+ }
+
+ @Test
+ void testLastWrittenIncreaseOrder() {
+ assertThat(file.lastWritePosition(), is(0));
+ assertThat(file.syncPosition(), is(0));
+
+ try (WriteBuffer ignored = file.reserve(10)) {
+ assertThat(file.lastWritePosition(), is(0));
+ assertThat(file.syncPosition(), is(0));
+ }
+
+ assertThat(file.lastWritePosition(), is(10));
+ assertThat(file.syncPosition(), is(10));
+ }
+
+ @Test
+ void testLastWrittenMultithreadedIncreaseOrder() {
+ var task1ReservedBuffer = new CountDownLatch(1);
+ var task1ReleasedBuffer = new CountDownLatch(1);
+ var task2ReservedBuffer = new CountDownLatch(1);
+
+ RunnableX task1 = () -> {
+ try (WriteBuffer ignored = file.reserve(10)) {
+ task1ReservedBuffer.countDown();
+ task2ReservedBuffer.await(1, TimeUnit.SECONDS);
+ }
+
+ assertThat(file.lastWritePosition(), is(10));
+ assertThat(file.syncPosition(), is(10));
+
+ task1ReleasedBuffer.countDown();
+ };
+
+ RunnableX task2 = () -> {
+ task1ReservedBuffer.await(1, TimeUnit.SECONDS);
+
+ try (WriteBuffer ignored = file.reserve(10)) {
+ task2ReservedBuffer.countDown();
+ task1ReleasedBuffer.await(1, TimeUnit.SECONDS);
+ }
+
+ assertThat(file.lastWritePosition(), is(20));
+ assertThat(file.syncPosition(), is(20));
+ };
+
+ runRace(task1, task2);
+ }
+
+ @RepeatedTest(10)
+ void testSyncMultithreadedTest() {
+ int chunkSize = 32;
+
+ int numThreads = 4;
+
+ int numChunksPerThread = size / (chunkSize * numThreads);
+
+ RunnableX writer = () -> {
+ for (int i = 0; i < numChunksPerThread; i++) {
+ try (WriteBuffer ignored = file.reserve(chunkSize)) {
+ // No-op.
+ }
+ }
+ };
+
+ runRace(nCopies(numThreads, writer).toArray(RunnableX[]::new));
+
+ assertThat(file.lastWritePosition(), is(size));
+ assertThat(file.syncPosition(), is(size));
+ }
+}