This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 346c024 RATIS-563. Purge logs in takeSnapshot only after all the
nodes have caught up with the index. Contributed by Tsz Wo Nicholas Sze.
346c024 is described below
commit 346c024d1ce91b5c3b154f3bed58d8d1bfac6c64
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Tue May 28 23:50:11 2019 +0530
RATIS-563. Purge logs in takeSnapshot only after all the nodes have caught
up with the index. Contributed by Tsz Wo Nicholas Sze.
---
.../apache/ratis/server/RaftServerConfigKeys.java | 9 +++++
.../org/apache/ratis/server/impl/ServerState.java | 7 ++--
.../ratis/server/impl/StateMachineUpdater.java | 15 +++++++-
.../org/apache/ratis/server/raftlog/RaftLog.java | 42 +++++++++++++++++-----
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 5 +--
.../server/raftlog/segmented/SegmentedRaftLog.java | 4 +--
.../raftlog/segmented/SegmentedRaftLogCache.java | 28 +++++++++++----
.../raftlog/segmented/SegmentedRaftLogWorker.java | 4 +--
.../raftlog/segmented/TestSegmentedRaftLog.java | 24 ++++++++++---
9 files changed, 105 insertions(+), 33 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e643bf4..0fdd622 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -158,6 +158,15 @@ public interface RaftServerConfigKeys {
setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize,
requireMin(1));
}
+ String PURGE_GAP_KEY = PREFIX + ".purge.gap";
+ int PURGE_GAP_DEFAULT = 1024;
+ static int purgeGap(RaftProperties properties) {
+ return getInt(properties::getInt, PURGE_GAP_KEY, PURGE_GAP_DEFAULT,
getDefaultLog(), requireMin(1));
+ }
+ static void setPurgeGap(RaftProperties properties, int purgeGap) {
+ setInt(properties::setInt, PURGE_GAP_KEY, purgeGap, requireMin(1));
+ }
+
String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max";
SizeInBytes SEGMENT_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("8MB");
static SizeInBytes segmentSizeMax(RaftProperties properties) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0c49716..b40a131 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -189,12 +189,9 @@ public class ServerState implements Closeable {
throws IOException {
final RaftLog log;
if (RaftServerConfigKeys.Log.useMemory(prop)) {
- final int maxBufferSize =
- RaftServerConfigKeys.Log.Appender.bufferByteLimit(prop).getSizeInt();
- log = new MemoryRaftLog(id, lastIndexInSnapshot, maxBufferSize);
+ log = new MemoryRaftLog(id, lastIndexInSnapshot, prop);
} else {
- log = new SegmentedRaftLog(id, server, this.storage,
- lastIndexInSnapshot, prop);
+ log = new SegmentedRaftLog(id, server, this.storage,
lastIndexInSnapshot, prop);
}
log.open(lastIndexInSnapshot, logConsumer);
return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index d28c7ab..c380781 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -18,7 +18,9 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -38,6 +40,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.LongStream;
/**
* This class tracks the log entries that have been committed in a quorum and
@@ -221,6 +224,13 @@ class StateMachineUpdater implements Runnable {
final long i;
try {
i = stateMachine.takeSnapshot();
+
+ final long appliedIndex = getLastAppliedIndex();
+ if (i > appliedIndex) {
+ throw new StateMachineException(
+ "Bug in StateMachine: snapshot index = " + i + " > appliedIndex = " + appliedIndex
+ + "; StateMachine class=" + stateMachine.getClass().getName() + ", stateMachine=" + stateMachine);
+ }
} catch (IOException e) {
LOG.error(name + ": Failed to take snapshot", e);
return;
@@ -229,7 +239,10 @@ class StateMachineUpdater implements Runnable {
if (i >= 0) {
LOG.info("{}: Took a snapshot at index {}", name, i);
snapshotIndex.updateIncreasingly(i, infoIndexChange);
- raftLog.purge(i);
+
+ final LongStream commitIndexStream = server.getCommitInfos().stream().mapToLong(CommitInfoProto::getCommitIndex);
+ final long purgeIndex = LongStream.concat(LongStream.of(i), commitIndexStream).min().orElse(i);
+ raftLog.purge(purgeIndex);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index fe7625d..ae357dc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -17,9 +17,11 @@
*/
package org.apache.ratis.server.raftlog;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
@@ -68,6 +70,9 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
* in the latest snapshot file.
*/
private final RaftLogIndex commitIndex;
+ private final RaftLogIndex purgeIndex;
+ private final int purgeGap;
+
private final RaftPeerId selfId;
private final int maxBufferSize;
@@ -77,10 +82,13 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
private volatile LogEntryProto lastMetadataEntry = null;
- public RaftLog(RaftPeerId selfId, long commitIndex, int maxBufferSize) {
+ protected RaftLog(RaftPeerId selfId, long commitIndex, RaftProperties
properties) {
this.selfId = selfId;
this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
- this.maxBufferSize = maxBufferSize;
+ this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX -
1);
+ this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
+
+ this.maxBufferSize =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
this.state = new OpenCloseState(getName());
}
@@ -245,6 +253,11 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
Optional.ofNullable(lastMetadataEntry).ifPresent(
e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(),
infoIndexChange));
state.open();
+
+ final long startIndex = getStartIndex();
+ if (startIndex > LEAST_VALID_LOG_INDEX) {
+ purgeIndex.updateIncreasingly(startIndex - 1, infoIndexChange);
+ }
}
protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto>
consumer) throws IOException {
@@ -316,19 +329,30 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
protected abstract CompletableFuture<Long> truncateImpl(long index);
/**
- * Purge asynchronously delete the segment files which does not overlap with the given index.
- * Open segment will not be considered for purging.
+ * Purge asynchronously the log transactions.
+ * The implementation may choose to purge an index other than the suggested index.
*
- * @param index - is inclusive.
+ * @param suggestedIndex the suggested index (inclusive) to be purged.
+ * @return the future of the actual purged log index.
*/
- public final CompletableFuture<Long> purge(long index) {
- LOG.info("{}: purge {}", getName(), index);
- return purgeImpl(index);
+ public final CompletableFuture<Long> purge(long suggestedIndex) {
+ final long lastPurge = purgeIndex.get();
+ if (suggestedIndex - lastPurge < purgeGap) {
+ return CompletableFuture.completedFuture(lastPurge);
+ }
+ LOG.info("{}: purge {}", getName(), suggestedIndex);
+ return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
+ if (purged != null) {
+ purgeIndex.updateToMax(purged, infoIndexChange);
+ }
+ if (e != null) {
+ LOG.warn(getName() + ": Failed to purge " + suggestedIndex, e);
+ }
+ });
}
protected abstract CompletableFuture<Long> purgeImpl(long index);
-
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
return runner.runSequentially(() -> appendEntryImpl(entry));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index bbe2e1c..f203282 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.raftlog.memory;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -71,8 +72,8 @@ public class MemoryRaftLog extends RaftLog {
private final EntryList entries = new EntryList();
private final AtomicReference<Metadata> metadata = new AtomicReference<>(new
Metadata(null, 0));
- public MemoryRaftLog(RaftPeerId selfId, long commitIndex, int maxBufferSize)
{
- super(selfId, commitIndex, maxBufferSize);
+ public MemoryRaftLog(RaftPeerId selfId, long commitIndex, RaftProperties
properties) {
+ super(selfId, commitIndex, properties);
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 3e2fb7b..a5d261f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -118,7 +118,7 @@ public class SegmentedRaftLog extends RaftLog {
SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties
properties) {
- super(selfId, lastIndexInSnapshot,
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt());
+ super(selfId, lastIndexInSnapshot, properties);
this.server = Optional.ofNullable(server);
this.storage = storage;
segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -270,7 +270,7 @@ public class SegmentedRaftLog extends RaftLog {
protected CompletableFuture<Long> purgeImpl(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
SegmentedRaftLogCache.TruncationSegments ts = cache.purge(index);
- LOG.debug("truncating segments:{}", ts);
+ LOG.debug("purging segments:{}", ts);
if (ts != null) {
Task task = fileLogWorker.purge(ts);
return task.getFuture();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index b94c078..cbc9555 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -49,15 +49,20 @@ import static
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX
class SegmentedRaftLogCache {
public static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogCache.class);
- static class SegmentFileInfo {
+ static final class SegmentFileInfo {
+ static SegmentFileInfo newClosedSegmentFileInfo(LogSegment ls) {
+ Objects.requireNonNull(ls, "ls == null");
+ Preconditions.assertTrue(!ls.isOpen(), ls + " is OPEN");
+ return new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(),
ls.isOpen(), 0, 0);
+ }
+
final long startIndex; // start index of the segment
final long endIndex; // original end index
final boolean isOpen;
final long targetLength; // position for truncation
final long newEndIndex; // new end index after the truncation
- SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
- long newEndIndex) {
+ private SegmentFileInfo(long start, long end, boolean isOpen, long
targetLength, long newEndIndex) {
this.startIndex = start;
this.endIndex = end;
this.isOpen = isOpen;
@@ -84,6 +89,17 @@ class SegmentedRaftLogCache {
this.toTruncate = toTruncate;
}
+ long maxEndIndex() {
+ long max = Long.MIN_VALUE;
+ if (toTruncate != null) {
+ max = toTruncate.endIndex;
+ }
+ for(SegmentFileInfo d : toDelete) {
+ max = Math.max(max, d.endIndex);
+ }
+ return max;
+ }
+
@Override
public String toString() {
return "toTruncate: " + toTruncate
@@ -246,15 +262,13 @@ class SegmentedRaftLogCache {
if (segmentIndex == -segments.size() - 1) {
for (LogSegment ls : segments) {
- list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(),
false, 0, 0));
+ list.add(SegmentFileInfo.newClosedSegmentFileInfo(ls));
}
segments.clear();
} else if (segmentIndex >= 0) {
// we start to purge the closedSegments which do not overlap with index.
for (int i = segmentIndex - 1; i >= 0; i--) {
- LogSegment ls = segments.get(i);
- list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(),
false, 0, 0));
- segments.remove(i);
+ list.add(SegmentFileInfo.newClosedSegmentFileInfo(segments.remove(i)));
}
} else {
throw new IllegalStateException("Unexpected gap in segments: binarySearch(" + index + ") returns "
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index a8752ec..bdec7ea 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -120,7 +120,7 @@ class SegmentedRaftLogWorker implements Runnable {
SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine,
Runnable submitUpdateCommitEvent,
RaftStorage storage, RaftProperties properties) {
- this.name = selfId + "-" + getClass().getSimpleName();
+ this.name = selfId + "-" + getClass().getSimpleName() + ":" +
storage.getStorageDir();
LOG.info("new {} for {}", name, storage);
this.submitUpdateCommitEvent = submitUpdateCommitEvent;
@@ -342,7 +342,7 @@ class SegmentedRaftLogWorker implements Runnable {
@Override
long getEndIndex() {
- return 0;
+ return segments.maxEndIndex();
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 0b2700a..07b4f2c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -371,7 +371,7 @@ public class TestSegmentedRaftLog extends BaseTest {
int segmentSize = 200;
long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
- purgeAndVerify(startTerm, endTerm, segmentSize, beginIndexOfOpenSegment,
expectedIndex);
+ purgeAndVerify(startTerm, endTerm, segmentSize, 1,
beginIndexOfOpenSegment, expectedIndex);
}
@Test
@@ -381,17 +381,31 @@ public class TestSegmentedRaftLog extends BaseTest {
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = segmentSize * (endTerm - startTerm - 2);
- purgeAndVerify(startTerm, endTerm, segmentSize, endIndexOfClosedSegment,
expectedIndex);
+ purgeAndVerify(startTerm, endTerm, segmentSize, 1,
endIndexOfClosedSegment, expectedIndex);
}
- private void purgeAndVerify(int startTerm, int endTerm, int segmentSize,
long purgeIndex, long expectedIndex) throws IOException {
+ @Test
+ public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
+ int startTerm = 0;
+ int endTerm = 5;
+ int segmentSize = 200;
+ long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
+ long expectedIndex = RaftLog.LEAST_VALID_LOG_INDEX;
+ purgeAndVerify(startTerm, endTerm, segmentSize, 1000,
endIndexOfClosedSegment, expectedIndex);
+ }
+
+ private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int
purgeGap, long purgeIndex, long expectedIndex) throws Exception {
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize,
0);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null,
storage, -1, properties)) {
+ final RaftProperties p = new RaftProperties();
+ RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null,
storage, -1, p)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
- raftLog.purge(purgeIndex).join();
+ final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
+ final Long purged = f.get();
+ LOG.info("purgeIndex = {}, purged = {}", purgeIndex, purged);
Assert.assertEquals(expectedIndex,
raftLog.getRaftLogCache().getStartIndex());
}
}