This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f5ebbb7 RATIS-1088. SegmentedRaftLogWorker#closeLogSegment should
also roll the open segment maintained in the cache. (#217)
f5ebbb7 is described below
commit f5ebbb7225288e2c2ba75d30acadef57660250e7
Author: avijayanhwx <[email protected]>
AuthorDate: Tue Oct 20 00:50:07 2020 -0700
RATIS-1088. SegmentedRaftLogWorker#closeLogSegment should also roll the
open segment maintained in the cache. (#217)
---
.../java/org/apache/ratis/server/raftlog/RaftLog.java | 2 +-
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 3 ++-
.../ratis/server/raftlog/segmented/LogSegment.java | 5 +++++
.../ratis/server/raftlog/segmented/SegmentedRaftLog.java | 16 ++++++++++++----
.../server/raftlog/segmented/SegmentedRaftLogCache.java | 7 ++++++-
.../server/raftlog/segmented/TestSegmentedRaftLog.java | 4 ++--
.../raftlog/segmented/TestSegmentedRaftLogCache.java | 5 +++--
7 files changed, 31 insertions(+), 11 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 8f88020..69dff24 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -428,7 +428,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
public abstract Metadata loadMetadata() throws IOException;
- public abstract void syncWithSnapshot(long lastSnapshotIndex);
+ public abstract CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex);
public abstract boolean isConfigEntry(TermIndex ti);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 1d8e3b5..aeb9deb 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -217,7 +217,8 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public void syncWithSnapshot(long lastSnapshotIndex) {
+ public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
+ return CompletableFuture.completedFuture(lastSnapshotIndex);
// do nothing
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 93e5049..fb58a4a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -401,4 +401,9 @@ public class LogSegment implements Comparable<Long> {
boolean containsIndex(long index) {
return startIndex <= index && endIndex >= index;
}
+
+ boolean hasEntries() {
+ return numOfEntries() > 0;
+ }
+
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index df5e11a..27e733e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -484,7 +484,7 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public void syncWithSnapshot(long lastSnapshotIndex) {
+ public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
// TODO purge normal/tmp/corrupt snapshot files
// if the last index in snapshot is larger than the index of the last
@@ -493,10 +493,18 @@ public class SegmentedRaftLog extends RaftLog {
// Close open log segment if entries are already included in snapshot
LogSegment openSegment = cache.getOpenSegment();
- if (openSegment != null && openSegment.getEndIndex() <= lastSnapshotIndex) {
- fileLogWorker.closeLogSegment(openSegment);
+ if (openSegment != null && openSegment.hasEntries()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("syncWithSnapshot : Found open segment {}, with end index {},"
+ + " snapshotIndex {}", openSegment, openSegment.getEndIndex(),
+ lastSnapshotIndex);
+ }
+ if (openSegment.getEndIndex() <= lastSnapshotIndex) {
+ fileLogWorker.closeLogSegment(openSegment);
+ cache.rollOpenSegment(true);
+ }
}
- purgeImpl(lastSnapshotIndex);
+ return purgeImpl(lastSnapshotIndex);
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 3e4d62c..50c668b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -307,7 +307,12 @@ public class SegmentedRaftLogCache {
sizeInBytes = 0;
} else if (segmentIndex >= 0) {
// we start to purge the closedSegments which do not overlap with index.
- for (int i = segmentIndex - 1; i >= 0; i--) {
+ LogSegment overlappedSegment = segments.get(segmentIndex);
+ // if a segment's end index matches the passed in index, it is OK
+ // to purge that.
+ int startIndex = (overlappedSegment.getEndIndex() == index) ?
+ segmentIndex : segmentIndex - 1;
+ for (int i = startIndex; i >= 0; i--) {
LogSegment segment = segments.remove(i);
sizeInBytes -= segment.getTotalSize();
list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 5ba5e91..fddcb04 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -398,7 +398,7 @@ public class TestSegmentedRaftLog extends BaseTest {
int endTerm = 5;
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
- long expectedIndex = segmentSize * (endTerm - startTerm - 2);
+ long expectedIndex = segmentSize * (endTerm - startTerm - 1);
purgeAndVerify(startTerm, endTerm, segmentSize, 1,
endIndexOfClosedSegment, expectedIndex);
}
@@ -408,7 +408,7 @@ public class TestSegmentedRaftLog extends BaseTest {
int endTerm = 5;
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
- long expectedIndex = segmentSize * (endTerm - startTerm - 2);
+ long expectedIndex = segmentSize * (endTerm - startTerm - 1);
RatisMetricRegistry metricRegistryForLogWorker = new
RaftLogMetrics(memberId.toString()).getRegistry();
purgeAndVerify(startTerm, endTerm, segmentSize, 1,
endIndexOfClosedSegment, expectedIndex);
Assert.assertTrue(metricRegistryForLogWorker.timer("purgeLog").getCount()
> 0);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 1d4f8ca..67775e6 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -262,8 +262,9 @@ public class TestSegmentedRaftLogCache {
int purgeIndex = (end - start) * segmentSize - 1;
- // overlapped close segment will not purged.
- TruncationSegments ts = cache.purge(purgeIndex);
+ // overlapped close segment will not purged. Passing in index - 1 since
+ // we purge a closed segment when end index == passed in purge index.
+ TruncationSegments ts = cache.purge(purgeIndex - 1);
Assert.assertNull(ts.getToTruncate());
Assert.assertEquals(end - start - 1, ts.getToDelete().length);
Assert.assertEquals(1, cache.getNumOfSegments());