This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8704cd23d RATIS-2055. Move notifyTermIndexUpdated after
leader.checkReady (#1068)
8704cd23d is described below
commit 8704cd23d3b38a90780ef34a7ed5f7c417ea1207
Author: Symious <[email protected]>
AuthorDate: Fri Apr 19 23:20:16 2024 +0800
RATIS-2055. Move notifyTermIndexUpdated after leader.checkReady (#1068)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 26 +++++++++++++++-------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2cec09578..34fede600 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1805,17 +1805,18 @@ class RaftServerImpl implements RaftServer.Division,
CompletableFuture<Message>
applyLogToStateMachine(ReferenceCountedObject<LogEntryProto> nextRef)
throws RaftLogIOException {
LogEntryProto next = nextRef.get();
- if (!next.hasStateMachineLogEntry()) {
- stateMachine.event().notifyTermIndexUpdated(next.getTerm(),
next.getIndex());
- }
+ CompletableFuture<Message> messageFuture = null;
- if (next.hasConfigurationEntry()) {
+ switch (next.getLogEntryBodyCase()) {
+ case CONFIGURATIONENTRY:
// the reply should have already been set. only need to record
// the new conf in the metadata file and notify the StateMachine.
state.writeRaftConfiguration(next);
- stateMachine.event().notifyConfigurationChanged(next.getTerm(),
next.getIndex(), next.getConfigurationEntry());
+ stateMachine.event().notifyConfigurationChanged(next.getTerm(),
next.getIndex(),
+ next.getConfigurationEntry());
role.getLeaderState().ifPresent(leader -> leader.checkReady(next));
- } else if (next.hasStateMachineLogEntry()) {
+ break;
+ case STATEMACHINELOGENTRY:
TransactionContext trx = getTransactionContext(next, true);
final ClientInvocationId invocationId =
ClientInvocationId.valueOf(next.getStateMachineLogEntry());
writeIndexCache.add(invocationId.getClientId(),
((TransactionContextImpl) trx).getLogIndexFuture());
@@ -1825,12 +1826,21 @@ class RaftServerImpl implements RaftServer.Division,
trx = stateMachine.applyTransactionSerial(trx);
final CompletableFuture<Message> stateMachineFuture =
stateMachine.applyTransaction(trx);
- return replyPendingRequest(invocationId, TermIndex.valueOf(next),
stateMachineFuture);
+ messageFuture = replyPendingRequest(invocationId,
TermIndex.valueOf(next), stateMachineFuture);
} catch (Exception e) {
throw new RaftLogIOException(e);
}
+ break;
+ case METADATAENTRY:
+ break;
+ default:
+ throw new IllegalStateException("Unexpected LogEntryBodyCase " +
next.getLogEntryBodyCase() + ", next=" + next);
}
- return null;
+
+ if (next.getLogEntryBodyCase() !=
LogEntryProto.LogEntryBodyCase.STATEMACHINELOGENTRY) {
+ stateMachine.event().notifyTermIndexUpdated(next.getTerm(),
next.getIndex());
+ }
+ return messageFuture;
}
/**