This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8704cd23d RATIS-2055. Move notifyTermIndexUpdated after leader.checkReady (#1068)
8704cd23d is described below

commit 8704cd23d3b38a90780ef34a7ed5f7c417ea1207
Author: Symious <[email protected]>
AuthorDate: Fri Apr 19 23:20:16 2024 +0800

    RATIS-2055. Move notifyTermIndexUpdated after leader.checkReady (#1068)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 26 +++++++++++++++-------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2cec09578..34fede600 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1805,17 +1805,18 @@ class RaftServerImpl implements RaftServer.Division,
   CompletableFuture<Message> 
applyLogToStateMachine(ReferenceCountedObject<LogEntryProto> nextRef)
       throws RaftLogIOException {
     LogEntryProto next = nextRef.get();
-    if (!next.hasStateMachineLogEntry()) {
-      stateMachine.event().notifyTermIndexUpdated(next.getTerm(), 
next.getIndex());
-    }
+    CompletableFuture<Message> messageFuture = null;
 
-    if (next.hasConfigurationEntry()) {
+    switch (next.getLogEntryBodyCase()) {
+    case CONFIGURATIONENTRY:
       // the reply should have already been set. only need to record
       // the new conf in the metadata file and notify the StateMachine.
       state.writeRaftConfiguration(next);
-      stateMachine.event().notifyConfigurationChanged(next.getTerm(), 
next.getIndex(), next.getConfigurationEntry());
+      stateMachine.event().notifyConfigurationChanged(next.getTerm(), 
next.getIndex(),
+          next.getConfigurationEntry());
       role.getLeaderState().ifPresent(leader -> leader.checkReady(next));
-    } else if (next.hasStateMachineLogEntry()) {
+      break;
+    case STATEMACHINELOGENTRY:
       TransactionContext trx = getTransactionContext(next, true);
       final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(next.getStateMachineLogEntry());
       writeIndexCache.add(invocationId.getClientId(), 
((TransactionContextImpl) trx).getLogIndexFuture());
@@ -1825,12 +1826,21 @@ class RaftServerImpl implements RaftServer.Division,
         trx = stateMachine.applyTransactionSerial(trx);
 
         final CompletableFuture<Message> stateMachineFuture = 
stateMachine.applyTransaction(trx);
-        return replyPendingRequest(invocationId, TermIndex.valueOf(next), 
stateMachineFuture);
+        messageFuture = replyPendingRequest(invocationId, 
TermIndex.valueOf(next), stateMachineFuture);
       } catch (Exception e) {
         throw new RaftLogIOException(e);
       }
+      break;
+    case METADATAENTRY:
+      break;
+    default:
+      throw new IllegalStateException("Unexpected LogEntryBodyCase " + 
next.getLogEntryBodyCase() + ", next=" + next);
     }
-    return null;
+
+    if (next.getLogEntryBodyCase() != 
LogEntryProto.LogEntryBodyCase.STATEMACHINELOGENTRY) {
+      stateMachine.event().notifyTermIndexUpdated(next.getTerm(), 
next.getIndex());
+    }
+    return messageFuture;
   }
 
   /**

Reply via email to