This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 13b8cdd0c RATIS-2186. Raft log should not purge index lower than the
log start index (#1175)
13b8cdd0c is described below
commit 13b8cdd0ccdd9ae2578600fbc454315b339da4a1
Author: Ivan Andika <[email protected]>
AuthorDate: Wed Nov 13 00:23:04 2024 +0800
RATIS-2186. Raft log should not purge index lower than the log start index
(#1175)
---
.../apache/ratis/server/raftlog/RaftLogBase.java | 20 ++++++++++-----
.../raftlog/segmented/SegmentedRaftLogCache.java | 4 +++
.../raftlog/segmented/TestSegmentedRaftLog.java | 30 +++++++++++++++++++---
3 files changed, 44 insertions(+), 10 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index c92c04c43..9353612c8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -318,20 +318,28 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public final CompletableFuture<Long> purge(long suggestedIndex) {
+ final long adjustedIndex;
if (purgePreservation > 0) {
final long currentIndex = getNextIndex() - 1;
- suggestedIndex = Math.min(suggestedIndex, currentIndex -
purgePreservation);
+ adjustedIndex = Math.min(suggestedIndex, currentIndex -
purgePreservation);
+ } else {
+ adjustedIndex = suggestedIndex;
}
final long lastPurge = purgeIndex.get();
- if (suggestedIndex - lastPurge < purgeGap) {
+ if (adjustedIndex - lastPurge < purgeGap) {
+ return CompletableFuture.completedFuture(lastPurge);
+ }
+ final long startIndex = getStartIndex();
+ if (adjustedIndex < startIndex) {
+ LOG.info("{}: purge({}) is skipped: adjustedIndex = {} < startIndex =
{}, purgePreservation = {}",
+ getName(), suggestedIndex, adjustedIndex, startIndex,
purgePreservation);
return CompletableFuture.completedFuture(lastPurge);
}
- LOG.info("{}: purge {}", getName(), suggestedIndex);
- final long finalSuggestedIndex = suggestedIndex;
- return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
+ LOG.info("{}: purge {}", getName(), adjustedIndex );
+ return purgeImpl(adjustedIndex).whenComplete((purged, e) -> {
updatePurgeIndex(purged);
if (e != null) {
- LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
+ LOG.warn(getName() + ": Failed to purge " + adjustedIndex, e);
}
});
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index b574da545..e21e62a2e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -358,6 +358,10 @@ public class SegmentedRaftLogCache {
TruncationSegments purge(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
int segmentIndex = binarySearch(index);
+ if (segmentIndex == -1) {
+ // nothing to purge
+ return null;
+ }
List<LogSegment> list = new LinkedList<>();
if (segmentIndex == -segments.size() - 1) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 52942279b..9bdbded8a 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -566,6 +566,7 @@ public class TestSegmentedRaftLog extends BaseTest {
int segmentSize = 200;
long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
+ long purgePreservation = 0L;
purgeAndVerify(startTerm, endTerm, segmentSize, 1,
beginIndexOfOpenSegment, expectedIndex);
}
@@ -602,15 +603,36 @@ public class TestSegmentedRaftLog extends BaseTest {
purgeAndVerify(startTerm, endTerm, segmentSize, 1000,
endIndexOfClosedSegment, expectedIndex);
}
+ @Test
+ public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws
Exception {
+ int startTerm = 0;
+ int endTerm = 5;
+ int segmentSize = 200;
+ long endIndex = segmentSize * (endTerm - startTerm) - 1;
+ // start index is set so that the suggested index will not be negative,
which will not trigger any purge
+ long startIndex = 200;
+ // purge preservation is larger than the total size of the log entries
+ // which causes suggested index to be lower than the start index
+ long purgePreservation = (segmentSize * (endTerm - startTerm )) + 100;
+ // if the suggested index is lower than the start index due to the purge
preservation, we should not purge anything
+ purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex,
startIndex, purgePreservation);
+ }
+
private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int
purgeGap, long purgeIndex,
- long expectedIndex) throws Exception {
- List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize,
0);
+ long expectedIndex) throws Exception {
+ purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex,
expectedIndex, 0, 0);
+ }
+
+ private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int
purgeGap, long purgeIndex,
+ long expectedIndex, long startIndex, long purgePreservation) throws
Exception {
+ List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize,
startIndex);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
final RaftProperties p = new RaftProperties();
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
- try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
- raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ RaftServerConfigKeys.Log.setPurgePreservationLogNum(p, purgePreservation);
+ try (SegmentedRaftLog raftLog =
newSegmentedRaftLogWithSnapshotIndex(storage, p, () -> startIndex - 1)) {
+ raftLog.open(startIndex - 1, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
final Long purged = f.get();