tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
Mesos resource manager unable to connect to master after failover 
URL: https://github.com/apache/flink/pull/6464#discussion_r207491799
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##########
 @@ -892,30 +896,48 @@ protected void onFatalError(Throwable t) {
         */
        @Override
        public void grantLeadership(final UUID newLeaderSessionID) {
-               runAsyncWithoutFencing(
-                       () -> {
-                               final ResourceManagerId newResourceManagerId = 
ResourceManagerId.fromUuid(newLeaderSessionID);
-
-                               log.info("ResourceManager {} was granted 
leadership with fencing token {}", getAddress(), newResourceManagerId);
+               final CompletableFuture<Boolean> acceptLeadershipFuture = 
CompletableFuture.supplyAsync(
+                       () -> tryAcceptLeadership(newLeaderSessionID),
+                       
getUnfencedMainThreadExecutor()).thenCompose(Function.identity());
+
+               final CompletableFuture<Void> confirmationFuture = 
acceptLeadershipFuture.thenAcceptAsync(
+                       (acceptLeadership) -> {
+                               if (acceptLeadership) {
+                                       // confirming the leader session ID 
might be blocking,
+                                       
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+                               }
+                       },
+                       getRpcService().getExecutor());
 
-                               // clear the state if we've been the leader 
before
-                               if (getFencingToken() != null) {
-                                       clearStateInternal();
+               confirmationFuture.whenComplete(
+                       (Void ignored, Throwable throwable) -> {
+                               if (throwable != null) {
+                                       
onFatalError(ExceptionUtils.stripCompletionException(throwable));
                                }
+                       });
+       }
 
-                               setFencingToken(newResourceManagerId);
+       private CompletableFuture<Boolean> tryAcceptLeadership(final UUID 
newLeaderSessionID) {
+               if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+                       final ResourceManagerId newResourceManagerId = 
ResourceManagerId.fromUuid(newLeaderSessionID);
 
-                               slotManager.start(getFencingToken(), 
getMainThreadExecutor(), new ResourceActionsImpl());
+                       log.info("ResourceManager {} was granted leadership 
with fencing token {}", getAddress(), newResourceManagerId);
 
-                               prepareLeadershipAsync()
-                                       .thenRunAsync(() ->
-                                               // confirming the leader 
session ID might be blocking,
-                                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), 
getRpcService().getExecutor())
-                                       .exceptionally(t -> {
-                                               onFatalError(t);
-                                               return null;
-                                       });
-                       });
+                       // clear the state if we've been the leader before
+                       if (getFencingToken() != null) {
+                               clearStateInternal();
+                       }
+
+                       setFencingToken(newResourceManagerId);
+
+                       slotManager.start(getFencingToken(), 
getMainThreadExecutor(), new ResourceActionsImpl());
+
+                       return clearStateFuture
 
 Review comment:
   Shouldn't we wait for the completion of the `clearStateFuture` before 
setting the new fencing token and starting components? Otherwise, these newly 
started components might interact with the ones being shut down.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to