This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f40dad9  RATIS-614. Raft leader should use state machine's last 
applied index for LeaderNotReady exception. Contributed by Lokesh Jain.
f40dad9 is described below

commit f40dad9b1504bff82a9f3559bb0f0ddf7205d6f0
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Nov 5 14:53:37 2019 +0530

    RATIS-614. Raft leader should use state machine's last applied index for 
LeaderNotReady exception. Contributed by Lokesh Jain.
---
 .../java/org/apache/ratis/server/impl/ServerState.java |  2 +-
 .../apache/ratis/server/impl/StateMachineUpdater.java  |  6 +++++-
 .../ratis/statemachine/impl/BaseStateMachine.java      | 18 +++++++++++-------
 .../src/test/java/org/apache/ratis/RaftBasicTests.java |  4 ----
 .../ratis/statemachine/SimpleStateMachine4Testing.java |  4 +++-
 5 files changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index b91f029..f3a74e7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -433,7 +433,7 @@ public class ServerState implements Closeable {
   }
 
   public long getLastAppliedIndex() {
-    return stateMachineUpdater.getLastAppliedIndex();
+    return stateMachineUpdater.getStateMachineLastAppliedIndex();
   }
 
   boolean containsTermIndex(TermIndex ti) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 2e9dbca..2b49120 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -288,7 +288,11 @@ class StateMachineUpdater implements Runnable {
     return state == State.RUNNING && getLastAppliedIndex() - 
snapshotIndex.get() >= autoSnapshotThreshold;
   }
 
-  long getLastAppliedIndex() {
+  private long getLastAppliedIndex() {
     return appliedIndex.get();
   }
+
+  long getStateMachineLastAppliedIndex() {
+    return stateMachine.getLastAppliedTermIndex().getIndex();
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 156a698..78583fa 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -19,6 +19,7 @@
 package org.apache.ratis.statemachine.impl;
 
 import com.codahale.metrics.Timer;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -31,12 +32,12 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
-import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -54,6 +55,10 @@ public class BaseStateMachine implements StateMachine {
 
   private final SortedMap<Long, CompletableFuture<Void>> transactionFutures = 
new TreeMap<>();
 
+  public BaseStateMachine() {
+    setLastAppliedTermIndex(TermIndex.newTermIndex(0, -1));
+  }
+
   public RaftPeerId getId() {
     return server.isDone()? server.join().getId(): null;
   }
@@ -96,6 +101,8 @@ public class BaseStateMachine implements StateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     // return the same message contained in the entry
+    RaftProtos.LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
+    updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
     return CompletableFuture.completedFuture(
         
Message.valueOf(trx.getLogEntry().getStateMachineLogEntry().getLogData()));
   }
@@ -109,12 +116,9 @@ public class BaseStateMachine implements StateMachine {
     lastAppliedTermIndex.set(newTI);
   }
 
-  /**
-   * to be used for testing only.
-   */
-  @VisibleForTesting
-  public void initLastAppliedTermIndex() {
-    setLastAppliedTermIndex(TermIndex.newTermIndex(0, 0));
+  @Override
+  public void notifyIndexUpdate(long term, long index) {
+    updateLastAppliedTermIndex(term, index);
   }
 
   protected boolean updateLastAppliedTermIndex(long term, long index) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f550e47..04111ff 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -32,7 +32,6 @@ import org.apache.ratis.server.metrics.RatisMetricNames;
 import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
@@ -438,11 +437,8 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
   public static void testStateMachineMetrics(boolean async,
       MiniRaftCluster cluster, Logger LOG) throws Exception {
     RaftServerImpl leader = waitForLeader(cluster);
-    long time = System.currentTimeMillis();
     try (final RaftClient client = cluster.createClient()) {
 
-      // this is required because the lastAppliedTermIndex is not initialized
-      ((BaseStateMachine) leader.getStateMachine()).initLastAppliedTermIndex();
       Assert.assertTrue(leader.isLeader());
 
       Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index f5f2459..5fb0312 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -308,8 +308,10 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
           updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
         }
       }
+      // The end index is greater than last entry in indexMap as it also
+      // includes the configuration and metadata entries
       Preconditions.assertTrue(
-          !indexMap.isEmpty() && endIndex == indexMap.lastKey(),
+          !indexMap.isEmpty() && endIndex >= indexMap.lastKey(),
           "endIndex=%s, indexMap=%s", endIndex, indexMap);
       this.endIndexLastCkpt = endIndex;
       setLastAppliedTermIndex(snapshot.getTermIndex());

Reply via email to