Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6068#discussion_r190571191
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
---
@@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
+ /**
+ * Tests that the {@link RunningJobsRegistry} entries are cleared after the
+ * job reached a terminal state.
+ */
+ @Test
+ public void testRunningJobsRegistryCleanup() throws Exception {
+ submitJob();
+
+ runningJobsRegistry.setJobRunning(jobId);
+ assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+ resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+ // wait for the clearing
+ runningJobsRegistry.getClearedFuture().get();
+
+ assertThat(runningJobsRegistry.contains(jobId), is(false));
+ }
+
+ private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
+
+ private final JobID jobId;
+
+ private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
+
+ private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
+
+ private boolean containsJob = false;
+
+ private TestingRunningJobsRegistry(JobID jobId) {
+ this.jobId = jobId;
+ }
+
+ public CompletableFuture<Void> getClearedFuture() {
+ return clearedFuture;
+ }
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkJobId(jobID);
+ containsJob = true;
+ jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+ }
+
+ private void checkJobId(JobID jobID) {
+ Preconditions.checkArgument(jobId.equals(jobID));
--- End diff --
True, will change it.
---