This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4858b4229 RATIS-1904. Refactor RaftServerImpl.submitClientRequestAsync(..). (#935)
4858b4229 is described below

commit 4858b4229eb3d15dafafc7d3d7f05935fe3d7b38
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 17 01:10:47 2023 -0700

    RATIS-1904. Refactor RaftServerImpl.submitClientRequestAsync(..). (#935)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 195 +++++++++++----------
 1 file changed, 101 insertions(+), 94 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6127972cb..667e611b4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -416,7 +416,7 @@ class RaftServerImpl implements RaftServer.Division,
    * The peer belongs to the current configuration, should start as a follower or listener
    */
   private void startAsPeer(RaftPeerRole newRole) {
-    Object reason = "";
+    final Object reason;
     if (newRole == RaftPeerRole.FOLLOWER) {
       reason = "startAsFollower";
       setRole(RaftPeerRole.FOLLOWER, reason);
@@ -485,8 +485,8 @@ class RaftServerImpl implements RaftServer.Division,
           break;
         } catch (NoSuchFileException e) {
           LOG.warn("{}: Some file does not exist {}", getMemberId(), dir, e);
-        } catch (Exception ignored) {
-          LOG.error("{}: Failed to remove RaftStorageDirectory {}", 
getMemberId(), dir, ignored);
+        } catch (Exception e) {
+          LOG.error("{}: Failed to remove RaftStorageDirectory {}", 
getMemberId(), dir, e);
           break;
         }
       }
@@ -514,45 +514,45 @@ class RaftServerImpl implements RaftServer.Division,
       LOG.info("{}: shutdown", getMemberId());
       try {
         jmxAdapter.unregister();
-      } catch (Exception ignored) {
-        LOG.warn("{}: Failed to un-register RaftServer JMX bean", 
getMemberId(), ignored);
+      } catch (Exception e) {
+        LOG.warn("{}: Failed to un-register RaftServer JMX bean", 
getMemberId(), e);
       }
       try {
         role.shutdownFollowerState();
-      } catch (Exception ignored) {
-        LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), 
ignored);
+      } catch (Exception e) {
+        LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e);
       }
       try{
         role.shutdownLeaderElection();
-      } catch (Exception ignored) {
-        LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), 
ignored);
+      } catch (Exception e) {
+        LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e);
       }
       try{
         role.shutdownLeaderState(true);
-      } catch (Exception ignored) {
-        LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), 
ignored);
+      } catch (Exception e) {
+        LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), 
e);
       }
       try{
         state.close();
-      } catch (Exception ignored) {
-        LOG.warn("{}: Failed to close state", getMemberId(), ignored);
+      } catch (Exception e) {
+        LOG.warn("{}: Failed to close state", getMemberId(), e);
       }
       try {
         leaderElectionMetrics.unregister();
         raftServerMetrics.unregister();
         RaftServerMetricsImpl.removeRaftServerMetrics(getMemberId());
-      } catch (Exception ignored) {
-        LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
+      } catch (Exception e) {
+        LOG.warn("{}: Failed to unregister metric", getMemberId(), e);
       }
       try {
         ConcurrentUtils.shutdownAndWait(clientExecutor);
-      } catch (Exception ignored) {
-        LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", 
ignored);
+      } catch (Exception e) {
+        LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
       }
       try {
         ConcurrentUtils.shutdownAndWait(serverExecutor);
-      } catch (Exception ignored) {
-        LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", 
ignored);
+      } catch (Exception e) {
+        LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
       }
     });
   }
@@ -740,11 +740,14 @@ class RaftServerImpl implements RaftServer.Division,
         .build();
   }
 
+  private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request) {
+    return checkLeaderState(request, null);
+  }
+
   /**
    * @return null if the server is in leader state.
    */
-  private CompletableFuture<RaftClientReply> 
checkLeaderState(RaftClientRequest request, CacheEntry entry,
-      boolean isWrite) {
+  private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
     try {
       assertGroup(request.getRequestorId(), request.getRaftGroupId());
     } catch (GroupMismatchException e) {
@@ -766,7 +769,7 @@ class RaftServerImpl implements RaftServer.Division,
       return RetryCacheImpl.failWithReply(reply, entry);
     }
 
-    if (isWrite && isSteppingDown()) {
+    if (!request.isReadOnly() && isSteppingDown()) {
       final LeaderSteppingDownException lsde = new 
LeaderSteppingDownException(getMemberId() + " is stepping down");
       final RaftClientReply reply = newExceptionReply(request, lsde);
       return RetryCacheImpl.failWithReply(reply, entry);
@@ -790,10 +793,9 @@ class RaftServerImpl implements RaftServer.Division,
     return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), 
peers);
   }
 
-  LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) throws 
ServerNotReadyException {
-    return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
-        getMemberId() + " is not in " + expected + ": current state is " + c),
-        expected);
+  void assertLifeCycleState(Set<LifeCycle.State> expected) throws 
ServerNotReadyException {
+    lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
+        getMemberId() + " is not in " + expected + ": current state is " + c), 
expected);
   }
 
   void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws 
GroupMismatchException {
@@ -814,11 +816,10 @@ class RaftServerImpl implements RaftServer.Division,
         request.getClientId(), request, context, cacheEntry);
 
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    CompletableFuture<RaftClientReply> reply;
 
     final PendingRequest pending;
     synchronized (this) {
-      reply = checkLeaderState(request, cacheEntry, true);
+      final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request, cacheEntry);
       if (reply != null) {
         return reply;
       }
@@ -890,75 +891,67 @@ class RaftServerImpl implements RaftServer.Division,
     LOG.debug("{}: receive client request({})", getMemberId(), request);
     final Timekeeper timer = 
raftServerMetrics.getClientRequestTimer(request.getType());
     final Optional<Timekeeper.Context> timerContext = 
Optional.ofNullable(timer).map(Timekeeper::time);
-
-    final CompletableFuture<RaftClientReply> replyFuture;
-
-    if (request.is(TypeCase.STALEREAD)) {
-      replyFuture = staleReadAsync(request);
-    } else if (request.is(TypeCase.READ)) {
-      replyFuture = readAsync(request);
-    } else {
-      // first check the server's leader state
-      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null,
-          !request.is(TypeCase.READ) && !request.is(TypeCase.WATCH));
-      if (reply != null) {
-        return reply;
+    return replyFuture(request).whenComplete((clientReply, exception) -> {
+      timerContext.ifPresent(Timekeeper.Context::stop);
+      if (exception != null || clientReply.getException() != null) {
+        raftServerMetrics.incFailedRequestCount(request.getType());
       }
+    });
+  }
 
-      // let the state machine handle read-only request from client
-      RaftClientRequest.Type type = request.getType();
-      if (type.is(TypeCase.MESSAGESTREAM)) {
-        if (type.getMessageStream().getEndOfRequest()) {
-          final CompletableFuture<RaftClientRequest> f = 
streamEndOfRequestAsync(request);
-          if (f.isCompletedExceptionally()) {
-            return f.thenApply(r -> null);
-          }
-          request = f.join();
-          type = request.getType();
-        }
-      }
+  private CompletableFuture<RaftClientReply> replyFuture(RaftClientRequest request) throws IOException {
+    final TypeCase type = request.getType().getTypeCase();
+    switch (type) {
+      case STALEREAD:
+        return staleReadAsync(request);
+      case READ:
+        return readAsync(request);
+      case WATCH:
+        return watchAsync(request);
+      case MESSAGESTREAM:
+        return messageStreamAsync(request);
+      case WRITE:
+      case FORWARD:
+        return writeAsync(request);
+      default:
+        throw new IllegalStateException("Unexpected request type: " + type + ", request=" + request);
+    }
+  }
 
-      if (type.is(TypeCase.WATCH)) {
-        replyFuture = watchAsync(request);
-      } else if (type.is(TypeCase.MESSAGESTREAM)) {
-        replyFuture = streamAsync(request);
-      } else {
-        // query the retry cache
-        final RetryCacheImpl.CacheQueryResult queryResult = 
retryCache.queryCache(ClientInvocationId.valueOf(request));
-        final CacheEntry cacheEntry = queryResult.getEntry();
-        if (queryResult.isRetry()) {
-          // if the previous attempt is still pending or it succeeded, return 
its
-          // future
-          replyFuture = cacheEntry.getReplyFuture();
-        } else {
-          // TODO: this client request will not be added to pending requests 
until
-          // later which means that any failure in between will leave partial 
state in
-          // the state machine. We should call cancelTransaction() for failed 
requests
-          final TransactionContextImpl context = (TransactionContextImpl) 
stateMachine.startTransaction(
-              filterDataStreamRaftClientRequest(request));
-          if (context.getException() != null) {
-            final StateMachineException e = new 
StateMachineException(getMemberId(), context.getException());
-            final RaftClientReply exceptionReply = newExceptionReply(request, 
e);
-            cacheEntry.failWithReply(exceptionReply);
-            replyFuture =  CompletableFuture.completedFuture(exceptionReply);
-          } else {
-            replyFuture = appendTransaction(request, context, cacheEntry);
-          }
-        }
-      }
+  private CompletableFuture<RaftClientReply> writeAsync(RaftClientRequest request) throws IOException {
+    final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    if (reply != null) {
+      return reply;
     }
 
-    final RaftClientRequest.Type type = request.getType();
-    replyFuture.whenComplete((clientReply, exception) -> {
-      timerContext.ifPresent(Timekeeper.Context::stop);
-      if (exception != null || clientReply.getException() != null) {
-        raftServerMetrics.incFailedRequestCount(type);
-      }
-    });
-    return replyFuture;
+    // query the retry cache
+    final RetryCacheImpl.CacheQueryResult queryResult = 
retryCache.queryCache(ClientInvocationId.valueOf(request));
+    final CacheEntry cacheEntry = queryResult.getEntry();
+    if (queryResult.isRetry()) {
+      // return the cached future.
+      return cacheEntry.getReplyFuture();
+    }
+    // TODO: this client request will not be added to pending requests until
+    // later which means that any failure in between will leave partial state in
+    // the state machine. We should call cancelTransaction() for failed requests
+    final TransactionContextImpl context = (TransactionContextImpl) 
stateMachine.startTransaction(
+        filterDataStreamRaftClientRequest(request));
+    if (context.getException() != null) {
+      final StateMachineException e = new StateMachineException(getMemberId(), 
context.getException());
+      final RaftClientReply exceptionReply = newExceptionReply(request, e);
+      cacheEntry.failWithReply(exceptionReply);
+      return CompletableFuture.completedFuture(exceptionReply);
+    }
+
+    return appendTransaction(request, context, cacheEntry);
   }
 
   private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest 
request) {
+    final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    if (reply != null) {
+      return reply;
+    }
+
     return role.getLeaderState()
         .map(ls -> ls.addWatchReqeust(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
@@ -1003,7 +996,7 @@ class RaftServerImpl implements RaftServer.Division,
   private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest 
request) {
     if (request.getType().getRead().getPreferNonLinearizable()
         || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
-      final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request, null, false);
+      final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request);
        if (reply != null) {
          return reply;
        }
@@ -1053,7 +1046,21 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest 
request) {
+  private CompletableFuture<RaftClientReply> messageStreamAsync(RaftClientRequest request) throws IOException {
+    final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    if (reply != null) {
+      return reply;
+    }
+
+    if (request.getType().getMessageStream().getEndOfRequest()) {
+      final CompletableFuture<RaftClientRequest> f = 
streamEndOfRequestAsync(request);
+      if (f.isCompletedExceptionally()) {
+        return f.thenApply(r -> null);
+      }
+      // the message stream has ended and the request become a WRITE request
+      return replyFuture(f.join());
+    }
+
     return role.getLeaderState()
         .map(ls -> ls.streamAsync(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
@@ -1146,7 +1153,7 @@ class RaftServerImpl implements RaftServer.Division,
     assertGroup(request.getRequestorId(), request.getRaftGroupId());
 
     synchronized (this) {
-      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null, false);
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
       if (reply != null) {
         return reply;
       }
@@ -1254,7 +1261,7 @@ class RaftServerImpl implements RaftServer.Division,
     assertLifeCycleState(LifeCycle.States.RUNNING);
     assertGroup(request.getRequestorId(), request.getRaftGroupId());
 
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, 
true);
+    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
     if (reply != null) {
       return reply;
     }
@@ -1262,7 +1269,7 @@ class RaftServerImpl implements RaftServer.Division,
     final SetConfigurationRequest.Arguments arguments = request.getArguments();
     final PendingRequest pending;
     synchronized (this) {
-      reply = checkLeaderState(request, null, false);
+      reply = checkLeaderState(request);
       if (reply != null) {
         return reply;
       }

Reply via email to