This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8aa510b705bdcfe5b8ff69bc0e294a56b437f53e Author: wangyang0918 <[email protected]> AuthorDate: Fri Mar 26 13:23:37 2021 +0800 [FLINK-21942][coordination] Remove job from JobLeaderIdService when disconnecting JobManager with globally terminal state --- .../runtime/resourcemanager/ResourceManager.java | 23 ++++--- .../resourcemanager/ResourceManagerTest.java | 70 ++++++++++++++++++++-- .../resourcemanager/TestingResourceManager.java | 7 +++ 3 files changed, 84 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index b6936be..750c016 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -510,12 +510,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public void disconnectJobManager( final JobID jobId, JobStatus jobStatus, final Exception cause) { - closeJobManagerConnection( - jobId, - jobStatus.isGloballyTerminalState() - ? 
ResourceRequirementHandling.CLEAR - : ResourceRequirementHandling.RETAIN, - cause); + if (jobStatus.isGloballyTerminalState()) { + removeJob(jobId, cause); + } else { + closeJobManagerConnection(jobId, ResourceRequirementHandling.RETAIN, cause); + } } @Override @@ -1102,7 +1101,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } } - protected void removeJob(JobID jobId) { + protected void removeJob(JobID jobId, Exception cause) { try { jobLeaderIdService.removeJob(jobId); } catch (Exception e) { @@ -1113,10 +1112,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } if (jobManagerRegistrations.containsKey(jobId)) { - closeJobManagerConnection( - jobId, - ResourceRequirementHandling.CLEAR, - new Exception("Job " + jobId + "was removed")); + closeJobManagerConnection(jobId, ResourceRequirementHandling.CLEAR, cause); } } @@ -1478,7 +1474,10 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public void run() { if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) { - removeJob(jobId); + removeJob( + jobId, + new Exception( + "Job " + jobId + "was removed because of timeout")); } } }); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 26577d9..9cfb526 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -54,7 
+55,9 @@ import org.junit.BeforeClass; import org.junit.Test; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.anyOf; @@ -301,6 +304,59 @@ public class ResourceManagerTest extends TestLogger { }); } + @Test + public void testDisconnectJobManagerWithTerminalStatusShouldRemoveJob() throws Exception { + testDisconnectJobManager(JobStatus.CANCELED); + } + + @Test + public void testDisconnectJobManagerWithNonTerminalStatusShouldNotRemoveJob() throws Exception { + testDisconnectJobManager(JobStatus.FAILING); + } + + private void testDisconnectJobManager(JobStatus jobStatus) throws Exception { + final TestingJobMasterGateway jobMasterGateway = + new TestingJobMasterGatewayBuilder() + .setAddress(UUID.randomUUID().toString()) + .build(); + rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + + final JobLeaderIdService jobLeaderIdService = + new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + TestingUtils.infiniteTime()); + resourceManager = createAndStartResourceManager(heartbeatServices, jobLeaderIdService); + + highAvailabilityServices.setJobMasterLeaderRetrieverFunction( + requestedJobId -> + new SettableLeaderRetrievalService( + jobMasterGateway.getAddress(), + jobMasterGateway.getFencingToken().toUUID())); + + final JobID jobId = JobID.generate(); + final ResourceManagerGateway resourceManagerGateway = + resourceManager.getSelfGateway(ResourceManagerGateway.class); + resourceManagerGateway.registerJobManager( + jobMasterGateway.getFencingToken(), + ResourceID.generate(), + jobMasterGateway.getAddress(), + jobId, + TIMEOUT); + final boolean isAdded = runInMainThread(() -> jobLeaderIdService.containsJob(jobId)); + assertThat(isAdded, is(true)); + + resourceManagerGateway.disconnectJobManager(jobId, jobStatus, null); + final 
boolean isRemoved = runInMainThread(() -> !jobLeaderIdService.containsJob(jobId)); + assertThat(isRemoved, is(jobStatus.isGloballyTerminalState())); + } + + private <T> T runInMainThread(Callable<T> callable) throws Exception { + return resourceManager + .runInMainThread(callable, TIMEOUT) + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + } + private void runHeartbeatTimeoutTest( ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager, ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout) @@ -315,15 +371,21 @@ public class ResourceManagerTest extends TestLogger { private TestingResourceManager createAndStartResourceManager( HeartbeatServices heartbeatServices) throws Exception { - final SlotManager slotManager = - SlotManagerBuilder.newBuilder() - .setScheduledExecutor(rpcService.getScheduledExecutor()) - .build(); final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime()); + return createAndStartResourceManager(heartbeatServices, jobLeaderIdService); + } + + private TestingResourceManager createAndStartResourceManager( + HeartbeatServices heartbeatServices, JobLeaderIdService jobLeaderIdService) + throws Exception { + final SlotManager slotManager = + SlotManagerBuilder.newBuilder() + .setScheduledExecutor(rpcService.getScheduledExecutor()) + .build(); final TestingResourceManager resourceManager = new TestingResourceManager( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 6479a45..f3b6396 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -18,6 +18,7 @@ package 
org.apache.flink.runtime.resourcemanager; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -33,6 +34,8 @@ import org.apache.flink.runtime.rpc.RpcUtils; import javax.annotation.Nullable; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; /** Simple {@link ResourceManager} implementation for testing purposes. */ @@ -95,4 +98,8 @@ public class TestingResourceManager extends ResourceManager<ResourceID> { // cannot stop workers return false; } + + <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) { + return callAsync(callable, timeout); + } }
