[FLINK-3982] let only ResourceManager of leading JobManager register. In HA mode, multiple ResourceManagers may register at the leading JobManager. They register one after another at the JobManager. The last registering ResourceManager stays registered with the JobManager. This only applies to Standalone mode and doesn't affect functionality.
To prevent duplicate registration for the standalone ResourceManager, the easiest solution is to only start registration when the leading JobManager runs in the same ActorSystem as the ResourceManager. Other ResourceManager implementations may also run independently of the JobManager. This closes #2046 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1212b6d3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1212b6d3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1212b6d3 Branch: refs/heads/master Commit: 1212b6d3ff676877f84a51e9c849f9e484297947 Parents: 9328006 Author: Maximilian Michels <[email protected]> Authored: Fri May 27 21:03:04 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon May 30 12:29:20 2016 +0200 ---------------------------------------------------------------------- .../runtime/clusterframework/FlinkResourceManager.java | 5 +++-- .../standalone/StandaloneResourceManager.java | 13 +++++++++---- .../apache/flink/runtime/taskmanager/TaskManager.scala | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1212b6d3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 95a8f1c..d4945b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -404,7 +404,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva * @param 
leaderAddress The address (Akka URL) of the new leader. Null if there is currently no leader. * @param leaderSessionID The unique session ID marking the leadership session. */ - protected void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) { + private void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) { LOG.debug("Received new leading JobManager {}. Connecting.", leaderAddress); // disconnect from the current leader (no-op if no leader yet) @@ -426,7 +426,8 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva * * @param leaderAddress The akka actor URL of the new leader JobManager. */ - private void triggerConnectingToJobManager(String leaderAddress) { + protected void triggerConnectingToJobManager(String leaderAddress) { + LOG.info("Trying to associate with JobManager leader " + leaderAddress); final Object registerMessage = decorateMessage(new RegisterResourceManager(self())); http://git-wip-us.apache.org/repos/asf/flink/blob/1212b6d3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java index 88f5039..51a228a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.clusterframework.standalone; +import akka.actor.ActorSelection; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import 
org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -25,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import java.util.Collection; -import java.util.UUID; /** * A standalone implementation of the resource manager. Used when the system is started in @@ -42,10 +42,15 @@ public class StandaloneResourceManager extends FlinkResourceManager<ResourceID> // Framework specific behavior // ------------------------------------------------------------------------ - @Override - protected void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) { - super.newJobManagerLeaderAvailable(leaderAddress, leaderSessionID); + protected void triggerConnectingToJobManager(String leaderAddress) { + ActorSelection jobManagerSel = context().actorSelection(leaderAddress); + // check if we are at the leading JobManager. + if (jobManagerSel.anchorPath().root().equals(self().path().root())) { + super.triggerConnectingToJobManager(leaderAddress); + } else { + LOG.info("Received leader address but not running in leader ActorSystem. 
Cancelling registration."); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1212b6d3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index eb7a0ef..d711c47 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -323,7 +323,7 @@ class TaskManager( } case Disconnect(msg) => - handleJobManagerDisconnect(sender(), s"ResourceManager requested disconnect: $msg") + handleJobManagerDisconnect(sender(), s"JobManager requested disconnect: $msg") triggerTaskManagerRegistration() case msg: StopCluster =>
