This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 781d61d37 RATIS-2129. Low replication performance because LogAppender is often blocked by RaftLog's readLock. (#1141)
781d61d37 is described below
commit 781d61d37411b374f104eb0806e1e2c4090fb35e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 30 10:37:27 2024 -0700
RATIS-2129. Low replication performance because LogAppender is often blocked by RaftLog's readLock. (#1141)
---
.../ratis/server/raftlog/segmented/LogSegment.java | 71 ++++++++++++++++------
.../server/raftlog/segmented/SegmentedRaftLog.java | 43 +++++--------
.../raftlog/segmented/SegmentedRaftLogCache.java | 13 ++--
.../segmented/TestSegmentedRaftLogCache.java | 5 +-
4 files changed, 80 insertions(+), 52 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index f96e34e4c..c51464f9e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +109,44 @@ public final class LogSegment {
}
}
+ /**
+ * The index of the log records in this segment, keyed by log index
+ * (the index of each record's TermIndex).
+ * It is backed by a {@link ConcurrentSkipListMap}, so lookups by index can run
+ * concurrently with appends and truncations without holding the RaftLog read lock.
+ */
+ private static class Records {
+ private final ConcurrentNavigableMap<Long, LogRecord> map = new
ConcurrentSkipListMap<>();
+
+ /**
+ * @return the number of records currently held.
+ * Note that {@link ConcurrentSkipListMap#size()} is not a constant-time
+ * operation: it traverses the map. Avoid calling it on hot paths.
+ */
+ int size() {
+ return map.size();
+ }
+
+ /** @return the record with the lowest log index, or null if there are no records. */
+ LogRecord getFirst() {
+ final Map.Entry<Long, LogRecord> first = map.firstEntry();
+ return first != null? first.getValue() : null;
+ }
+
+ /** @return the record with the highest log index, or null if there are no records. */
+ LogRecord getLast() {
+ final Map.Entry<Long, LogRecord> last = map.lastEntry();
+ return last != null? last.getValue() : null;
+ }
+
+ /** @return the record at log index {@code i}, or null if absent. */
+ LogRecord get(long i) {
+ return map.get(i);
+ }
+
+ /**
+ * Adds the record, keyed by its log index.
+ * It asserts that no record already exists at that index.
+ * It does not check contiguity; callers such as appendLogRecord check for gaps.
+ * @return the log index of the appended record.
+ */
+ long append(LogRecord record) {
+ final long index = record.getTermIndex().getIndex();
+ final LogRecord previous = map.put(index, record);
+ Preconditions.assertNull(previous, "previous");
+ return index;
+ }
+
+ /**
+ * Removes and returns the record with the highest log index.
+ * @throws NullPointerException if there are no records.
+ */
+ LogRecord removeLast() {
+ final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
+ return Objects.requireNonNull(last, "last == null").getValue();
+ }
+
+ /** Removes all records. */
+ void clear() {
+ map.clear();
+ }
+ }
+
static LogSegment newOpenSegment(RaftStorage storage, long start,
SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
@@ -207,10 +245,12 @@ public final class LogSegment {
final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end
index");
- final LogRecord last = getLastRecord();
+ final LogRecord last = records.getLast();
if (last != null) {
Preconditions.assertSame(expectedLastIndex,
last.getTermIndex().getIndex(), "Index at the last record");
- Preconditions.assertSame(expectedStart,
records.get(0).getTermIndex().getIndex(), "Index at the first record");
+ final LogRecord first = records.getFirst();
+ Objects.requireNonNull(first, "first record");
+ Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(),
"Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last
Index");
@@ -306,7 +346,7 @@ public final class LogSegment {
/**
* the list of records is more like the index of a segment
*/
- private final List<LogRecord> records = new ArrayList<>();
+ private final Records records = new Records();
/**
* the entryCache caches the content of log entries.
*/
@@ -366,20 +406,18 @@ public final class LogSegment {
private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
Objects.requireNonNull(entry, "entry == null");
- if (records.isEmpty()) {
+ final LogRecord currentLast = records.getLast();
+ if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
- }
-
- final LogRecord currentLast = getLastRecord();
- if (currentLast != null) {
+ } else {
Preconditions.assertTrue(entry.getIndex() ==
currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(),
currentLast.getTermIndex().getIndex());
}
final LogRecord record = new LogRecord(totalFileSize, entry);
- records.add(record);
+ records.append(record);
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
return record;
@@ -406,17 +444,13 @@ public final class LogSegment {
LogRecord getLogRecord(long index) {
if (index >= startIndex && index <= endIndex) {
- return records.get(Math.toIntExact(index - startIndex));
+ return records.get(index);
}
return null;
}
- private LogRecord getLastRecord() {
- return records.isEmpty() ? null : records.get(records.size() - 1);
- }
-
TermIndex getLastTermIndex() {
- LogRecord last = getLastRecord();
+ final LogRecord last = records.getLast();
return last == null ? null : last.getTermIndex();
}
@@ -434,7 +468,8 @@ public final class LogSegment {
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
- LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
+ final LogRecord removed = records.removeLast();
+ Preconditions.assertSame(index, removed.getTermIndex().getIndex(),
"removedIndex");
removeEntryCache(removed.getTermIndex());
totalFileSize = removed.offset;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 79f0380ee..44b9c7599 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -296,23 +296,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
@Override
public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws
RaftLogIOException {
checkLogState();
- final LogSegment segment;
- final LogRecord record;
- try (AutoCloseableLock readLock = readLock()) {
- segment = cache.getSegment(index);
- if (segment == null) {
- return null;
- }
- record = segment.getLogRecord(index);
- if (record == null) {
- return null;
- }
- final ReferenceCountedObject<LogEntryProto> entry =
segment.getEntryFromCache(record.getTermIndex());
- if (entry != null) {
- getRaftLogMetrics().onRaftLogCacheHit();
- entry.retain();
- return entry;
- }
+ final LogSegment segment = cache.getSegment(index);
+ if (segment == null) {
+ return null;
+ }
+ final LogRecord record = segment.getLogRecord(index);
+ if (record == null) {
+ return null;
+ }
+ final ReferenceCountedObject<LogEntryProto> entry =
segment.getEntryFromCache(record.getTermIndex());
+ if (entry != null) {
+ getRaftLogMetrics().onRaftLogCacheHit();
+ entry.retain();
+ return entry;
}
// the entry is not in the segment's cache. Load the cache without holding
the lock.
@@ -369,26 +365,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- LogRecord record = cache.getLogRecord(index);
- return record != null ? record.getTermIndex() : null;
- }
+ return cache.getTermIndex(index);
}
@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- return cache.getTermIndices(startIndex, endIndex);
- }
+ return cache.getTermIndices(startIndex, endIndex);
}
@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- return cache.getLastTermIndex();
- }
+ return cache.getLastTermIndex();
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index ad1633232..8b194bbc9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -523,16 +523,21 @@ public class SegmentedRaftLogCache {
}
LogSegment getSegment(long index) {
- if (openSegment != null && index >= openSegment.getStartIndex()) {
- return openSegment;
+ final LogSegment open = this.openSegment;
+ if (open != null && index >= open.getStartIndex()) {
+ return open;
} else {
return closedSegments.search(index);
}
}
- LogRecord getLogRecord(long index) {
+ TermIndex getTermIndex(long index) {
LogSegment segment = getSegment(index);
- return segment == null ? null : segment.getLogRecord(index);
+ if (segment == null) {
+ return null;
+ }
+ final LogRecord record = segment.getLogRecord(index);
+ return record != null ? record.getTermIndex() : null;
}
/**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 9f0df2fab..7bc495438 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.raftlog.segmented;
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
import static
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
-import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;
@@ -286,12 +285,12 @@ public class TestSegmentedRaftLogCache {
});
}
- private void testIterator(long startIndex) throws IOException {
+ private void testIterator(long startIndex) {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
while (iterator.hasNext()) {
TermIndex termIndex = iterator.next();
-
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(),
termIndex);
+ Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()),
termIndex);
if (prev != null) {
Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}