This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0c8547819d726ed8879132d82daf2129b086396
Author: Xintong Song <[email protected]>
AuthorDate: Sun Feb 20 17:32:25 2022 +0800

    [FLINK-25893][runtime] Fix that ResourceManagerServiceImpl may call ResourceManager#deregisterApplication before RM being fully started.
---
 .../ResourceManagerServiceImpl.java                | 36 ++++++++++++++++------
 .../ResourceManagerServiceImplTest.java            | 34 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index e0add8e..692993a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -128,20 +128,38 @@ public class ResourceManagerServiceImpl implements ResourceManagerService, Leade
     @Override
     public CompletableFuture<Void> deregisterApplication(
             final ApplicationStatus applicationStatus, final @Nullable String diagnostics) {
+
         synchronized (lock) {
-            if (running && leaderResourceManager != null) {
-                return leaderResourceManager
-                        .getSelfGateway(ResourceManagerGateway.class)
-                        .deregisterApplication(applicationStatus, diagnostics)
-                        .thenApply(ack -> null);
-            } else {
-                return FutureUtils.completedExceptionally(
-                        new FlinkException(
-                                "Cannot deregister application. Resource manager service is not available."));
+            if (!running || leaderResourceManager == null) {
+                return deregisterWithoutLeaderRm();
             }
+
+            final ResourceManager<?> currentLeaderRM = leaderResourceManager;
+            return currentLeaderRM
+                    .getStartedFuture()
+                    .thenCompose(
+                            ignore -> {
+                                synchronized (lock) {
+                                    if (isLeader(currentLeaderRM)) {
+                                        return currentLeaderRM
+                                                .getSelfGateway(ResourceManagerGateway.class)
+                                                .deregisterApplication(
+                                                        applicationStatus, diagnostics)
+                                                .thenApply(ack -> null);
+                                    } else {
+                                        return deregisterWithoutLeaderRm();
+                                    }
+                                }
+                            });
         }
     }
 
+    private static CompletableFuture<Void> deregisterWithoutLeaderRm() {
+        return FutureUtils.completedExceptionally(
+                new FlinkException(
+                        "Cannot deregister application. Resource manager service is not available."));
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         synchronized (lock) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 3242fc7..e365faa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -448,6 +449,39 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         closeServiceFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
     }
 
+    @Test
+    public void deregisterApplication_leaderRmNotStarted() throws Exception {
+        final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setInitializeConsumer(
+                (ignore) -> {
+                    startRmInitializationFuture.complete(null);
+                    blockOnFuture(finishRmInitializationFuture);
+                });
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(UUID.randomUUID());
+
+        // make sure leader RM is created
+        startRmInitializationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+
+        // deregister application
+        final CompletableFuture<Void> deregisterApplicationFuture =
+                resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
+
+        // RM not fully started, future should not complete
+        assertNotComplete(deregisterApplicationFuture);
+
+        // finish starting RM
+        finishRmInitializationFuture.complete(null);
+
+        // should perform deregistration
+        deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+    }
+
     private static void blockOnFuture(CompletableFuture<?> future) {
         try {
             future.get();
