This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f5ebbb7  RATIS-1088. SegmentedRaftLogWorker#closeLogSegment should also roll the open segment maintained in the cache. (#217)
f5ebbb7 is described below

commit f5ebbb7225288e2c2ba75d30acadef57660250e7
Author: avijayanhwx <[email protected]>
AuthorDate: Tue Oct 20 00:50:07 2020 -0700

    RATIS-1088. SegmentedRaftLogWorker#closeLogSegment should also roll the open segment maintained in the cache. (#217)
---
 .../java/org/apache/ratis/server/raftlog/RaftLog.java    |  2 +-
 .../ratis/server/raftlog/memory/MemoryRaftLog.java       |  3 ++-
 .../ratis/server/raftlog/segmented/LogSegment.java       |  5 +++++
 .../ratis/server/raftlog/segmented/SegmentedRaftLog.java | 16 ++++++++++++----
 .../server/raftlog/segmented/SegmentedRaftLogCache.java  |  7 ++++++-
 .../server/raftlog/segmented/TestSegmentedRaftLog.java   |  4 ++--
 .../raftlog/segmented/TestSegmentedRaftLogCache.java     |  5 +++--
 7 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 8f88020..69dff24 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -428,7 +428,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   public abstract Metadata loadMetadata() throws IOException;
 
-  public abstract void syncWithSnapshot(long lastSnapshotIndex);
+  public abstract CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex);
 
   public abstract boolean isConfigEntry(TermIndex ti);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 1d8e3b5..aeb9deb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -217,7 +217,8 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public void syncWithSnapshot(long lastSnapshotIndex) {
+  public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
+    return CompletableFuture.completedFuture(lastSnapshotIndex);
     // do nothing
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 93e5049..fb58a4a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -401,4 +401,9 @@ public class LogSegment implements Comparable<Long> {
   boolean containsIndex(long index) {
     return startIndex <= index && endIndex >= index;
   }
+
+  boolean hasEntries() {
+    return numOfEntries() > 0;
+  }
+
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index df5e11a..27e733e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -484,7 +484,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public void syncWithSnapshot(long lastSnapshotIndex) {
+  public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
     fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
     // TODO purge normal/tmp/corrupt snapshot files
     // if the last index in snapshot is larger than the index of the last
@@ -493,10 +493,18 @@ public class SegmentedRaftLog extends RaftLog {
 
     // Close open log segment if entries are already included in snapshot
     LogSegment openSegment = cache.getOpenSegment();
-    if (openSegment != null && openSegment.getEndIndex() <= lastSnapshotIndex) {
-      fileLogWorker.closeLogSegment(openSegment);
+    if (openSegment != null && openSegment.hasEntries()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("syncWithSnapshot : Found open segment {}, with end index {},"
+                + " snapshotIndex {}", openSegment, openSegment.getEndIndex(),
+            lastSnapshotIndex);
+      }
+      if (openSegment.getEndIndex() <= lastSnapshotIndex) {
+        fileLogWorker.closeLogSegment(openSegment);
+        cache.rollOpenSegment(true);
+      }
     }
-    purgeImpl(lastSnapshotIndex);
+    return purgeImpl(lastSnapshotIndex);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 3e4d62c..50c668b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -307,7 +307,12 @@ public class SegmentedRaftLogCache {
           sizeInBytes = 0;
         } else if (segmentIndex >= 0) {
           // we start to purge the closedSegments which do not overlap with index.
-          for (int i = segmentIndex - 1; i >= 0; i--) {
+          LogSegment overlappedSegment = segments.get(segmentIndex);
+          // if a segment's end index matches the passed in index, it is OK
+          // to purge that.
+          int startIndex = (overlappedSegment.getEndIndex() == index) ?
+              segmentIndex : segmentIndex - 1;
+          for (int i = startIndex; i >= 0; i--) {
             LogSegment segment = segments.remove(i);
             sizeInBytes -= segment.getTotalSize();
             list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 5ba5e91..fddcb04 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -398,7 +398,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     int endTerm = 5;
     int segmentSize = 200;
     long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
-    long expectedIndex = segmentSize * (endTerm - startTerm - 2);
+    long expectedIndex = segmentSize * (endTerm - startTerm - 1);
     purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
   }
 
@@ -408,7 +408,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     int endTerm = 5;
     int segmentSize = 200;
     long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
-    long expectedIndex = segmentSize * (endTerm - startTerm - 2);
+    long expectedIndex = segmentSize * (endTerm - startTerm - 1);
     RatisMetricRegistry metricRegistryForLogWorker = new RaftLogMetrics(memberId.toString()).getRegistry();
     purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
     Assert.assertTrue(metricRegistryForLogWorker.timer("purgeLog").getCount() > 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 1d4f8ca..67775e6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -262,8 +262,9 @@ public class TestSegmentedRaftLogCache {
 
     int purgeIndex = (end - start) * segmentSize - 1;
 
-    // overlapped close segment will not purged.
-    TruncationSegments ts = cache.purge(purgeIndex);
+    // overlapped close segment will not purged. Passing in index - 1 since
+    // we purge a closed segment when end index == passed in purge index.
+    TruncationSegments ts = cache.purge(purgeIndex - 1);
     Assert.assertNull(ts.getToTruncate());
     Assert.assertEquals(end - start - 1, ts.getToDelete().length);
     Assert.assertEquals(1, cache.getNumOfSegments());

Reply via email to