This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 346c024  RATIS-563. Purge logs in takeSnapshot only after all the nodes have caught up with the index. Contributed by Tsz Wo Nicholas Sze.
346c024 is described below

commit 346c024d1ce91b5c3b154f3bed58d8d1bfac6c64
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Tue May 28 23:50:11 2019 +0530

    RATIS-563. Purge logs in takeSnapshot only after all the nodes have caught up with the index. Contributed by Tsz Wo Nicholas Sze.
---
 .../apache/ratis/server/RaftServerConfigKeys.java  |  9 +++++
 .../org/apache/ratis/server/impl/ServerState.java  |  7 ++--
 .../ratis/server/impl/StateMachineUpdater.java     | 15 +++++++-
 .../org/apache/ratis/server/raftlog/RaftLog.java   | 42 +++++++++++++++++-----
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |  5 +--
 .../server/raftlog/segmented/SegmentedRaftLog.java |  4 +--
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 28 +++++++++++----
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |  4 +--
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 24 ++++++++++---
 9 files changed, 105 insertions(+), 33 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e643bf4..0fdd622 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -158,6 +158,15 @@ public interface RaftServerConfigKeys {
       setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize, 
requireMin(1));
     }
 
+    String PURGE_GAP_KEY = PREFIX + ".purge.gap";
+    int PURGE_GAP_DEFAULT = 1024;
+    static int purgeGap(RaftProperties properties) {
+      return getInt(properties::getInt, PURGE_GAP_KEY, PURGE_GAP_DEFAULT, 
getDefaultLog(), requireMin(1));
+    }
+    static void setPurgeGap(RaftProperties properties, int purgeGap) {
+      setInt(properties::setInt, PURGE_GAP_KEY, purgeGap, requireMin(1));
+    }
+
     String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max";
     SizeInBytes SEGMENT_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("8MB");
     static SizeInBytes segmentSizeMax(RaftProperties properties) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0c49716..b40a131 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -189,12 +189,9 @@ public class ServerState implements Closeable {
       throws IOException {
     final RaftLog log;
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
-      final int maxBufferSize =
-          RaftServerConfigKeys.Log.Appender.bufferByteLimit(prop).getSizeInt();
-      log = new MemoryRaftLog(id, lastIndexInSnapshot, maxBufferSize);
+      log = new MemoryRaftLog(id, lastIndexInSnapshot, prop);
     } else {
-      log = new SegmentedRaftLog(id, server, this.storage,
-          lastIndexInSnapshot, prop);
+      log = new SegmentedRaftLog(id, server, this.storage, 
lastIndexInSnapshot, prop);
     }
     log.open(lastIndexInSnapshot, logConsumer);
     return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index d28c7ab..c380781 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -18,7 +18,9 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -38,6 +40,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.LongStream;
 
 /**
  * This class tracks the log entries that have been committed in a quorum and
@@ -221,6 +224,13 @@ class StateMachineUpdater implements Runnable {
     final long i;
     try {
       i = stateMachine.takeSnapshot();
+
+      final long appliedIndex = getLastAppliedIndex();
+      if (i > appliedIndex) {
+        throw new StateMachineException(
+            "Bug in StateMachine: snapshot index = " + i + " > appliedIndex = 
" + appliedIndex
+            + "; StateMachine class=" +  stateMachine.getClass().getName() + 
", stateMachine=" + stateMachine);
+      }
     } catch (IOException e) {
       LOG.error(name + ": Failed to take snapshot", e);
       return;
@@ -229,7 +239,10 @@ class StateMachineUpdater implements Runnable {
     if (i >= 0) {
       LOG.info("{}: Took a snapshot at index {}", name, i);
       snapshotIndex.updateIncreasingly(i, infoIndexChange);
-      raftLog.purge(i);
+
+      final LongStream commitIndexStream = 
server.getCommitInfos().stream().mapToLong(CommitInfoProto::getCommitIndex);
+      final long purgeIndex = LongStream.concat(LongStream.of(i), 
commitIndexStream).min().orElse(i);
+      raftLog.purge(purgeIndex);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index fe7625d..ae357dc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -17,9 +17,11 @@
  */
 package org.apache.ratis.server.raftlog;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
@@ -68,6 +70,9 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
    * in the latest snapshot file.
    */
   private final RaftLogIndex commitIndex;
+  private final RaftLogIndex purgeIndex;
+  private final int purgeGap;
+
   private final RaftPeerId selfId;
   private final int maxBufferSize;
 
@@ -77,10 +82,13 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   private volatile LogEntryProto lastMetadataEntry = null;
 
-  public RaftLog(RaftPeerId selfId, long commitIndex, int maxBufferSize) {
+  protected RaftLog(RaftPeerId selfId, long commitIndex, RaftProperties 
properties) {
     this.selfId = selfId;
     this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
-    this.maxBufferSize = maxBufferSize;
+    this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 
1);
+    this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
+
+    this.maxBufferSize = 
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
     this.state = new OpenCloseState(getName());
   }
 
@@ -245,6 +253,11 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     Optional.ofNullable(lastMetadataEntry).ifPresent(
         e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(), 
infoIndexChange));
     state.open();
+
+    final long startIndex = getStartIndex();
+    if (startIndex > LEAST_VALID_LOG_INDEX) {
+      purgeIndex.updateIncreasingly(startIndex - 1, infoIndexChange);
+    }
   }
 
   protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> 
consumer) throws IOException {
@@ -316,19 +329,30 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   protected abstract CompletableFuture<Long> truncateImpl(long index);
 
   /**
-   * Purge asynchronously delete the segment files which does not overlap with 
the given index.
-   * Open segment will not be considered for purging.
+   * Purge asynchronously the log transactions.
+   * The implementation may choose to purge an index other than the suggested 
index.
    *
-   * @param index - is inclusive.
+   * @param suggestedIndex the suggested index (inclusive) to be purged.
+   * @return the future of the actual purged log index.
    */
-  public final CompletableFuture<Long> purge(long index) {
-    LOG.info("{}: purge {}", getName(), index);
-    return purgeImpl(index);
+  public final CompletableFuture<Long> purge(long suggestedIndex) {
+    final long lastPurge = purgeIndex.get();
+    if (suggestedIndex - lastPurge < purgeGap) {
+      return CompletableFuture.completedFuture(lastPurge);
+    }
+    LOG.info("{}: purge {}", getName(), suggestedIndex);
+    return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
+      if (purged != null) {
+        purgeIndex.updateToMax(purged, infoIndexChange);
+      }
+      if (e != null) {
+        LOG.warn(getName() + ": Failed to purge " + suggestedIndex, e);
+      }
+    });
   }
 
   protected abstract CompletableFuture<Long> purgeImpl(long index);
 
-
   @Override
   public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
     return runner.runSequentially(() -> appendEntryImpl(entry));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index bbe2e1c..f203282 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.raftlog.memory;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -71,8 +72,8 @@ public class MemoryRaftLog extends RaftLog {
   private final EntryList entries = new EntryList();
   private final AtomicReference<Metadata> metadata = new AtomicReference<>(new 
Metadata(null, 0));
 
-  public MemoryRaftLog(RaftPeerId selfId, long commitIndex, int maxBufferSize) 
{
-    super(selfId, commitIndex, maxBufferSize);
+  public MemoryRaftLog(RaftPeerId selfId, long commitIndex, RaftProperties 
properties) {
+    super(selfId, commitIndex, properties);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 3e2fb7b..a5d261f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -118,7 +118,7 @@ public class SegmentedRaftLog extends RaftLog {
   SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
       StateMachine stateMachine, Runnable submitUpdateCommitEvent,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties 
properties) {
-    super(selfId, lastIndexInSnapshot, 
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt());
+    super(selfId, lastIndexInSnapshot, properties);
     this.server = Optional.ofNullable(server);
     this.storage = storage;
     segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -270,7 +270,7 @@ public class SegmentedRaftLog extends RaftLog {
   protected CompletableFuture<Long> purgeImpl(long index) {
     try (AutoCloseableLock writeLock = writeLock()) {
       SegmentedRaftLogCache.TruncationSegments ts = cache.purge(index);
-      LOG.debug("truncating segments:{}", ts);
+      LOG.debug("purging segments:{}", ts);
       if (ts != null) {
         Task task = fileLogWorker.purge(ts);
         return task.getFuture();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index b94c078..cbc9555 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -49,15 +49,20 @@ import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX
 class SegmentedRaftLogCache {
   public static final Logger LOG = 
LoggerFactory.getLogger(SegmentedRaftLogCache.class);
 
-  static class SegmentFileInfo {
+  static final class SegmentFileInfo {
+    static SegmentFileInfo newClosedSegmentFileInfo(LogSegment ls) {
+      Objects.requireNonNull(ls, "ls == null");
+      Preconditions.assertTrue(!ls.isOpen(), ls + " is OPEN");
+      return new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(), 
ls.isOpen(), 0, 0);
+    }
+
     final long startIndex; // start index of the segment
     final long endIndex; // original end index
     final boolean isOpen;
     final long targetLength; // position for truncation
     final long newEndIndex; // new end index after the truncation
 
-    SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
-        long newEndIndex) {
+    private SegmentFileInfo(long start, long end, boolean isOpen, long 
targetLength, long newEndIndex) {
       this.startIndex = start;
       this.endIndex = end;
       this.isOpen = isOpen;
@@ -84,6 +89,17 @@ class SegmentedRaftLogCache {
       this.toTruncate = toTruncate;
     }
 
+    long maxEndIndex() {
+      long max = Long.MIN_VALUE;
+      if (toTruncate != null) {
+        max = toTruncate.endIndex;
+      }
+      for(SegmentFileInfo d : toDelete) {
+        max = Math.max(max, d.endIndex);
+      }
+      return max;
+    }
+
     @Override
     public String toString() {
       return "toTruncate: " + toTruncate
@@ -246,15 +262,13 @@ class SegmentedRaftLogCache {
 
         if (segmentIndex == -segments.size() - 1) {
           for (LogSegment ls : segments) {
-            list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(), 
false, 0, 0));
+            list.add(SegmentFileInfo.newClosedSegmentFileInfo(ls));
           }
           segments.clear();
         } else if (segmentIndex >= 0) {
           // we start to purge the closedSegments which do not overlap with 
index.
           for (int i = segmentIndex - 1; i >= 0; i--) {
-            LogSegment ls = segments.get(i);
-            list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(), 
false, 0, 0));
-            segments.remove(i);
+            
list.add(SegmentFileInfo.newClosedSegmentFileInfo(segments.remove(i)));
           }
         } else {
           throw new IllegalStateException("Unexpected gap in segments: 
binarySearch(" + index + ") returns "
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index a8752ec..bdec7ea 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -120,7 +120,7 @@ class SegmentedRaftLogWorker implements Runnable {
 
   SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, 
Runnable submitUpdateCommitEvent,
       RaftStorage storage, RaftProperties properties) {
-    this.name = selfId + "-" + getClass().getSimpleName();
+    this.name = selfId + "-" + getClass().getSimpleName() + ":" + 
storage.getStorageDir();
     LOG.info("new {} for {}", name, storage);
 
     this.submitUpdateCommitEvent = submitUpdateCommitEvent;
@@ -342,7 +342,7 @@ class SegmentedRaftLogWorker implements Runnable {
 
     @Override
     long getEndIndex() {
-      return 0;
+      return segments.maxEndIndex();
     }
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 0b2700a..07b4f2c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -371,7 +371,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     int segmentSize = 200;
     long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
     long expectedIndex = segmentSize * (endTerm - startTerm - 1);
-    purgeAndVerify(startTerm, endTerm, segmentSize, beginIndexOfOpenSegment, 
expectedIndex);
+    purgeAndVerify(startTerm, endTerm, segmentSize, 1, 
beginIndexOfOpenSegment, expectedIndex);
   }
 
   @Test
@@ -381,17 +381,31 @@ public class TestSegmentedRaftLog extends BaseTest {
     int segmentSize = 200;
     long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
     long expectedIndex = segmentSize * (endTerm - startTerm - 2);
-    purgeAndVerify(startTerm, endTerm, segmentSize, endIndexOfClosedSegment, 
expectedIndex);
+    purgeAndVerify(startTerm, endTerm, segmentSize, 1, 
endIndexOfClosedSegment, expectedIndex);
   }
 
-  private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, 
long purgeIndex, long expectedIndex) throws IOException {
+  @Test
+  public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
+    int startTerm = 0;
+    int endTerm = 5;
+    int segmentSize = 200;
+    long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
+    long expectedIndex = RaftLog.LEAST_VALID_LOG_INDEX;
+    purgeAndVerify(startTerm, endTerm, segmentSize, 1000, 
endIndexOfClosedSegment, expectedIndex);
+  }
+
+  private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int 
purgeGap, long purgeIndex, long expectedIndex) throws Exception {
     List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 
0);
     List<LogEntryProto> entries = prepareLogEntries(ranges, null);
 
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, 
storage, -1, properties)) {
+    final RaftProperties p = new RaftProperties();
+    RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, 
storage, -1, p)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
-      raftLog.purge(purgeIndex).join();
+      final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
+      final Long purged = f.get();
+      LOG.info("purgeIndex = {}, purged = {}", purgeIndex, purged);
       Assert.assertEquals(expectedIndex, 
raftLog.getRaftLogCache().getStartIndex());
     }
   }

Reply via email to