Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6068#discussion_r190571226
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
---
@@ -271,6 +279,80 @@ public void
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
+ /**
+ * Tests that the {@link RunningJobsRegistry} entries are cleared after
the
+ * job reached a terminal state.
+ */
+ @Test
+ public void testRunningJobsRegistryCleanup() throws Exception {
+ submitJob();
+
+ runningJobsRegistry.setJobRunning(jobId);
+ assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+ resultFuture.complete(new
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+ // wait for the clearing
+ runningJobsRegistry.getClearedFuture().get();
+
+ assertThat(runningJobsRegistry.contains(jobId), is(false));
+ }
+
+ private static final class TestingRunningJobsRegistry implements
RunningJobsRegistry {
+
+ private final JobID jobId;
+
+ private final CompletableFuture<Void> clearedFuture = new
CompletableFuture<>();
--- End diff --
Good point. Will change it.
---