This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
     new d824c4e88 RATIS-2317. Move acquire PendingRequestPermit out of synchronized block in RaftServerImpl#appendTransaction. (#1275)
d824c4e88 is described below

commit d824c4e88c4e1804819a3e72465f4730c0891c10
Author: Nandakumar Vadivelu <na...@apache.org>
AuthorDate: Wed Jul 9 15:32:36 2025 +0530

    RATIS-2317. Move acquire PendingRequestPermit out of synchronized block in RaftServerImpl#appendTransaction. (#1275)
---
 .../apache/ratis/server/impl/RaftServerImpl.java | 50 ++++++++++++++--------
 .../apache/ratis/server/impl/RetryCacheImpl.java | 13 +-----
 2 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5d9bfeca..3c10e103b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -171,7 +171,7 @@ class RaftServerImpl implements RaftServer.Division,
 
   @Override
   public boolean isLeaderReady() {
-    return isLeader() && getRole().isLeaderReady();
+    return getRole().isLeaderReady();
   }
 
   @Override
@@ -752,6 +752,11 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request) {
+    try {
+      assertGroup(getMemberId(), request);
+    } catch (GroupMismatchException e) {
+      return JavaUtils.completeExceptionally(e);
+    }
     return checkLeaderState(request, null);
   }
 
@@ -759,12 +764,6 @@ class RaftServerImpl implements RaftServer.Division,
    * @return null if the server is in leader state.
    */
   private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
-    try {
-      assertGroup(getMemberId(), request);
-    } catch (GroupMismatchException e) {
-      return RetryCacheImpl.failWithException(e, entry);
-    }
-
     if (!getInfo().isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
       final RaftClientReply reply = newExceptionReply(request, exception);
@@ -809,6 +808,11 @@ class RaftServerImpl implements RaftServer.Division,
         getMemberId() + " is not in " + expected + ": current state is " + c), expected);
   }
 
+  private CompletableFuture<RaftClientReply> getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
+    return entry.failWithException(new ResourceUnavailableException(
+        getMemberId() + ": Failed to acquire a pending write request for " + request));
+  }
+
   /**
    * Handle a normal update request from client.
    */
@@ -819,6 +823,17 @@ class RaftServerImpl implements RaftServer.Division,
 
     assertLifeCycleState(LifeCycle.States.RUNNING);
 
+    final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null);
+    if (unsyncedLeaderState == null) {
+      final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException());
+      return RetryCacheImpl.failWithReply(reply, cacheEntry);
+    }
+    final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
+    if (unsyncedPermit == null) {
+      return getResourceUnavailableReply(request, cacheEntry);
+    }
+
+    final LeaderStateImpl leaderState;
     final PendingRequest pending;
     synchronized (this) {
       final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
@@ -826,16 +841,15 @@ class RaftServerImpl implements RaftServer.Division,
         return reply;
       }
 
-      // append the message to its local log
-      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
-      writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
-
-      final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
+      leaderState = role.getLeaderStateNonNull();
+      final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit
+          : leaderState.tryAcquirePendingRequest(request.getMessage());
       if (permit == null) {
-        cacheEntry.failWithException(new ResourceUnavailableException(
-            getMemberId() + ": Failed to acquire a pending write request for " + request));
-        return cacheEntry.getReplyFuture();
+        return getResourceUnavailableReply(request, cacheEntry);
       }
+
+      // append the message to its local log
+      writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
       try {
         state.appendLog(context);
       } catch (StateMachineException e) {
@@ -853,13 +867,11 @@ class RaftServerImpl implements RaftServer.Division,
       // put the request into the pending queue
       pending = leaderState.addPendingRequest(permit, request, context);
       if (pending == null) {
-        cacheEntry.failWithException(new ResourceUnavailableException(
+        return cacheEntry.failWithException(new ResourceUnavailableException(
             getMemberId() + ": Failed to add a pending write request for " + request));
-        return cacheEntry.getReplyFuture();
       }
-      leaderState.notifySenders();
     }
-
+    leaderState.notifySenders();
     return pending.getFuture();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index 50d238b07..4da459ae9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -84,9 +84,10 @@ class RetryCacheImpl implements RetryCache {
       replyFuture.complete(reply);
     }
 
-    void failWithException(Throwable t) {
+    CompletableFuture<RaftClientReply> failWithException(Throwable t) {
       failed = true;
       replyFuture.completeExceptionally(t);
+      return replyFuture;
     }
 
     @Override
@@ -266,14 +267,4 @@ class RetryCacheImpl implements RetryCache {
       return CompletableFuture.completedFuture(reply);
     }
   }
-
-  static CompletableFuture<RaftClientReply> failWithException(
-      Throwable t, CacheEntry entry) {
-    if (entry != null) {
-      entry.failWithException(t);
-      return entry.getReplyFuture();
-    } else {
-      return JavaUtils.completeExceptionally(t);
-    }
-  }
 }