szetszwo commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r547844071



##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
##########
@@ -145,6 +145,17 @@ boolean containsInConf(RaftPeerId peerId) {
     return conf.contains(peerId);
   }
 
+  boolean isHighestPriority(RaftPeerId peerId) {
+    RaftPeer target = getPeer(peerId);
+    Collection<RaftPeer> peers = getCurrentPeers();
+    for (RaftPeer peer : peers) {
+      if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {

Review comment:
       Should it be `>`?

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, 
CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) 
throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> 
logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new 
StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> 
consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> 
replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {

Review comment:
       It is better to use `AtomicReference` instead of `synchronized`.

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, 
CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) 
throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> 
logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new 
StateMachineException(msg)));

Review comment:
       We should add a TransferLeadershipException.  StateMachineException 
should come from StateMachine.

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, 
CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) 
throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> 
logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new 
StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> 
consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> 
replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+
+      setFinishTransferLeader(null);
+
+      if (state.getLeaderId().equals(request.getNewLeader())) {
+        replyFuture.complete(newSuccessReply(request));
+      } else {
+        StateMachineException sme = new StateMachineException("Failed to 
transfer leadership");
+        replyFuture.complete(newExceptionReply(request, sme));
+        throw sme;
+      }
+    }
   }
 
   @Override
   public CompletableFuture<RaftClientReply> 
transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+
+    synchronized (this) {
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null, false);
+      if (reply != null) {
+        return reply;
+      }
+
+      if (getId().equals(request.getNewLeader())) {
+        return CompletableFuture.completedFuture(newSuccessReply(request));
+      }
+
+      final RaftConfigurationImpl conf = getRaftConf();
+      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
+
+      // make sure there is no raft reconfiguration in progress
+      if (!conf.isStable() || leaderState.inStagingState() || 
!state.isConfCommitted()) {
+        String msg = getMemberId() + " refused to transfer leadership to peer 
" + request.getNewLeader() +
+            " when raft reconfiguration in progress.";
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.containsInConf(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer 
" + request.getNewLeader() +
+            " as it is not in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.isHighestPriority(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer 
" + request.getNewLeader() +
+            " as it does not has highest priority " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
+
+      setFinishTransferLeader(currLeader -> {
+        synchronized (replyFuture) {
+          if (currLeader == null || replyFuture.isDone()) {
+            return;
+          }
+
+          if (currLeader.equals(request.getNewLeader())) {
+            replyFuture.complete(newSuccessReply(request));
+            setFinishTransferLeader(null);
+          }
+        }
+      });
+
+      scheduler.onTimeout(TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS),
+          () -> timeoutTransferLeadership(request, replyFuture),
+          LOG, () -> "Timeout check failed for append entry request: " + 
request);
+
+      return replyFuture;

Review comment:
       This part can be moved to 
TransferLeadership.start(TransferLeadershipRequest).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to