This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b73cf97c6 RATIS-2129. Low replication performance because of lock contention on RaftLog (#1322)
b73cf97c6 is described below
commit b73cf97c6f6349abb2e37a801f214629cb5245b3
Author: Symious <[email protected]>
AuthorDate: Fri Dec 19 01:26:52 2025 +0800
RATIS-2129. Low replication performance because of lock contention on RaftLog (#1322)
Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
---
.../apache/ratis/server/RaftServerConfigKeys.java | 10 +++
.../ratis/server/raftlog/segmented/LogSegment.java | 83 ++++++++++++++++------
.../server/raftlog/segmented/SegmentedRaftLog.java | 10 ++-
.../raftlog/segmented/SegmentedRaftLogCache.java | 13 ++--
.../segmented/TestSegmentedRaftLogCache.java | 5 +-
5 files changed, 90 insertions(+), 31 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 849597433..002286c4c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -415,6 +415,16 @@ public interface RaftServerConfigKeys {
setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax);
}
+ String READ_LOCK_ENABLED_KEY = PREFIX + ".read.lock.enabled";
+ boolean READ_LOCK_ENABLED_DEFAULT = true;
+ static boolean readLockEnabled(RaftProperties properties) {
+ return getBoolean(properties::getBoolean,
+ READ_LOCK_ENABLED_KEY, READ_LOCK_ENABLED_DEFAULT, getDefaultLog());
+ }
+ static void setReadLockEnabled(RaftProperties properties, boolean
readLockEnabled) {
+ setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY,
readLockEnabled);
+ }
+
/**
* Besides the open segment, the max number of segments caching log entries.
*/
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 444d417ba..c40b91f70 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -36,13 +36,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.nio.file.Path;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -105,6 +105,44 @@ public final class LogSegment {
}
}
+ private static class Records { // Keeps the LogRecords of a segment in a sorted map keyed by log index.
+ private final ConcurrentNavigableMap<Long, LogRecord> map = new
ConcurrentSkipListMap<>(); // key is record.getTermIndex().getIndex(), as stored by append(..)
+
+ int size() { // number of records currently held
+ return map.size();
+ }
+
+ LogRecord getFirst() { // record with the lowest index, or null when no record is held
+ final Map.Entry<Long, LogRecord> first = map.firstEntry();
+ return first != null? first.getValue() : null;
+ }
+
+ LogRecord getLast() { // record with the highest index, or null when no record is held
+ final Map.Entry<Long, LogRecord> last = map.lastEntry();
+ return last != null? last.getValue() : null;
+ }
+
+ LogRecord get(long i) { // record stored under log index i, or null if there is none
+ return map.get(i);
+ }
+
+ long append(LogRecord record) { // stores the record under its own log index and returns that index
+ final long index = record.getTermIndex().getIndex();
+ final LogRecord previous = map.put(index, record);
+ Preconditions.assertNull(previous, "previous"); // expects that no record was already stored at this index
+ return index;
+ }
+
+ LogRecord removeLast() { // removes and returns the record with the highest index
+ final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
+ return Objects.requireNonNull(last, "last == null").getValue(); // NullPointerException when the map was empty
+ }
+
+ void clear() { // drops all records
+ map.clear();
+ }
+ }
+
static LogSegment newOpenSegment(RaftStorage storage, long start,
SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
@@ -204,10 +242,12 @@ public final class LogSegment {
final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end
index");
- final LogRecord last = getLastRecord();
+ final LogRecord last = records.getLast();
if (last != null) {
Preconditions.assertSame(expectedLastIndex,
last.getTermIndex().getIndex(), "Index at the last record");
- Preconditions.assertSame(expectedStart,
records.get(0).getTermIndex().getIndex(), "Index at the first record");
+ final LogRecord first = records.getFirst();
+ Objects.requireNonNull(first, "first record");
+ Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(),
"Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last
Index");
@@ -272,7 +312,7 @@ public final class LogSegment {
/**
* the list of records is more like the index of a segment
*/
- private final List<LogRecord> records = new ArrayList<>();
+ private final Records records = new Records();
/**
* the entryCache caches the content of log entries.
*/
@@ -293,7 +333,11 @@ public final class LogSegment {
}
long getEndIndex() {
- return endIndex;
+ if (!isOpen) {
+ return endIndex;
+ }
+ final LogRecord last = records.getLast();
+ return last == null ? getStartIndex() - 1 : last.getTermIndex().getIndex();
}
boolean isOpen() {
@@ -301,7 +345,7 @@ public final class LogSegment {
}
int numOfEntries() {
- return Math.toIntExact(endIndex - startIndex + 1);
+ return Math.toIntExact(getEndIndex() - startIndex + 1);
}
CorruptionPolicy getLogCorruptionPolicy() {
@@ -315,14 +359,12 @@ public final class LogSegment {
private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
Objects.requireNonNull(entry, "entry == null");
- if (records.isEmpty()) {
+ final LogRecord currentLast = records.getLast();
+ if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
- }
-
- final LogRecord currentLast = getLastRecord();
- if (currentLast != null) {
+ } else {
Preconditions.assertTrue(entry.getIndex() ==
currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(),
currentLast.getTermIndex().getIndex());
}
@@ -331,7 +373,7 @@ public final class LogSegment {
if (keepEntryInCache) {
putEntryCache(record.getTermIndex(), entry, op);
}
- records.add(record);
+ records.append(record);
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
}
@@ -358,18 +400,14 @@ public final class LogSegment {
}
LogRecord getLogRecord(long index) {
- if (index >= startIndex && index <= endIndex) {
- return records.get(Math.toIntExact(index - startIndex));
+ if (index >= startIndex && index <= getEndIndex()) {
+ return records.get(index);
}
return null;
}
- private LogRecord getLastRecord() {
- return records.isEmpty() ? null : records.get(records.size() - 1);
- }
-
TermIndex getLastTermIndex() {
- LogRecord last = getLastRecord();
+ final LogRecord last = records.getLast();
return last == null ? null : last.getTermIndex();
}
@@ -387,7 +425,8 @@ public final class LogSegment {
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
- LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
+ final LogRecord removed = records.removeLast();
+ Preconditions.assertSame(index, removed.getTermIndex().getIndex(),
"removedIndex");
removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
totalFileSize = removed.offset;
}
@@ -458,7 +497,7 @@ public final class LogSegment {
}
boolean containsIndex(long index) {
- return startIndex <= index && endIndex >= index;
+ return startIndex <= index && getEndIndex() >= index;
}
boolean hasEntries() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 6dc3d7961..6bcc3f8e1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -202,6 +202,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
private final SegmentedRaftLogMetrics metrics;
+ private final boolean readLockEnabled;
@SuppressWarnings({"squid:S2095"}) // Suppress closeable warning
private SegmentedRaftLog(Builder b) {
@@ -217,6 +218,12 @@ public final class SegmentedRaftLog extends RaftLogBase {
this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine,
b.submitUpdateCommitEvent, b.server, storage, b.properties,
getRaftLogMetrics());
stateMachineCachingEnabled =
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties);
+ this.readLockEnabled =
RaftServerConfigKeys.Log.readLockEnabled(b.properties);
+ }
+
+ @Override
+ public AutoCloseableLock readLock() {
+ return readLockEnabled ? super.readLock() : null;
}
@Override
@@ -338,8 +345,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
- LogRecord record = cache.getLogRecord(index);
- return record != null ? record.getTermIndex() : null;
+ return cache.getTermIndex(index);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index a1f0cdd8a..46acbcc3d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -547,9 +547,13 @@ public class SegmentedRaftLogCache {
}
}
- LogRecord getLogRecord(long index) {
+ TermIndex getTermIndex(long index) {
LogSegment segment = getSegment(index);
- return segment == null ? null : segment.getLogRecord(index);
+ if (segment == null) {
+ return null;
+ }
+ final LogRecord record = segment.getLogRecord(index);
+ return record != null ? record.getTermIndex() : null;
}
/**
@@ -610,8 +614,9 @@ public class SegmentedRaftLogCache {
TermIndex getLastTermIndex() {
try (AutoCloseableLock readLock = closedSegments.readLock()) {
- return (openSegment != null && openSegment.numOfEntries() > 0) ?
- openSegment.getLastTermIndex() :
+ LogSegment tmpSegment = openSegment;
+ return (tmpSegment != null && tmpSegment.getLastTermIndex() != null) ?
+ tmpSegment.getLastTermIndex() :
(closedSegments.isEmpty() ? null :
closedSegments.get(closedSegments.size() -
1).getLastTermIndex());
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 7c2dbac91..532e32c87 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.raftlog.segmented;
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
import static
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
-import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;
@@ -282,12 +281,12 @@ public class TestSegmentedRaftLogCache {
});
}
- private void testIterator(long startIndex) throws IOException {
+ private void testIterator(long startIndex) {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
while (iterator.hasNext()) {
TermIndex termIndex = iterator.next();
-
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(),
termIndex);
+ Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()),
termIndex);
if (prev != null) {
Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}