szetszwo commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750036972


   Tried to move the code during review but I have not tested it.
   ```
   /**
    * Tracks at most one in-flight leadership-transfer request for a server.
    *
    * <p>A request registered through {@link #start} is completed either when
    * {@link #finish} is called with the then-current leader or when a
    * one-second timeout fires, whichever happens first.
    */
   class TransferLeadership {
     public static final Logger LOG =
         LoggerFactory.getLogger(TransferLeadership.class);

     /** A transfer request paired with the future that will carry its reply. */
     class PendingRequest {
       private final TransferLeadershipRequest request;
       private final CompletableFuture<RaftClientReply> replyFuture =
           new CompletableFuture<>();

       PendingRequest(TransferLeadershipRequest request) {
         this.request = request;
       }

       TransferLeadershipRequest getRequest() {
         return request;
       }

       CompletableFuture<RaftClientReply> getReplyFuture() {
         return replyFuture;
       }

       /**
        * Completes the reply future, unless it is already done.
        *
        * <p>The reply is a success reply when {@code currentLeader} equals the
        * requested new leader; otherwise it is an exception reply wrapping a
        * {@link TransferLeadershipException}.
        *
        * @param currentLeader the leader observed at completion time; a
        *     {@code null} value is treated as "not the requested leader"
        */
       void complete(RaftPeerId currentLeader) {
         if (replyFuture.isDone()) {
           return;
         }

         // Null-guard: the timeout path passes server.getState().getLeaderId()
         // straight through (presumably null while no leader is known -- confirm).
         // An NPE here would leave replyFuture uncompleted forever.
         if (currentLeader != null
             && currentLeader.equals(request.getNewLeader())) {
           replyFuture.complete(server.newSuccessReply(request));
         } else {
           final TransferLeadershipException e = new TransferLeadershipException(
               "Failed to transfer leadership to " + request.getNewLeader()
                   + ": current leader is " + currentLeader);
           replyFuture.complete(server.newExceptionReply(request, e));
         }
       }

       @Override
       public String toString() {
         return request.toString();
       }
     }

     private final RaftServerImpl server;
     private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
     private final AtomicReference<PendingRequest> pending =
         new AtomicReference<>();

     TransferLeadership(RaftServerImpl server) {
       this.server = server;
     }

     /**
      * Registers a leadership-transfer request and returns the future of its reply.
      *
      * <ul>
      *   <li>If nothing is pending, the request becomes the pending one and a
      *       one-second timeout is scheduled for it.</li>
      *   <li>If a request for the same new leader is already pending, the
      *       returned future follows that request's outcome, with the reply
      *       rebuilt for this {@code request}.</li>
      *   <li>If a request for a different new leader is pending, an
      *       already-completed exception reply wrapping a
      *       {@link StateMachineException} is returned.</li>
      * </ul>
      *
      * @param request the transfer request to register
      * @return a future holding the reply for {@code request}
      */
     CompletableFuture<RaftClientReply> start(TransferLeadershipRequest request) {
       // Memoized so that a retried update function still creates one instance.
       final MemoizedSupplier<PendingRequest> supplier =
           JavaUtils.memoize(() -> new PendingRequest(request));
       final PendingRequest previous =
           pending.getAndUpdate(f -> f != null ? f : supplier.get());
       if (previous != null) {
         if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
           // Same target: follow the existing attempt instead of starting another.
           final CompletableFuture<RaftClientReply> replyFuture =
               new CompletableFuture<>();
           previous.getReplyFuture().whenComplete((r, e) -> {
             if (e != null) {
               replyFuture.completeExceptionally(e);
             } else {
               replyFuture.complete(r.isSuccess()
                   ? server.newSuccessReply(request)
                   : server.newExceptionReply(request, r.getException()));
             }
           });
           return replyFuture;
         } else {
           final StateMachineException sme = new StateMachineException(
               "Failed to transfer leadership to " + request.getNewLeader()
                   + ": a previous " + previous + " exists");
           return CompletableFuture.completedFuture(
               server.newExceptionReply(request, sme));
         }
       }

       final PendingRequest current = supplier.get();
       // Clear the slot only if it still holds this request. A stale timer from
       // an already-finished request must not complete a newer pending request.
       scheduler.onTimeout(TimeDuration.ONE_SECOND,
           () -> {
             if (pending.compareAndSet(current, null)) {
               current.complete(server.getState().getLeaderId());
             }
           },
           LOG, () -> "Timeout check failed for transfer leadership request: "
               + request);
       return current.getReplyFuture();
     }

     /**
      * Clears the pending request, if any, and completes it against the given
      * leader.
      *
      * @param currentLeader the leader to compare with the requested new leader
      */
     void finish(RaftPeerId currentLeader) {
       Optional.ofNullable(pending.getAndSet(null))
           .ifPresent(r -> r.complete(currentLeader));
     }
   }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to