This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch release-3.1.3_review
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 1571cc73e36cec1a918321ec766fb745761a06d3
Author: Ivan Andika <ivan_andi...@rocketmail.com>
AuthorDate: Wed Nov 13 00:23:04 2024 +0800

    RATIS-2186. Raft log should not purge index lower than the log start index 
(#1175)
---
 .../apache/ratis/server/raftlog/RaftLogBase.java   | 20 ++++++++++-----
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  9 +++++--
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 30 +++++++++++++++++++---
 3 files changed, 47 insertions(+), 12 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 1e22e07bd..9b0367213 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -314,20 +314,28 @@ public abstract class RaftLogBase implements RaftLog {
 
   @Override
   public final CompletableFuture<Long> purge(long suggestedIndex) {
+    final long adjustedIndex;
     if (purgePreservation > 0) {
       final long currentIndex = getNextIndex() - 1;
-      suggestedIndex = Math.min(suggestedIndex, currentIndex - 
purgePreservation);
+      adjustedIndex = Math.min(suggestedIndex, currentIndex - 
purgePreservation);
+    } else {
+      adjustedIndex = suggestedIndex;
     }
     final long lastPurge = purgeIndex.get();
-    if (suggestedIndex - lastPurge < purgeGap) {
+    if (adjustedIndex - lastPurge < purgeGap) {
+      return CompletableFuture.completedFuture(lastPurge);
+    }
+    final long startIndex = getStartIndex();
+    if (adjustedIndex < startIndex) {
+      LOG.info("{}: purge({}) is skipped: adjustedIndex = {} < startIndex = 
{}, purgePreservation = {}",
+          getName(), suggestedIndex, adjustedIndex, startIndex, 
purgePreservation);
       return CompletableFuture.completedFuture(lastPurge);
     }
-    LOG.info("{}: purge {}", getName(), suggestedIndex);
-    final long finalSuggestedIndex = suggestedIndex;
-    return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
+    LOG.info("{}: purge {}", getName(), adjustedIndex );
+    return purgeImpl(adjustedIndex).whenComplete((purged, e) -> {
       updatePurgeIndex(purged);
       if (e != null) {
-        LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
+        LOG.warn(getName() + ": Failed to purge " + adjustedIndex, e);
       }
     });
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 20b7a5c37..e2ddb1508 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -357,6 +357,10 @@ public class SegmentedRaftLogCache {
       try (AutoCloseableLock writeLock = writeLock()) {
         int segmentIndex = binarySearch(index);
         List<SegmentFileInfo> list = new ArrayList<>();
+        if (segmentIndex == -1) {
+          // nothing to purge
+          return null;
+        }
 
         if (segmentIndex == -segments.size() - 1) {
           for (LogSegment ls : segments) {
@@ -527,8 +531,9 @@ public class SegmentedRaftLogCache {
   }
 
   LogSegment getSegment(long index) {
-    if (openSegment != null && index >= openSegment.getStartIndex()) {
-      return openSegment;
+    final LogSegment open = this.openSegment;
+    if (open != null && index >= open.getStartIndex()) {
+      return open;
     } else {
       return closedSegments.search(index);
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 93eb7db0e..a772b0002 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -563,6 +563,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     int segmentSize = 200;
     long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
     long expectedIndex = segmentSize * (endTerm - startTerm - 1);
+    long purgePreservation = 0L;
     purgeAndVerify(startTerm, endTerm, segmentSize, 1, 
beginIndexOfOpenSegment, expectedIndex);
   }
 
@@ -599,15 +600,36 @@ public class TestSegmentedRaftLog extends BaseTest {
     purgeAndVerify(startTerm, endTerm, segmentSize, 1000, 
endIndexOfClosedSegment, expectedIndex);
   }
 
+  @Test
+  public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws 
Exception {
+    int startTerm = 0;
+    int endTerm = 5;
+    int segmentSize = 200;
+    long endIndex = segmentSize * (endTerm - startTerm) - 1;
+    // start index is set so that the suggested index will not be negative, 
which will not trigger any purge
+    long startIndex = 200;
+    // purge preservation is larger than the total size of the log entries
+    // which causes suggested index to be lower than the start index
+    long purgePreservation = (segmentSize * (endTerm - startTerm )) + 100;
+    // if the suggested index is lower than the start index due to the purge 
preservation, we should not purge anything
+    purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, 
startIndex, purgePreservation);
+  }
+
   private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int 
purgeGap, long purgeIndex,
-      long expectedIndex) throws Exception {
-    List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 
0);
+                              long expectedIndex) throws Exception {
+    purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex, 
expectedIndex, 0, 0);
+  }
+
+  private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int 
purgeGap, long purgeIndex,
+      long expectedIndex, long startIndex, long purgePreservation) throws 
Exception {
+    List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 
startIndex);
     List<LogEntryProto> entries = prepareLogEntries(ranges, null);
 
     final RaftProperties p = new RaftProperties();
     RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
-    try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
-      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+    RaftServerConfigKeys.Log.setPurgePreservationLogNum(p, purgePreservation);
+    try (SegmentedRaftLog raftLog = 
newSegmentedRaftLogWithSnapshotIndex(storage, p, () -> startIndex - 1)) {
+      raftLog.open(startIndex - 1, null);
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
       final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
       final Long purged = f.get();

Reply via email to