szetszwo commented on a change in pull request #322:
URL: https://github.com/apache/incubator-ratis/pull/322#discussion_r536504800



##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
##########
@@ -110,22 +111,31 @@
         RaftServerConfigKeys.Log.corruptionPolicy(prop));
     snapshotManager = new SnapshotManager(storage, id);
 
-    long lastApplied = initStatemachine(stateMachine, group.getGroupId());
+    initStatemachine(stateMachine, group.getGroupId());
 
     // On start the leader is null, start the clock now
     leaderId = null;
     this.lastNoLeaderTime = Timestamp.currentTime();
     this.noLeaderTimeout = 
RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
 
+    LongSupplier snapshotIndexSupplier = () -> {
+      SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
+      if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
+        return RaftLog.INVALID_LOG_INDEX;
+      }
+      return snapshot.getIndex();
+    };

Review comment:
       We may use Optional
   ```
       final LongSupplier getSnapshotIndexFromStateMachine = () -> 
Optional.ofNullable(stateMachine.getLatestSnapshot())
           .map(SnapshotInfo::getIndex)
           .filter(i -> i >= 0)
           .orElse(RaftLog.INVALID_LOG_INDEX);
   ```

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
##########
@@ -182,17 +185,19 @@ void start() {
   }
 
   private static RaftLog initRaftLog(RaftGroupMemberId memberId, 
RaftServerImpl server, RaftStorage storage,
-      Consumer<LogEntryProto> logConsumer, long lastIndexInSnapshot, 
RaftProperties prop) throws IOException {
+      Consumer<LogEntryProto> logConsumer, LongSupplier snapshotIndexSupplier,
+      RaftProperties prop) throws IOException {
     final RaftLog log;
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
-      log = new MemoryRaftLog(memberId, lastIndexInSnapshot, prop);
+      log = new MemoryRaftLog(memberId, snapshotIndexSupplier, prop);
     } else {
       log = new SegmentedRaftLog(memberId, server,
           server.getStateMachine(),
           server::notifyTruncatedLogEntry,
           server::submitUpdateCommitEvent,
-          storage, lastIndexInSnapshot, prop);
+          storage, snapshotIndexSupplier, prop);
     }
+    long lastIndexInSnapshot = snapshotIndexSupplier.getAsLong();
     log.open(lastIndexInSnapshot, logConsumer);

Review comment:
       snapshotIndexSupplier.getAsLong() is implemented by StateMachine so that 
it could be expensive.  Let's use log.getSnapshotIndex().
   ```
       log.open(log.getSnapshotIndex(), logConsumer);
   ```
   

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
##########
@@ -84,19 +85,24 @@
   private final Runner runner = new Runner(this::getName);
   private final OpenCloseState state;
   private final RaftLogMetrics raftLogMetrics;
+  private final LongSupplier snapshotIndexSupplier;

Review comment:
       Since we already have snapshotIndex, let's rename this to
   ```
     private final LongSupplier getSnapshotIndexFromStateMachine;
   ```

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
##########
@@ -110,22 +111,31 @@
         RaftServerConfigKeys.Log.corruptionPolicy(prop));
     snapshotManager = new SnapshotManager(storage, id);
 
-    long lastApplied = initStatemachine(stateMachine, group.getGroupId());
+    initStatemachine(stateMachine, group.getGroupId());
 
     // On start the leader is null, start the clock now
     leaderId = null;
     this.lastNoLeaderTime = Timestamp.currentTime();
     this.noLeaderTimeout = 
RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
 
+    LongSupplier snapshotIndexSupplier = () -> {
+      SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
+      if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
+        return RaftLog.INVALID_LOG_INDEX;
+      }
+      return snapshot.getIndex();
+    };
+
     // we cannot apply log entries to the state machine in this step, since we
     // do not know whether the local log entries have been committed.
-    this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, 
lastApplied, prop);
+    this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, 
snapshotIndexSupplier, prop);
 
     RaftLog.Metadata metadata = log.loadMetadata();
     currentTerm.set(metadata.getTerm());
     votedFor = metadata.getVotedFor();
 
-    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this, 
lastApplied, prop);
+    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this,
+        snapshotIndexSupplier.getAsLong(), prop);

Review comment:
       snapshotIndexSupplier.getAsLong() is implemented by StateMachine so that 
it could be expensive.  Let's use log.getSnapshotIndex().
   ```
       stateMachineUpdater = new StateMachineUpdater(stateMachine, server, 
this, log.getSnapshotIndex(), prop);
   ```
   

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
##########
@@ -186,8 +187,9 @@ public void notifyTruncatedLogEntry(TermIndex ti) {
   @SuppressWarnings("parameternumber")
   public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division 
server,
       StateMachine stateMachine, Consumer<LogEntryProto> 
notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent,
-      RaftStorage storage, long lastIndexInSnapshot, RaftProperties 
properties) {
-    super(memberId, lastIndexInSnapshot, properties);
+      RaftStorage storage, LongSupplier snapshotIndexSupplier,
+                          RaftProperties properties) {

Review comment:
       Let's put this a single line?

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
##########
@@ -359,6 +365,9 @@ protected void validateLogEntry(LogEntryProto entry) {
               "is greater than 1, entry: %s",
           lastTermIndex.getIndex(), latestSnapshotIndex, entry);
     } else {
+      // No logs are present. Check state machine's snapshot index.
+      updateSnapshotIndex(snapshotIndexSupplier.getAsLong());
+      latestSnapshotIndex = getSnapshotIndex();

Review comment:
       Updating in validateLogEntry is expensive since it is called for every 
entry.  How about updating it in purge?
   ```
   +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
   @@ -357,6 +357,7 @@ public class SegmentedRaftLog extends RaftLog {
      protected CompletableFuture<Long> purgeImpl(long index) {
        try (AutoCloseableLock writeLock = writeLock()) {
          SegmentedRaftLogCache.TruncationSegments ts = cache.purge(index);
   +      updateSnapshotIndexFromStateMachine();
          LOG.debug("purging segments:{}", ts);
          if (ts != null) {
            Task task = fileLogWorker.purge(ts);
   ```
   where updateSnapshotIndexFromStateMachine() is a new method in RaftLog.java
   ```
   +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
   @@ -153,6 +153,10 @@ public abstract class RaftLog implements 
RaftLogSequentialOps, Closeable {
        return false;
      }
    
   +  protected void updateSnapshotIndexFromStateMachine() {
   +    updateSnapshotIndex(getSnapshotIndexFromStateMachine.getAsLong());
   +  }
   +
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to