This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d824c4e88 RATIS-2317. Move acquire PendingRequestPermit out of synchronized block in RaftServerImpl#appendTransaction. (#1275)
d824c4e88 is described below

commit d824c4e88c4e1804819a3e72465f4730c0891c10
Author: Nandakumar Vadivelu <na...@apache.org>
AuthorDate: Wed Jul 9 15:32:36 2025 +0530

    RATIS-2317. Move acquire PendingRequestPermit out of synchronized block in RaftServerImpl#appendTransaction. (#1275)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 50 ++++++++++++++--------
 .../apache/ratis/server/impl/RetryCacheImpl.java   | 13 +-----
 2 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5d9bfeca..3c10e103b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -171,7 +171,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     @Override
     public boolean isLeaderReady() {
-      return isLeader() && getRole().isLeaderReady();
+      return getRole().isLeaderReady();
     }
 
     @Override
@@ -752,6 +752,11 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private CompletableFuture<RaftClientReply> 
checkLeaderState(RaftClientRequest request) {
+    try {
+      assertGroup(getMemberId(), request);
+    } catch (GroupMismatchException e) {
+      return JavaUtils.completeExceptionally(e);
+    }
     return checkLeaderState(request, null);
   }
 
@@ -759,12 +764,6 @@ class RaftServerImpl implements RaftServer.Division,
    * @return null if the server is in leader state.
    */
   private CompletableFuture<RaftClientReply> 
checkLeaderState(RaftClientRequest request, CacheEntry entry) {
-    try {
-      assertGroup(getMemberId(), request);
-    } catch (GroupMismatchException e) {
-      return RetryCacheImpl.failWithException(e, entry);
-    }
-
     if (!getInfo().isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
       final RaftClientReply reply = newExceptionReply(request, exception);
@@ -809,6 +808,11 @@ class RaftServerImpl implements RaftServer.Division,
         getMemberId() + " is not in " + expected + ": current state is " + c), 
expected);
   }
 
+  private CompletableFuture<RaftClientReply> 
getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
+    return entry.failWithException(new ResourceUnavailableException(
+        getMemberId() + ": Failed to acquire a pending write request for " + 
request));
+    }
+
   /**
    * Handle a normal update request from client.
    */
@@ -819,6 +823,17 @@ class RaftServerImpl implements RaftServer.Division,
 
     assertLifeCycleState(LifeCycle.States.RUNNING);
 
+    final LeaderStateImpl unsyncedLeaderState = 
role.getLeaderState().orElse(null);
+    if (unsyncedLeaderState == null) {
+      final RaftClientReply reply = newExceptionReply(request, 
generateNotLeaderException());
+      return RetryCacheImpl.failWithReply(reply, cacheEntry);
+    }
+    final PendingRequests.Permit unsyncedPermit = 
unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
+    if (unsyncedPermit == null) {
+      return getResourceUnavailableReply(request, cacheEntry);
+    }
+
+    final LeaderStateImpl leaderState;
     final PendingRequest pending;
     synchronized (this) {
       final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request, cacheEntry);
@@ -826,16 +841,15 @@ class RaftServerImpl implements RaftServer.Division,
         return reply;
       }
 
-      // append the message to its local log
-      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
-      writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
-
-      final PendingRequests.Permit permit = 
leaderState.tryAcquirePendingRequest(request.getMessage());
+      leaderState = role.getLeaderStateNonNull();
+      final PendingRequests.Permit permit = leaderState == 
unsyncedLeaderState? unsyncedPermit
+          : leaderState.tryAcquirePendingRequest(request.getMessage());
       if (permit == null) {
-        cacheEntry.failWithException(new ResourceUnavailableException(
-            getMemberId() + ": Failed to acquire a pending write request for " 
+ request));
-        return cacheEntry.getReplyFuture();
+        return getResourceUnavailableReply(request, cacheEntry);
       }
+
+      // append the message to its local log
+      writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
       try {
         state.appendLog(context);
       } catch (StateMachineException e) {
@@ -853,13 +867,11 @@ class RaftServerImpl implements RaftServer.Division,
       // put the request into the pending queue
       pending = leaderState.addPendingRequest(permit, request, context);
       if (pending == null) {
-        cacheEntry.failWithException(new ResourceUnavailableException(
+        return cacheEntry.failWithException(new ResourceUnavailableException(
             getMemberId() + ": Failed to add a pending write request for " + 
request));
-        return cacheEntry.getReplyFuture();
       }
-      leaderState.notifySenders();
     }
-
+    leaderState.notifySenders();
     return pending.getFuture();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index 50d238b07..4da459ae9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -84,9 +84,10 @@ class RetryCacheImpl implements RetryCache {
       replyFuture.complete(reply);
     }
 
-    void failWithException(Throwable t) {
+    CompletableFuture<RaftClientReply> failWithException(Throwable t) {
       failed = true;
       replyFuture.completeExceptionally(t);
+      return replyFuture;
     }
 
     @Override
@@ -266,14 +267,4 @@ class RetryCacheImpl implements RetryCache {
       return CompletableFuture.completedFuture(reply);
     }
   }
-
-  static CompletableFuture<RaftClientReply> failWithException(
-      Throwable t, CacheEntry entry) {
-    if (entry != null) {
-      entry.failWithException(t);
-      return entry.getReplyFuture();
-    } else {
-      return JavaUtils.completeExceptionally(t);
-    }
-  }
 }

Reply via email to