This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8aa510b705bdcfe5b8ff69bc0e294a56b437f53e
Author: wangyang0918 <[email protected]>
AuthorDate: Fri Mar 26 13:23:37 2021 +0800

    [FLINK-21942][coordination] Remove job from JobLeaderIdService when disconnecting JobManager with globally terminal state
---
 .../runtime/resourcemanager/ResourceManager.java   | 23 ++++---
 .../resourcemanager/ResourceManagerTest.java       | 70 ++++++++++++++++++++--
 .../resourcemanager/TestingResourceManager.java    |  7 +++
 3 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b6936be..750c016 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -510,12 +510,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
     @Override
     public void disconnectJobManager(
             final JobID jobId, JobStatus jobStatus, final Exception cause) {
-        closeJobManagerConnection(
-                jobId,
-                jobStatus.isGloballyTerminalState()
-                        ? ResourceRequirementHandling.CLEAR
-                        : ResourceRequirementHandling.RETAIN,
-                cause);
+        if (jobStatus.isGloballyTerminalState()) {
+            removeJob(jobId, cause);
+        } else {
+            closeJobManagerConnection(jobId, ResourceRequirementHandling.RETAIN, cause);
+        }
     }
 
     @Override
@@ -1102,7 +1101,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    protected void removeJob(JobID jobId) {
+    protected void removeJob(JobID jobId, Exception cause) {
         try {
             jobLeaderIdService.removeJob(jobId);
         } catch (Exception e) {
@@ -1113,10 +1112,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
 
         if (jobManagerRegistrations.containsKey(jobId)) {
-            closeJobManagerConnection(
-                    jobId,
-                    ResourceRequirementHandling.CLEAR,
-                    new Exception("Job " + jobId + "was removed"));
+            closeJobManagerConnection(jobId, ResourceRequirementHandling.CLEAR, cause);
         }
     }
 
@@ -1478,7 +1474,10 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                         @Override
                         public void run() {
                             if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) {
-                                removeJob(jobId);
+                                removeJob(
+                                        jobId,
+                                        new Exception(
+                                                "Job " + jobId + " was removed due to timeout"));
                             }
                         }
                     });
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 26577d9..9cfb526 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -54,7 +55,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.anyOf;
@@ -301,6 +304,59 @@ public class ResourceManagerTest extends TestLogger {
                 });
     }
 
+    @Test
+    public void testDisconnectJobManagerWithTerminalStatusShouldRemoveJob() throws Exception {
+        testDisconnectJobManager(JobStatus.CANCELED);
+    }
+
+    @Test
+    public void testDisconnectJobManagerWithNonTerminalStatusShouldNotRemoveJob() throws Exception {
+        testDisconnectJobManager(JobStatus.FAILING);
+    }
+
+    private void testDisconnectJobManager(JobStatus jobStatus) throws Exception {
+        final TestingJobMasterGateway jobMasterGateway =
+                new TestingJobMasterGatewayBuilder()
+                        .setAddress(UUID.randomUUID().toString())
+                        .build();
+        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+        final JobLeaderIdService jobLeaderIdService =
+                new JobLeaderIdService(
+                        highAvailabilityServices,
+                        rpcService.getScheduledExecutor(),
+                        TestingUtils.infiniteTime());
+        resourceManager = createAndStartResourceManager(heartbeatServices, jobLeaderIdService);
+
+        highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
+                requestedJobId ->
+                        new SettableLeaderRetrievalService(
+                                jobMasterGateway.getAddress(),
+                                jobMasterGateway.getFencingToken().toUUID()));
+
+        final JobID jobId = JobID.generate();
+        final ResourceManagerGateway resourceManagerGateway =
+                resourceManager.getSelfGateway(ResourceManagerGateway.class);
+        resourceManagerGateway.registerJobManager(
+                jobMasterGateway.getFencingToken(),
+                ResourceID.generate(),
+                jobMasterGateway.getAddress(),
+                jobId,
+                TIMEOUT);
+        final boolean isAdded = runInMainThread(() -> jobLeaderIdService.containsJob(jobId));
+        assertThat(isAdded, is(true));
+
+        resourceManagerGateway.disconnectJobManager(jobId, jobStatus, null);
+        final boolean isRemoved = runInMainThread(() -> !jobLeaderIdService.containsJob(jobId));
+        assertThat(isRemoved, is(jobStatus.isGloballyTerminalState()));
+    }
+
+    private <T> T runInMainThread(Callable<T> callable) throws Exception {
+        return resourceManager
+                .runInMainThread(callable, TIMEOUT)
+                .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+    }
+
     private void runHeartbeatTimeoutTest(
             ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager,
             ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout)
@@ -315,15 +371,21 @@ public class ResourceManagerTest extends TestLogger {
 
     private TestingResourceManager createAndStartResourceManager(
             HeartbeatServices heartbeatServices) throws Exception {
-        final SlotManager slotManager =
-                SlotManagerBuilder.newBuilder()
-                        .setScheduledExecutor(rpcService.getScheduledExecutor())
-                        .build();
         final JobLeaderIdService jobLeaderIdService =
                 new JobLeaderIdService(
                         highAvailabilityServices,
                         rpcService.getScheduledExecutor(),
                         TestingUtils.infiniteTime());
+        return createAndStartResourceManager(heartbeatServices, jobLeaderIdService);
+    }
+
+    private TestingResourceManager createAndStartResourceManager(
+            HeartbeatServices heartbeatServices, JobLeaderIdService jobLeaderIdService)
+            throws Exception {
+        final SlotManager slotManager =
+                SlotManagerBuilder.newBuilder()
+                        .setScheduledExecutor(rpcService.getScheduledExecutor())
+                        .build();
 
         final TestingResourceManager resourceManager =
                 new TestingResourceManager(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 6479a45..f3b6396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -33,6 +34,8 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
 
 /** Simple {@link ResourceManager} implementation for testing purposes. */
@@ -95,4 +98,8 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
         // cannot stop workers
         return false;
     }
+
+    <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
+        return callAsync(callable, timeout);
+    }
 }

Reply via email to