This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4858b4229 RATIS-1904. Refactor RaftServerImpl.submitClientRequestAsync(..). (#935)
4858b4229 is described below
commit 4858b4229eb3d15dafafc7d3d7f05935fe3d7b38
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 17 01:10:47 2023 -0700
RATIS-1904. Refactor RaftServerImpl.submitClientRequestAsync(..). (#935)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 195 +++++++++++----------
1 file changed, 101 insertions(+), 94 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6127972cb..667e611b4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -416,7 +416,7 @@ class RaftServerImpl implements RaftServer.Division,
* The peer belongs to the current configuration, should start as a follower
or listener
*/
private void startAsPeer(RaftPeerRole newRole) {
- Object reason = "";
+ final Object reason;
if (newRole == RaftPeerRole.FOLLOWER) {
reason = "startAsFollower";
setRole(RaftPeerRole.FOLLOWER, reason);
@@ -485,8 +485,8 @@ class RaftServerImpl implements RaftServer.Division,
break;
} catch (NoSuchFileException e) {
LOG.warn("{}: Some file does not exist {}", getMemberId(), dir, e);
- } catch (Exception ignored) {
- LOG.error("{}: Failed to remove RaftStorageDirectory {}",
getMemberId(), dir, ignored);
+ } catch (Exception e) {
+ LOG.error("{}: Failed to remove RaftStorageDirectory {}",
getMemberId(), dir, e);
break;
}
}
@@ -514,45 +514,45 @@ class RaftServerImpl implements RaftServer.Division,
LOG.info("{}: shutdown", getMemberId());
try {
jmxAdapter.unregister();
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to un-register RaftServer JMX bean",
getMemberId(), ignored);
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to un-register RaftServer JMX bean",
getMemberId(), e);
}
try {
role.shutdownFollowerState();
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(),
ignored);
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e);
}
try{
role.shutdownLeaderElection();
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(),
ignored);
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e);
}
try{
role.shutdownLeaderState(true);
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(),
ignored);
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(),
e);
}
try{
state.close();
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to close state", getMemberId(), ignored);
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to close state", getMemberId(), e);
}
try {
leaderElectionMetrics.unregister();
raftServerMetrics.unregister();
RaftServerMetricsImpl.removeRaftServerMetrics(getMemberId());
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to unregister metric", getMemberId(), e);
}
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
- } catch (Exception ignored) {
- LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor",
ignored);
+ } catch (Exception e) {
+ LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
}
try {
ConcurrentUtils.shutdownAndWait(serverExecutor);
- } catch (Exception ignored) {
- LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor",
ignored);
+ } catch (Exception e) {
+ LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
}
});
}
@@ -740,11 +740,14 @@ class RaftServerImpl implements RaftServer.Division,
.build();
}
+ private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request) {
+ return checkLeaderState(request, null);
+ }
+
/**
* @return null if the server is in leader state.
*/
- private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request, CacheEntry entry,
- boolean isWrite) {
+ private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request, CacheEntry entry) {
try {
assertGroup(request.getRequestorId(), request.getRaftGroupId());
} catch (GroupMismatchException e) {
@@ -766,7 +769,7 @@ class RaftServerImpl implements RaftServer.Division,
return RetryCacheImpl.failWithReply(reply, entry);
}
- if (isWrite && isSteppingDown()) {
+ if (!request.isReadOnly() && isSteppingDown()) {
final LeaderSteppingDownException lsde = new
LeaderSteppingDownException(getMemberId() + " is stepping down");
final RaftClientReply reply = newExceptionReply(request, lsde);
return RetryCacheImpl.failWithReply(reply, entry);
@@ -790,10 +793,9 @@ class RaftServerImpl implements RaftServer.Division,
return new NotLeaderException(getMemberId(), conf.getPeer(leaderId),
peers);
}
- LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) throws
ServerNotReadyException {
- return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
- getMemberId() + " is not in " + expected + ": current state is " + c),
- expected);
+ void assertLifeCycleState(Set<LifeCycle.State> expected) throws
ServerNotReadyException {
+ lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
+ getMemberId() + " is not in " + expected + ": current state is " + c),
expected);
}
void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws
GroupMismatchException {
@@ -814,11 +816,10 @@ class RaftServerImpl implements RaftServer.Division,
request.getClientId(), request, context, cacheEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
- CompletableFuture<RaftClientReply> reply;
final PendingRequest pending;
synchronized (this) {
- reply = checkLeaderState(request, cacheEntry, true);
+ final CompletableFuture<RaftClientReply> reply =
checkLeaderState(request, cacheEntry);
if (reply != null) {
return reply;
}
@@ -890,75 +891,67 @@ class RaftServerImpl implements RaftServer.Division,
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Timekeeper timer =
raftServerMetrics.getClientRequestTimer(request.getType());
final Optional<Timekeeper.Context> timerContext =
Optional.ofNullable(timer).map(Timekeeper::time);
-
- final CompletableFuture<RaftClientReply> replyFuture;
-
- if (request.is(TypeCase.STALEREAD)) {
- replyFuture = staleReadAsync(request);
- } else if (request.is(TypeCase.READ)) {
- replyFuture = readAsync(request);
- } else {
- // first check the server's leader state
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null,
- !request.is(TypeCase.READ) && !request.is(TypeCase.WATCH));
- if (reply != null) {
- return reply;
+ return replyFuture(request).whenComplete((clientReply, exception) -> {
+ timerContext.ifPresent(Timekeeper.Context::stop);
+ if (exception != null || clientReply.getException() != null) {
+ raftServerMetrics.incFailedRequestCount(request.getType());
}
+ });
+ }
- // let the state machine handle read-only request from client
- RaftClientRequest.Type type = request.getType();
- if (type.is(TypeCase.MESSAGESTREAM)) {
- if (type.getMessageStream().getEndOfRequest()) {
- final CompletableFuture<RaftClientRequest> f =
streamEndOfRequestAsync(request);
- if (f.isCompletedExceptionally()) {
- return f.thenApply(r -> null);
- }
- request = f.join();
- type = request.getType();
- }
- }
+ private CompletableFuture<RaftClientReply> replyFuture(RaftClientRequest
request) throws IOException {
+ final TypeCase type = request.getType().getTypeCase();
+ switch (type) {
+ case STALEREAD:
+ return staleReadAsync(request);
+ case READ:
+ return readAsync(request);
+ case WATCH:
+ return watchAsync(request);
+ case MESSAGESTREAM:
+ return messageStreamAsync(request);
+ case WRITE:
+ case FORWARD:
+ return writeAsync(request);
+ default:
+ throw new IllegalStateException("Unexpected request type: " + type +
", request=" + request);
+ }
+ }
- if (type.is(TypeCase.WATCH)) {
- replyFuture = watchAsync(request);
- } else if (type.is(TypeCase.MESSAGESTREAM)) {
- replyFuture = streamAsync(request);
- } else {
- // query the retry cache
- final RetryCacheImpl.CacheQueryResult queryResult =
retryCache.queryCache(ClientInvocationId.valueOf(request));
- final CacheEntry cacheEntry = queryResult.getEntry();
- if (queryResult.isRetry()) {
- // if the previous attempt is still pending or it succeeded, return its
- // future
- replyFuture = cacheEntry.getReplyFuture();
- } else {
- // TODO: this client request will not be added to pending requests until
- // later which means that any failure in between will leave partial state in
- // the state machine. We should call cancelTransaction() for failed requests
- final TransactionContextImpl context = (TransactionContextImpl)
stateMachine.startTransaction(
- filterDataStreamRaftClientRequest(request));
- if (context.getException() != null) {
- final StateMachineException e = new
StateMachineException(getMemberId(), context.getException());
- final RaftClientReply exceptionReply = newExceptionReply(request,
e);
- cacheEntry.failWithReply(exceptionReply);
- replyFuture = CompletableFuture.completedFuture(exceptionReply);
- } else {
- replyFuture = appendTransaction(request, context, cacheEntry);
- }
- }
- }
+ private CompletableFuture<RaftClientReply> writeAsync(RaftClientRequest
request) throws IOException {
+ final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+ if (reply != null) {
+ return reply;
}
- final RaftClientRequest.Type type = request.getType();
- replyFuture.whenComplete((clientReply, exception) -> {
- timerContext.ifPresent(Timekeeper.Context::stop);
- if (exception != null || clientReply.getException() != null) {
- raftServerMetrics.incFailedRequestCount(type);
- }
- });
- return replyFuture;
+ // query the retry cache
+ final RetryCacheImpl.CacheQueryResult queryResult =
retryCache.queryCache(ClientInvocationId.valueOf(request));
+ final CacheEntry cacheEntry = queryResult.getEntry();
+ if (queryResult.isRetry()) {
+ // return the cached future.
+ return cacheEntry.getReplyFuture();
+ }
+ // TODO: this client request will not be added to pending requests until
+ // later which means that any failure in between will leave partial state in
+ // the state machine. We should call cancelTransaction() for failed requests
+ final TransactionContextImpl context = (TransactionContextImpl)
stateMachine.startTransaction(
+ filterDataStreamRaftClientRequest(request));
+ if (context.getException() != null) {
+ final StateMachineException e = new StateMachineException(getMemberId(),
context.getException());
+ final RaftClientReply exceptionReply = newExceptionReply(request, e);
+ cacheEntry.failWithReply(exceptionReply);
+ return CompletableFuture.completedFuture(exceptionReply);
+ }
+
+ return appendTransaction(request, context, cacheEntry);
}
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest
request) {
+ final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+ if (reply != null) {
+ return reply;
+ }
+
return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
.orElseGet(() -> CompletableFuture.completedFuture(
@@ -1003,7 +996,7 @@ class RaftServerImpl implements RaftServer.Division,
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest
request) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
- final CompletableFuture<RaftClientReply> reply =
checkLeaderState(request, null, false);
+ final CompletableFuture<RaftClientReply> reply =
checkLeaderState(request);
if (reply != null) {
return reply;
}
@@ -1053,7 +1046,21 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest
request) {
+ private CompletableFuture<RaftClientReply>
messageStreamAsync(RaftClientRequest request) throws IOException {
+ final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+ if (reply != null) {
+ return reply;
+ }
+
+ if (request.getType().getMessageStream().getEndOfRequest()) {
+ final CompletableFuture<RaftClientRequest> f =
streamEndOfRequestAsync(request);
+ if (f.isCompletedExceptionally()) {
+ return f.thenApply(r -> null);
+ }
+ // the message stream has ended and the request becomes a WRITE request
+ return replyFuture(f.join());
+ }
+
return role.getLeaderState()
.map(ls -> ls.streamAsync(request))
.orElseGet(() -> CompletableFuture.completedFuture(
@@ -1146,7 +1153,7 @@ class RaftServerImpl implements RaftServer.Division,
assertGroup(request.getRequestorId(), request.getRaftGroupId());
synchronized (this) {
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null, false);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
@@ -1254,7 +1261,7 @@ class RaftServerImpl implements RaftServer.Division,
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null,
true);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
@@ -1262,7 +1269,7 @@ class RaftServerImpl implements RaftServer.Division,
final SetConfigurationRequest.Arguments arguments = request.getArguments();
final PendingRequest pending;
synchronized (this) {
- reply = checkLeaderState(request, null, false);
+ reply = checkLeaderState(request);
if (reply != null) {
return reply;
}