[hotfix] Let JobLeaderService terminate leader retrieval services

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/007cf2b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/007cf2b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/007cf2b2

Branch: refs/heads/release-1.5
Commit: 007cf2b2ae7ac9c31e04e183fbc0dbe527399d63
Parents: 8485912
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Mar 1 19:04:15 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Mar 2 08:53:51 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/taskexecutor/JobLeaderService.java   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/007cf2b2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 5376362..500d7e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -190,7 +190,12 @@ public class JobLeaderService {
 
                JobLeaderService.JobManagerLeaderListener 
jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
 
-               jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, 
jobManagerLeaderListener));
+               final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> 
oldEntry = jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, 
jobManagerLeaderListener));
+
+               if (oldEntry != null) {
+                       oldEntry.f0.stop();
+                       oldEntry.f1.stop();
+               }
 
                leaderRetrievalService.start(jobManagerLeaderListener);
        }

Reply via email to