szetszwo commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750036972
Tried to move the code during review but I have not tested it.
```
class TransferLeadership {
public static final Logger LOG =
LoggerFactory.getLogger(TransferLeadership.class);
class PendingRequest {
private final TransferLeadershipRequest request;
private final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
PendingRequest(TransferLeadershipRequest request) {
this.request = request;
}
TransferLeadershipRequest getRequest() {
return request;
}
CompletableFuture<RaftClientReply> getReplyFuture() {
return replyFuture;
}
void complete(RaftPeerId currentLeader) {
if (replyFuture.isDone()) {
return;
}
if (currentLeader.equals(request.getNewLeader())) {
replyFuture.complete(server.newSuccessReply(request));
} else {
final TransferLeadershipException e = new
TransferLeadershipException(
"Failed to transfer leadership to " + request.getNewLeader() +
": current leader is " + currentLeader);
replyFuture.complete(server.newExceptionReply(request, e));
}
}
@Override
public String toString() {
return request.toString();
}
}
private final RaftServerImpl server;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final AtomicReference<PendingRequest> pending = new
AtomicReference<>();
TransferLeadership(RaftServerImpl server) {
this.server = server;
}
CompletableFuture<RaftClientReply> start(TransferLeadershipRequest
request) {
final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(()
-> new PendingRequest(request));
final PendingRequest previous = pending.getAndUpdate(f -> f != null? f:
supplier.get());
if (previous != null) {
if
(request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
previous.getReplyFuture().whenComplete((r, e) -> {
if (e != null) {
replyFuture.completeExceptionally(e);
} else {
replyFuture.complete(r.isSuccess()?
server.newSuccessReply(request)
: server.newExceptionReply(request, r.getException()));
}
});
return replyFuture;
} else {
final StateMachineException sme = new StateMachineException(
"Failed to transfer leadership to " + request.getNewLeader() +
": a previous " + previous + " exists");
return
CompletableFuture.completedFuture(server.newExceptionReply(request, sme));
}
}
scheduler.onTimeout(TimeDuration.ONE_SECOND,
() -> finish(server.getState().getLeaderId()),
LOG, () -> "Timeout check failed for append entry request: " +
request);
return supplier.get().getReplyFuture();
}
void finish(RaftPeerId currentLeader) {
Optional.ofNullable(pending.getAndSet(null))
.ifPresent(r -> r.complete(currentLeader));
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]