This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4157bba7 RATIS-1626. Preserve recent logs when purge logs (#688)
4157bba7 is described below
commit 4157bba768aca3d177fa114663893b41b1fea8a6
Author: William Song <[email protected]>
AuthorDate: Sun Jul 24 23:45:43 2022 +0800
RATIS-1626. Preserve recent logs when purge logs (#688)
---
.../java/org/apache/ratis/server/RaftServerConfigKeys.java | 10 ++++++++++
.../main/java/org/apache/ratis/server/raftlog/RaftLogBase.java | 9 ++++++++-
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index f1e5efb0..ad05536f 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -285,6 +285,16 @@ public interface RaftServerConfigKeys {
setBoolean(properties::setBoolean, PURGE_UPTO_SNAPSHOT_INDEX_KEY,
shouldPurgeUptoSnapshotIndex);
}
+ String PURGE_PRESERVATION_LOG_NUM_KEY = PREFIX +
".purge.preservation.log.num";
+ long PURGE_PRESERVATION_LOG_NUM_DEFAULT = 0L;
+ static long purgePreservationLogNum(RaftProperties properties) {
+ return getLong(properties::getLong, PURGE_PRESERVATION_LOG_NUM_KEY,
+ PURGE_PRESERVATION_LOG_NUM_DEFAULT, getDefaultLog());
+ }
+ static void setPurgePreservationLogNum(RaftProperties properties, long
purgePreserveLogNum) {
+ setLong(properties::setLong, PURGE_PRESERVATION_LOG_NUM_KEY,
purgePreserveLogNum);
+ }
+
String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max";
SizeInBytes SEGMENT_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("8MB");
static SizeInBytes segmentSizeMax(RaftProperties properties) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 0cce06f1..60081edd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -76,6 +76,7 @@ public abstract class RaftLogBase implements RaftLog {
private final OpenCloseState state;
private final LongSupplier getSnapshotIndexFromStateMachine;
private final TimeDuration stateMachineDataReadTimeout;
+ private final long purgePreservation;
private volatile LogEntryProto lastMetadataEntry = null;
@@ -93,6 +94,7 @@ public abstract class RaftLogBase implements RaftLog {
this.state = new OpenCloseState(getName());
this.getSnapshotIndexFromStateMachine = getSnapshotIndexFromStateMachine;
this.stateMachineDataReadTimeout =
RaftServerConfigKeys.Log.StateMachineData.readTimeout(properties);
+ this.purgePreservation =
RaftServerConfigKeys.Log.purgePreservationLogNum(properties);
}
@Override
@@ -301,17 +303,22 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public final CompletableFuture<Long> purge(long suggestedIndex) {
+ if (purgePreservation > 0) {
+ final long currentIndex = getNextIndex() - 1;
+ suggestedIndex = Math.min(suggestedIndex, currentIndex -
purgePreservation);
+ }
final long lastPurge = purgeIndex.get();
if (suggestedIndex - lastPurge < purgeGap) {
return CompletableFuture.completedFuture(lastPurge);
}
LOG.info("{}: purge {}", getName(), suggestedIndex);
+ final long finalSuggestedIndex = suggestedIndex;
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
if (purged != null) {
purgeIndex.updateToMax(purged, infoIndexChange);
}
if (e != null) {
- LOG.warn(getName() + ": Failed to purge " + suggestedIndex, e);
+ LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
}
});
}