This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f40dad9 RATIS-614. Raft leader should use state machine's last
applied index for LeaderNotReady exception. Contributed by Lokesh Jain.
f40dad9 is described below
commit f40dad9b1504bff82a9f3559bb0f0ddf7205d6f0
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Nov 5 14:53:37 2019 +0530
RATIS-614. Raft leader should use state machine's last applied index for
LeaderNotReady exception. Contributed by Lokesh Jain.
---
.../java/org/apache/ratis/server/impl/ServerState.java | 2 +-
.../apache/ratis/server/impl/StateMachineUpdater.java | 6 +++++-
.../ratis/statemachine/impl/BaseStateMachine.java | 18 +++++++++++-------
.../src/test/java/org/apache/ratis/RaftBasicTests.java | 4 ----
.../ratis/statemachine/SimpleStateMachine4Testing.java | 4 +++-
5 files changed, 20 insertions(+), 14 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index b91f029..f3a74e7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -433,7 +433,7 @@ public class ServerState implements Closeable {
}
public long getLastAppliedIndex() {
- return stateMachineUpdater.getLastAppliedIndex();
+ return stateMachineUpdater.getStateMachineLastAppliedIndex();
}
boolean containsTermIndex(TermIndex ti) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 2e9dbca..2b49120 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -288,7 +288,11 @@ class StateMachineUpdater implements Runnable {
return state == State.RUNNING && getLastAppliedIndex() -
snapshotIndex.get() >= autoSnapshotThreshold;
}
- long getLastAppliedIndex() {
+ private long getLastAppliedIndex() {
return appliedIndex.get();
}
+
+ long getStateMachineLastAppliedIndex() {
+ return stateMachine.getLastAppliedTermIndex().getIndex();
+ }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 156a698..78583fa 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -19,6 +19,7 @@
package org.apache.ratis.statemachine.impl;
import com.codahale.metrics.Timer;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
@@ -31,12 +32,12 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
-import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import java.io.IOException;
import java.util.Collection;
+import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -54,6 +55,10 @@ public class BaseStateMachine implements StateMachine {
private final SortedMap<Long, CompletableFuture<Void>> transactionFutures =
new TreeMap<>();
+ public BaseStateMachine() {
+ setLastAppliedTermIndex(TermIndex.newTermIndex(0, -1));
+ }
+
public RaftPeerId getId() {
return server.isDone()? server.join().getId(): null;
}
@@ -96,6 +101,8 @@ public class BaseStateMachine implements StateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// return the same message contained in the entry
+ RaftProtos.LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
+ updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
return CompletableFuture.completedFuture(
Message.valueOf(trx.getLogEntry().getStateMachineLogEntry().getLogData()));
}
@@ -109,12 +116,9 @@ public class BaseStateMachine implements StateMachine {
lastAppliedTermIndex.set(newTI);
}
- /**
- * to be used for testing only.
- */
- @VisibleForTesting
- public void initLastAppliedTermIndex() {
- setLastAppliedTermIndex(TermIndex.newTermIndex(0, 0));
+ @Override
+ public void notifyIndexUpdate(long term, long index) {
+ updateLastAppliedTermIndex(term, index);
}
protected boolean updateLastAppliedTermIndex(long term, long index) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f550e47..04111ff 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -32,7 +32,6 @@ import org.apache.ratis.server.metrics.RatisMetricNames;
import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
@@ -438,11 +437,8 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
public static void testStateMachineMetrics(boolean async,
MiniRaftCluster cluster, Logger LOG) throws Exception {
RaftServerImpl leader = waitForLeader(cluster);
- long time = System.currentTimeMillis();
try (final RaftClient client = cluster.createClient()) {
- // this is required because the lastAppliedTermIndex is not initialized
- ((BaseStateMachine) leader.getStateMachine()).initLastAppliedTermIndex();
Assert.assertTrue(leader.isLeader());
Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index f5f2459..5fb0312 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -308,8 +308,10 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
}
}
+ // The end index is greater than last entry in indexMap as it also
+ // includes the configuration and metadata entries
Preconditions.assertTrue(
- !indexMap.isEmpty() && endIndex == indexMap.lastKey(),
+ !indexMap.isEmpty() && endIndex >= indexMap.lastKey(),
"endIndex=%s, indexMap=%s", endIndex, indexMap);
this.endIndexLastCkpt = endIndex;
setLastAppliedTermIndex(snapshot.getTermIndex());