[ https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488810#comment-16488810 ]
ASF GitHub Bot commented on FLINK-9421:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/6068#discussion_r190500268
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
@@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
+ /**
+ * Tests that the {@link RunningJobsRegistry} entries are cleared after the
+ * job reached a terminal state.
+ */
+ @Test
+ public void testRunningJobsRegistryCleanup() throws Exception {
+ submitJob();
+
+ runningJobsRegistry.setJobRunning(jobId);
+ assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+ resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+ // wait for the clearing
+ runningJobsRegistry.getClearedFuture().get();
+
+ assertThat(runningJobsRegistry.contains(jobId), is(false));
+ }
+
+ private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
+
+ private final JobID jobId;
+
+ private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
+
+ private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
+
+ private boolean containsJob = false;
+
+ private TestingRunningJobsRegistry(JobID jobId) {
+ this.jobId = jobId;
+ }
+
+ public CompletableFuture<Void> getClearedFuture() {
+ return clearedFuture;
+ }
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkJobId(jobID);
+ containsJob = true;
+ jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+ }
+
+ private void checkJobId(JobID jobID) {
+ Preconditions.checkArgument(jobId.equals(jobID));
--- End diff --
Not the best variable names here: `jobId` (the field) vs. `jobID` (the parameter) differ only in capitalization.
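One way to address the naming point is to name the field after its role. The sketch below is hypothetical: `SketchTestingRegistry` is an illustrative name, a plain `String` stands in for Flink's `JobID`, and only the methods needed for the example are included. The diff above is cut off before the rest of `TestingRunningJobsRegistry`, so completing `clearedFuture` from a `clearJob` method is an assumption about how the test's `getClearedFuture().get()` gets released.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified test double: a String stands in for Flink's JobID,
// and only the methods needed to illustrate the naming fix are included.
final class SketchTestingRegistry {

    // Named after its role, so it cannot be confused with the jobId parameters below.
    private final String expectedJobId;

    // Completed when clearJob() is called for the expected job; a test can block on it.
    private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();

    // volatile because the code under test may update it from another thread.
    private volatile boolean containsJob = false;

    SketchTestingRegistry(String expectedJobId) {
        this.expectedJobId = Objects.requireNonNull(expectedJobId, "expectedJobId");
    }

    CompletableFuture<Void> getClearedFuture() {
        return clearedFuture;
    }

    void setJobRunning(String jobId) {
        checkJobId(jobId);
        containsJob = true;
    }

    // Hypothetical cleanup hook (assumption): removes the entry and signals waiting tests.
    void clearJob(String jobId) {
        checkJobId(jobId);
        containsJob = false;
        clearedFuture.complete(null);
    }

    boolean contains(String jobId) {
        return containsJob && expectedJobId.equals(jobId);
    }

    // Rejects any job id other than the one this double was created for.
    private void checkJobId(String jobId) {
        if (!expectedJobId.equals(jobId)) {
            throw new IllegalArgumentException(
                "Unexpected job id " + jobId + ", expected " + expectedJobId);
        }
    }
}
```

With `expectedJobId` as the field, the parameter can keep the conventional name `jobId` everywhere without shadowing or near-duplicate spellings.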
> RunningJobsRegistry entries are not cleaned up after job termination
> --------------------------------------------------------------------
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}}
> after the job has finished. The consequence is that a ZNode with the JobID
> and a state enum per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.
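The issue asks for the registry entry to be removed once a job reaches a terminal state. A minimal sketch of that idea follows; it is not Flink's actual fix. `JobsRegistry`, `InMemoryJobsRegistry`, `RegistryCleanup` and `TerminalState` are hypothetical stand-ins, an in-memory set plays the role of the per-job ZNodes, and a `String` stands in for `JobID`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical terminal states, standing in for Flink's JobStatus.
enum TerminalState { FINISHED, FAILED, CANCELED }

// Hypothetical registry contract: one entry per running job, like one ZNode per job.
interface JobsRegistry {
    void setJobRunning(String jobId);
    void clearJob(String jobId);
    boolean contains(String jobId);
}

// In-memory stand-in for a ZooKeeper-backed registry.
final class InMemoryJobsRegistry implements JobsRegistry {
    private final Set<String> runningJobs = ConcurrentHashMap.newKeySet();

    public void setJobRunning(String jobId) { runningJobs.add(jobId); }

    public void clearJob(String jobId) { runningJobs.remove(jobId); }

    public boolean contains(String jobId) { return runningJobs.contains(jobId); }
}

// Hypothetical dispatcher-side hook: register the job, then clear its entry
// as soon as the job's result future completes.
final class RegistryCleanup {
    private final JobsRegistry registry;

    RegistryCleanup(JobsRegistry registry) {
        this.registry = registry;
    }

    CompletableFuture<TerminalState> runJob(String jobId, CompletableFuture<TerminalState> resultFuture) {
        registry.setJobRunning(jobId);
        // whenComplete runs on normal and exceptional completion alike,
        // so the entry is removed whichever way the job terminates.
        return resultFuture.whenComplete((state, failure) -> registry.clearJob(jobId));
    }
}
```

Attaching the cleanup with `whenComplete` rather than `thenAccept` means jobs that fail are cleaned up too. A real ZooKeeper-backed registry can throw on removal; the sketch leaves that error handling out.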
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)