This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b73cf97c6 RATIS-2129. Low replication performance because of lock contention on RaftLog (#1322)
b73cf97c6 is described below

commit b73cf97c6f6349abb2e37a801f214629cb5245b3
Author: Symious <[email protected]>
AuthorDate: Fri Dec 19 01:26:52 2025 +0800

    RATIS-2129. Low replication performance because of lock contention on RaftLog (#1322)
    
    Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 10 +++
 .../ratis/server/raftlog/segmented/LogSegment.java | 83 ++++++++++++++++------
 .../server/raftlog/segmented/SegmentedRaftLog.java | 10 ++-
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 13 ++--
 .../segmented/TestSegmentedRaftLogCache.java       |  5 +-
 5 files changed, 90 insertions(+), 31 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 849597433..002286c4c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -415,6 +415,16 @@ public interface RaftServerConfigKeys {
       setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax);
     }
 
+    String READ_LOCK_ENABLED_KEY = PREFIX + ".read.lock.enabled";
+    boolean READ_LOCK_ENABLED_DEFAULT = true;
+    static boolean readLockEnabled(RaftProperties properties) {
+      return getBoolean(properties::getBoolean,
+          READ_LOCK_ENABLED_KEY, READ_LOCK_ENABLED_DEFAULT, getDefaultLog());
+    }
+    static void setReadLockEnabled(RaftProperties properties, boolean 
readLockEnabled) {
+      setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, 
readLockEnabled);
+    }
+
     /**
      * Besides the open segment, the max number of segments caching log 
entries.
      */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 444d417ba..c40b91f70 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -36,13 +36,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.nio.file.Path;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -105,6 +105,44 @@ public final class LogSegment {
     }
   }
 
+  private static class Records {
+    private final ConcurrentNavigableMap<Long, LogRecord> map = new 
ConcurrentSkipListMap<>();
+
+    int size() {
+      return map.size();
+    }
+
+    LogRecord getFirst() {
+      final Map.Entry<Long, LogRecord> first = map.firstEntry();
+      return first != null? first.getValue() : null;
+    }
+
+    LogRecord getLast() {
+      final Map.Entry<Long, LogRecord> last = map.lastEntry();
+      return last != null? last.getValue() : null;
+    }
+
+    LogRecord get(long i) {
+      return map.get(i);
+    }
+
+    long append(LogRecord record) {
+      final long index = record.getTermIndex().getIndex();
+      final LogRecord previous = map.put(index, record);
+      Preconditions.assertNull(previous, "previous");
+      return index;
+    }
+
+    LogRecord removeLast() {
+      final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
+      return Objects.requireNonNull(last, "last == null").getValue();
+    }
+
+    void clear() {
+      map.clear();
+    }
+  }
+
   static LogSegment newOpenSegment(RaftStorage storage, long start, 
SizeInBytes maxOpSize,
       SegmentedRaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0);
@@ -204,10 +242,12 @@ public final class LogSegment {
     final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
     Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end 
index");
 
-    final LogRecord last = getLastRecord();
+    final LogRecord last = records.getLast();
     if (last != null) {
       Preconditions.assertSame(expectedLastIndex, 
last.getTermIndex().getIndex(), "Index at the last record");
-      Preconditions.assertSame(expectedStart, 
records.get(0).getTermIndex().getIndex(), "Index at the first record");
+      final LogRecord first = records.getFirst();
+      Objects.requireNonNull(first, "first record");
+      Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), 
"Index at the first record");
     }
     if (!corrupted) {
       Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last 
Index");
@@ -272,7 +312,7 @@ public final class LogSegment {
   /**
    * the list of records is more like the index of a segment
    */
-  private final List<LogRecord> records = new ArrayList<>();
+  private final Records records = new Records();
   /**
    * the entryCache caches the content of log entries.
    */
@@ -293,7 +333,11 @@ public final class LogSegment {
   }
 
   long getEndIndex() {
-    return endIndex;
+    if (!isOpen) {
+      return endIndex;
+    }
+    final LogRecord last = records.getLast();
+    return last == null ? getStartIndex() - 1 : last.getTermIndex().getIndex();
   }
 
   boolean isOpen() {
@@ -301,7 +345,7 @@ public final class LogSegment {
   }
 
   int numOfEntries() {
-    return Math.toIntExact(endIndex - startIndex + 1);
+    return Math.toIntExact(getEndIndex() - startIndex + 1);
   }
 
   CorruptionPolicy getLogCorruptionPolicy() {
@@ -315,14 +359,12 @@ public final class LogSegment {
 
   private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
     Objects.requireNonNull(entry, "entry == null");
-    if (records.isEmpty()) {
+    final LogRecord currentLast = records.getLast();
+    if (currentLast == null) {
       Preconditions.assertTrue(entry.getIndex() == startIndex,
           "gap between start index %s and first entry to append %s",
           startIndex, entry.getIndex());
-    }
-
-    final LogRecord currentLast = getLastRecord();
-    if (currentLast != null) {
+    } else {
       Preconditions.assertTrue(entry.getIndex() == 
currentLast.getTermIndex().getIndex() + 1,
           "gap between entries %s and %s", entry.getIndex(), 
currentLast.getTermIndex().getIndex());
     }
@@ -331,7 +373,7 @@ public final class LogSegment {
     if (keepEntryInCache) {
       putEntryCache(record.getTermIndex(), entry, op);
     }
-    records.add(record);
+    records.append(record);
     totalFileSize += getEntrySize(entry, op);
     endIndex = entry.getIndex();
   }
@@ -358,18 +400,14 @@ public final class LogSegment {
   }
 
   LogRecord getLogRecord(long index) {
-    if (index >= startIndex && index <= endIndex) {
-      return records.get(Math.toIntExact(index - startIndex));
+    if (index >= startIndex && index <= getEndIndex()) {
+      return records.get(index);
     }
     return null;
   }
 
-  private LogRecord getLastRecord() {
-    return records.isEmpty() ? null : records.get(records.size() - 1);
-  }
-
   TermIndex getLastTermIndex() {
-    LogRecord last = getLastRecord();
+    final LogRecord last = records.getLast();
     return last == null ? null : last.getTermIndex();
   }
 
@@ -387,7 +425,8 @@ public final class LogSegment {
   synchronized void truncate(long fromIndex) {
     Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
     for (long index = endIndex; index >= fromIndex; index--) {
-      LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
+      final LogRecord removed = records.removeLast();
+      Preconditions.assertSame(index, removed.getTermIndex().getIndex(), 
"removedIndex");
       removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
       totalFileSize = removed.offset;
     }
@@ -458,7 +497,7 @@ public final class LogSegment {
   }
 
   boolean containsIndex(long index) {
-    return startIndex <= index && endIndex >= index;
+    return startIndex <= index && getEndIndex() >= index;
   }
 
   boolean hasEntries() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 6dc3d7961..6bcc3f8e1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -202,6 +202,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
   private final SegmentedRaftLogMetrics metrics;
+  private final boolean readLockEnabled;
 
   @SuppressWarnings({"squid:S2095"}) // Suppress closeable  warning
   private SegmentedRaftLog(Builder b) {
@@ -217,6 +218,12 @@ public final class SegmentedRaftLog extends RaftLogBase {
     this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine,
         b.submitUpdateCommitEvent, b.server, storage, b.properties, 
getRaftLogMetrics());
     stateMachineCachingEnabled = 
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties);
+    this.readLockEnabled = 
RaftServerConfigKeys.Log.readLockEnabled(b.properties);
+  }
+
+  @Override
+  public AutoCloseableLock readLock() {
+    return readLockEnabled ? super.readLock() : null;
   }
 
   @Override
@@ -338,8 +345,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
   public TermIndex getTermIndex(long index) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
-      LogRecord record = cache.getLogRecord(index);
-      return record != null ? record.getTermIndex() : null;
+      return cache.getTermIndex(index);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index a1f0cdd8a..46acbcc3d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -547,9 +547,13 @@ public class SegmentedRaftLogCache {
     }
   }
 
-  LogRecord getLogRecord(long index) {
+  TermIndex getTermIndex(long index) {
     LogSegment segment = getSegment(index);
-    return segment == null ? null : segment.getLogRecord(index);
+    if (segment == null) {
+      return null;
+    }
+    final LogRecord record = segment.getLogRecord(index);
+    return record != null ? record.getTermIndex() : null;
   }
 
   /**
@@ -610,8 +614,9 @@ public class SegmentedRaftLogCache {
 
   TermIndex getLastTermIndex() {
     try (AutoCloseableLock readLock = closedSegments.readLock()) {
-      return (openSegment != null && openSegment.numOfEntries() > 0) ?
-          openSegment.getLastTermIndex() :
+      LogSegment tmpSegment = openSegment;
+      return (tmpSegment != null && tmpSegment.getLastTermIndex() != null) ?
+          tmpSegment.getLastTermIndex() :
           (closedSegments.isEmpty() ? null :
               closedSegments.get(closedSegments.size() - 
1).getLastTermIndex());
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 7c2dbac91..532e32c87 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.raftlog.segmented;
 import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
 import static 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.stream.IntStream;
 
@@ -282,12 +281,12 @@ public class TestSegmentedRaftLogCache {
     });
   }
 
-  private void testIterator(long startIndex) throws IOException {
+  private void testIterator(long startIndex) {
     Iterator<TermIndex> iterator = cache.iterator(startIndex);
     TermIndex prev = null;
     while (iterator.hasNext()) {
       TermIndex termIndex = iterator.next();
-      
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(),
 termIndex);
+      Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), 
termIndex);
       if (prev != null) {
         Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
       }

Reply via email to