[FLINK-3982] let only ResourceManager of leading JobManager register

In HA mode, multiple ResourceManagers may register with the leading
JobManager, one after another; the last ResourceManager to register
remains registered with the JobManager. This only applies to standalone
mode and does not affect functionality.

To prevent duplicate registration of the standalone ResourceManager,
the simplest solution is to start registration only when the leading
JobManager runs in the same ActorSystem as the ResourceManager. Other
ResourceManager implementations may run independently of the
JobManager, so this check is applied only to the standalone
ResourceManager.

This closes #2046


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1212b6d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1212b6d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1212b6d3

Branch: refs/heads/master
Commit: 1212b6d3ff676877f84a51e9c849f9e484297947
Parents: 9328006
Author: Maximilian Michels <[email protected]>
Authored: Fri May 27 21:03:04 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon May 30 12:29:20 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/FlinkResourceManager.java |  5 +++--
 .../standalone/StandaloneResourceManager.java          | 13 +++++++++----
 .../apache/flink/runtime/taskmanager/TaskManager.scala |  2 +-
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1212b6d3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 95a8f1c..d4945b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -404,7 +404,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
         * @param leaderAddress The address (Akka URL) of the new leader. Null 
if there is currently no leader.
         * @param leaderSessionID The unique session ID marking the leadership 
session.
         */
-       protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
+       private void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
                LOG.debug("Received new leading JobManager {}. Connecting.", 
leaderAddress);
 
                // disconnect from the current leader (no-op if no leader yet)
@@ -426,7 +426,8 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
         *
         * @param leaderAddress The akka actor URL of the new leader JobManager.
         */
-       private void triggerConnectingToJobManager(String leaderAddress) {
+       protected void triggerConnectingToJobManager(String leaderAddress) {
+
                LOG.info("Trying to associate with JobManager leader " + 
leaderAddress);
 
                final Object registerMessage = decorateMessage(new 
RegisterResourceManager(self()));

http://git-wip-us.apache.org/repos/asf/flink/blob/1212b6d3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
index 88f5039..51a228a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.standalone;
 
+import akka.actor.ActorSelection;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -25,7 +26,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.util.Collection;
-import java.util.UUID;
 
 /**
  * A standalone implementation of the resource manager. Used when the system 
is started in
@@ -42,10 +42,15 @@ public class StandaloneResourceManager extends 
FlinkResourceManager<ResourceID>
        //  Framework specific behavior
        // 
------------------------------------------------------------------------
 
-
        @Override
-       protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
-               super.newJobManagerLeaderAvailable(leaderAddress, 
leaderSessionID);
+       protected void triggerConnectingToJobManager(String leaderAddress) {
+               ActorSelection jobManagerSel = 
context().actorSelection(leaderAddress);
+               // check if we are at the leading JobManager.
+               if 
(jobManagerSel.anchorPath().root().equals(self().path().root())) {
+                       super.triggerConnectingToJobManager(leaderAddress);
+               } else {
+                       LOG.info("Received leader address but not running in 
leader ActorSystem. Cancelling registration.");
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1212b6d3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index eb7a0ef..d711c47 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -323,7 +323,7 @@ class TaskManager(
       }
 
     case Disconnect(msg) =>
-      handleJobManagerDisconnect(sender(), s"ResourceManager requested 
disconnect: $msg")
+      handleJobManagerDisconnect(sender(), s"JobManager requested disconnect: 
$msg")
       triggerTaskManagerRegistration()
 
     case msg: StopCluster =>

Reply via email to