tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos]
Mesos resource manager unable to connect to master after failover
URL: https://github.com/apache/flink/pull/6464#discussion_r207491799
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -892,30 +896,48 @@ protected void onFatalError(Throwable t) {
*/
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
- runAsyncWithoutFencing(
- () -> {
- final ResourceManagerId newResourceManagerId =
ResourceManagerId.fromUuid(newLeaderSessionID);
-
- log.info("ResourceManager {} was granted
leadership with fencing token {}", getAddress(), newResourceManagerId);
+ final CompletableFuture<Boolean> acceptLeadershipFuture =
CompletableFuture.supplyAsync(
+ () -> tryAcceptLeadership(newLeaderSessionID),
+
getUnfencedMainThreadExecutor()).thenCompose(Function.identity());
+
+ final CompletableFuture<Void> confirmationFuture =
acceptLeadershipFuture.thenAcceptAsync(
+ (acceptLeadership) -> {
+ if (acceptLeadership) {
+ // confirming the leader session ID
might be blocking,
+
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+ }
+ },
+ getRpcService().getExecutor());
- // clear the state if we've been the leader
before
- if (getFencingToken() != null) {
- clearStateInternal();
+ confirmationFuture.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+
onFatalError(ExceptionUtils.stripCompletionException(throwable));
}
+ });
+ }
- setFencingToken(newResourceManagerId);
+ private CompletableFuture<Boolean> tryAcceptLeadership(final UUID
newLeaderSessionID) {
+ if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+ final ResourceManagerId newResourceManagerId =
ResourceManagerId.fromUuid(newLeaderSessionID);
- slotManager.start(getFencingToken(),
getMainThreadExecutor(), new ResourceActionsImpl());
+ log.info("ResourceManager {} was granted leadership
with fencing token {}", getAddress(), newResourceManagerId);
- prepareLeadershipAsync()
- .thenRunAsync(() ->
- // confirming the leader
session ID might be blocking,
-
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID),
getRpcService().getExecutor())
- .exceptionally(t -> {
- onFatalError(t);
- return null;
- });
- });
+ // clear the state if we've been the leader before
+ if (getFencingToken() != null) {
+ clearStateInternal();
+ }
+
+ setFencingToken(newResourceManagerId);
+
+ slotManager.start(getFencingToken(),
getMainThreadExecutor(), new ResourceActionsImpl());
+
+ return clearStateFuture
Review comment:
Shouldn't we wait for the completion of the `clearStateFuture` before
setting the new fencing token and starting components? Otherwise, these newly
started components might interact with the ones being shut down.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services