This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 781d61d37 RATIS-2129. Low replication performance because LogAppender is often blocked by RaftLog's readLock. (#1141)
781d61d37 is described below

commit 781d61d37411b374f104eb0806e1e2c4090fb35e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 30 10:37:27 2024 -0700

    RATIS-2129. Low replication performance because LogAppender is often blocked by RaftLog's readLock. (#1141)
---
 .../ratis/server/raftlog/segmented/LogSegment.java | 71 ++++++++++++++++------
 .../server/raftlog/segmented/SegmentedRaftLog.java | 43 +++++--------
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 13 ++--
 .../segmented/TestSegmentedRaftLogCache.java       |  5 +-
 4 files changed, 80 insertions(+), 52 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index f96e34e4c..c51464f9e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +109,44 @@ public final class LogSegment {
     }
   }
 
+  private static class Records {
+    private final ConcurrentNavigableMap<Long, LogRecord> map = new 
ConcurrentSkipListMap<>();
+
+    int size() {
+      return map.size();
+    }
+
+    LogRecord getFirst() {
+      final Map.Entry<Long, LogRecord> first = map.firstEntry();
+      return first != null? first.getValue() : null;
+    }
+
+    LogRecord getLast() {
+      final Map.Entry<Long, LogRecord> last = map.lastEntry();
+      return last != null? last.getValue() : null;
+    }
+
+    LogRecord get(long i) {
+      return map.get(i);
+    }
+
+    long append(LogRecord record) {
+      final long index = record.getTermIndex().getIndex();
+      final LogRecord previous = map.put(index, record);
+      Preconditions.assertNull(previous, "previous");
+      return index;
+    }
+
+    LogRecord removeLast() {
+      final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
+      return Objects.requireNonNull(last, "last == null").getValue();
+    }
+
+    void clear() {
+      map.clear();
+    }
+  }
+
   static LogSegment newOpenSegment(RaftStorage storage, long start, 
SizeInBytes maxOpSize,
       SegmentedRaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0);
@@ -207,10 +245,12 @@ public final class LogSegment {
     final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
     Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end 
index");
 
-    final LogRecord last = getLastRecord();
+    final LogRecord last = records.getLast();
     if (last != null) {
       Preconditions.assertSame(expectedLastIndex, 
last.getTermIndex().getIndex(), "Index at the last record");
-      Preconditions.assertSame(expectedStart, 
records.get(0).getTermIndex().getIndex(), "Index at the first record");
+      final LogRecord first = records.getFirst();
+      Objects.requireNonNull(first, "first record");
+      Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), 
"Index at the first record");
     }
     if (!corrupted) {
       Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last 
Index");
@@ -306,7 +346,7 @@ public final class LogSegment {
   /**
    * the list of records is more like the index of a segment
    */
-  private final List<LogRecord> records = new ArrayList<>();
+  private final Records records = new Records();
   /**
    * the entryCache caches the content of log entries.
    */
@@ -366,20 +406,18 @@ public final class LogSegment {
 
   private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
     Objects.requireNonNull(entry, "entry == null");
-    if (records.isEmpty()) {
+    final LogRecord currentLast = records.getLast();
+    if (currentLast == null) {
       Preconditions.assertTrue(entry.getIndex() == startIndex,
           "gap between start index %s and first entry to append %s",
           startIndex, entry.getIndex());
-    }
-
-    final LogRecord currentLast = getLastRecord();
-    if (currentLast != null) {
+    } else {
       Preconditions.assertTrue(entry.getIndex() == 
currentLast.getTermIndex().getIndex() + 1,
           "gap between entries %s and %s", entry.getIndex(), 
currentLast.getTermIndex().getIndex());
     }
 
     final LogRecord record = new LogRecord(totalFileSize, entry);
-    records.add(record);
+    records.append(record);
     totalFileSize += getEntrySize(entry, op);
     endIndex = entry.getIndex();
     return record;
@@ -406,17 +444,13 @@ public final class LogSegment {
 
   LogRecord getLogRecord(long index) {
     if (index >= startIndex && index <= endIndex) {
-      return records.get(Math.toIntExact(index - startIndex));
+      return records.get(index);
     }
     return null;
   }
 
-  private LogRecord getLastRecord() {
-    return records.isEmpty() ? null : records.get(records.size() - 1);
-  }
-
   TermIndex getLastTermIndex() {
-    LogRecord last = getLastRecord();
+    final LogRecord last = records.getLast();
     return last == null ? null : last.getTermIndex();
   }
 
@@ -434,7 +468,8 @@ public final class LogSegment {
   synchronized void truncate(long fromIndex) {
     Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
     for (long index = endIndex; index >= fromIndex; index--) {
-      LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
+      final LogRecord removed = records.removeLast();
+      Preconditions.assertSame(index, removed.getTermIndex().getIndex(), 
"removedIndex");
       removeEntryCache(removed.getTermIndex());
       totalFileSize = removed.offset;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 79f0380ee..44b9c7599 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -296,23 +296,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
   @Override
   public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws 
RaftLogIOException {
     checkLogState();
-    final LogSegment segment;
-    final LogRecord record;
-    try (AutoCloseableLock readLock = readLock()) {
-      segment = cache.getSegment(index);
-      if (segment == null) {
-        return null;
-      }
-      record = segment.getLogRecord(index);
-      if (record == null) {
-        return null;
-      }
-      final ReferenceCountedObject<LogEntryProto> entry = 
segment.getEntryFromCache(record.getTermIndex());
-      if (entry != null) {
-        getRaftLogMetrics().onRaftLogCacheHit();
-        entry.retain();
-        return entry;
-      }
+    final LogSegment segment = cache.getSegment(index);
+    if (segment == null) {
+      return null;
+    }
+    final LogRecord record = segment.getLogRecord(index);
+    if (record == null) {
+      return null;
+    }
+    final ReferenceCountedObject<LogEntryProto> entry = 
segment.getEntryFromCache(record.getTermIndex());
+    if (entry != null) {
+      getRaftLogMetrics().onRaftLogCacheHit();
+      entry.retain();
+      return entry;
     }
 
     // the entry is not in the segment's cache. Load the cache without holding 
the lock.
@@ -369,26 +365,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
   @Override
   public TermIndex getTermIndex(long index) {
     checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      LogRecord record = cache.getLogRecord(index);
-      return record != null ? record.getTermIndex() : null;
-    }
+    return cache.getTermIndex(index);
   }
 
   @Override
   public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
     checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return cache.getTermIndices(startIndex, endIndex);
-    }
+    return cache.getTermIndices(startIndex, endIndex);
   }
 
   @Override
   public TermIndex getLastEntryTermIndex() {
     checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return cache.getLastTermIndex();
-    }
+    return cache.getLastTermIndex();
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index ad1633232..8b194bbc9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -523,16 +523,21 @@ public class SegmentedRaftLogCache {
   }
 
   LogSegment getSegment(long index) {
-    if (openSegment != null && index >= openSegment.getStartIndex()) {
-      return openSegment;
+    final LogSegment open = this.openSegment;
+    if (open != null && index >= open.getStartIndex()) {
+      return open;
     } else {
       return closedSegments.search(index);
     }
   }
 
-  LogRecord getLogRecord(long index) {
+  TermIndex getTermIndex(long index) {
     LogSegment segment = getSegment(index);
-    return segment == null ? null : segment.getLogRecord(index);
+    if (segment == null) {
+      return null;
+    }
+    final LogRecord record = segment.getLogRecord(index);
+    return record != null ? record.getTermIndex() : null;
   }
 
   /**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 9f0df2fab..7bc495438 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.raftlog.segmented;
 import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
 import static 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.stream.IntStream;
 
@@ -286,12 +285,12 @@ public class TestSegmentedRaftLogCache {
     });
   }
 
-  private void testIterator(long startIndex) throws IOException {
+  private void testIterator(long startIndex) {
     Iterator<TermIndex> iterator = cache.iterator(startIndex);
     TermIndex prev = null;
     while (iterator.hasNext()) {
       TermIndex termIndex = iterator.next();
-      
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(),
 termIndex);
+      Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), 
termIndex);
       if (prev != null) {
         Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
       }

Reply via email to