[FLINK-4141] remove leaderUpdated() method from ResourceManager

This removes the leaderUpdated() method from the framework. Furthermore, it lets the RM client thread communicate directly with the ResourceManager actor. This is fine because the two are always spawned together. If the ResourceManager actor fails, messages from the RM client thread will be dropped. If the RM client thread fails, the failure will be reported to the JobManager.
The leaderUpdated() method was used to signal to the ResourceManager framework that a new leader had been elected. However, the method was not called on every leader change: it was only called when a new leader was elected, not when the previous leader was lost. The YARN RM client thread (YarnResourceManagerCallbackHandler) used messages tagged with the leader session ID to communicate with the main Flink ResourceManager actor. As a result, all messages from this async thread were dropped during the period between the old leader's failure and the election of a new leader. This closes #2190 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f722b737 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f722b737 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f722b737 Branch: refs/heads/master Commit: f722b73772eb66cdb79a288300e38ff7026c7e1f Parents: 16cdb61 Author: Maximilian Michels <[email protected]> Authored: Fri Jul 1 16:27:18 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Jul 1 20:12:32 2016 +0200 ---------------------------------------------------------------------- .../clusterframework/FlinkResourceManager.java | 9 ------- .../standalone/StandaloneResourceManager.java | 5 ---- .../flink/yarn/YarnFlinkResourceManager.java | 11 +-------- .../YarnResourceManagerCallbackHandler.java | 26 +++++++++----------- .../yarn/messages/ContainersAllocated.java | 3 +-- .../flink/yarn/messages/ContainersComplete.java | 3 +-- 6 files changed, 15 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f722b737/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 0aaf098..d28d4aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -494,9 +494,6 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva jobManager = newJobManagerLeader; - // inform the framework that we have updated the leader - leaderUpdated(); - if (workers.size() > 0) { LOG.info("Received TaskManagers that were registered at the leader JobManager. " + "Trying to consolidate."); @@ -645,12 +642,6 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva protected abstract void initialize() throws Exception; /** - * Provides codes to handle an update of the leader (relevant for HA). The framework has to deal - * with the consequences of a leader update. - */ - protected abstract void leaderUpdated(); - - /** * The framework specific code for shutting down the application. This should report the * application's final status and shut down the resource manager cleanly. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/f722b737/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java index 51a228a..4626461 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java @@ -59,11 +59,6 @@ public class StandaloneResourceManager extends FlinkResourceManager<ResourceID> } @Override - protected void leaderUpdated() { - // nothing to update - } - - @Override protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { } http://git-wip-us.apache.org/repos/asf/flink/blob/f722b737/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index da56ff8..883a860 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -29,8 +29,6 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.instance.ActorGateway; -import 
org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.yarn.messages.ContainersAllocated; import org.apache.flink.yarn.messages.ContainersComplete; @@ -181,8 +179,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar LOG.info("Initializing YARN resource master"); // create the client to communicate with the ResourceManager - ActorGateway selfGateway = new AkkaActorGateway(self(), getLeaderSessionID()); - resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(selfGateway); + resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(self()); resourceManagerClient = AMRMClientAsync.createAMRMClientAsync( yarnHeartbeatIntervalMillis, resourceManagerCallbackHandler); @@ -224,12 +221,6 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar } @Override - protected void leaderUpdated() { - AkkaActorGateway newGateway = new AkkaActorGateway(self(), getLeaderSessionID()); - resourceManagerCallbackHandler.setCurrentLeaderGateway(newGateway); - } - - @Override protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { // first, de-register from YARN FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); http://git-wip-us.apache.org/repos/asf/flink/blob/f722b737/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java index 84f1b3a..1e287c2 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java @@ -18,8 +18,8 @@ package 
org.apache.flink.yarn; +import akka.actor.ActorRef; import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.yarn.messages.ContainersAllocated; import org.apache.flink.yarn.messages.ContainersComplete; @@ -38,13 +38,13 @@ import java.util.List; public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.CallbackHandler { /** The yarn master to which we report the callbacks */ - private ActorGateway yarnFrameworkMaster; + private ActorRef yarnFrameworkMaster; /** The progress we report */ private float currentProgress; - public YarnResourceManagerCallbackHandler(ActorGateway yarnFrameworkMaster) { + public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) { this.yarnFrameworkMaster = yarnFrameworkMaster; } @@ -65,12 +65,16 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb @Override public void onContainersCompleted(List<ContainerStatus> list) { - yarnFrameworkMaster.tell(new ContainersComplete(list)); + yarnFrameworkMaster.tell( + new ContainersComplete(list), + ActorRef.noSender()); } @Override public void onContainersAllocated(List<Container> containers) { - yarnFrameworkMaster.tell(new ContainersAllocated(containers)); + yarnFrameworkMaster.tell( + new ContainersAllocated(containers), + ActorRef.noSender()); } @Override @@ -85,14 +89,8 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb @Override public void onError(Throwable error) { - yarnFrameworkMaster.tell(new FatalErrorOccurred("Connection to YARN Resource Manager failed", error)); - } - - /** - * Leaders may change. The current gateway can be adjusted here. - * @param gateway The current gateway to the leading job manager. 
- */ - public void setCurrentLeaderGateway(ActorGateway gateway) { - this.yarnFrameworkMaster = gateway; + yarnFrameworkMaster.tell( + new FatalErrorOccurred("Connection to YARN Resource Manager failed", error), + ActorRef.noSender()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f722b737/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java index 98783ab..2648e44 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java @@ -18,7 +18,6 @@ package org.apache.flink.yarn.messages; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; import org.apache.flink.yarn.YarnFlinkResourceManager; import org.apache.hadoop.yarn.api.records.Container; @@ -30,7 +29,7 @@ import java.util.List; * * NOTE: This message is not serializable, because the Container object is not serializable. 
*/ -public class ContainersAllocated implements RequiresLeaderSessionID { +public class ContainersAllocated { private final List<Container> containers; http://git-wip-us.apache.org/repos/asf/flink/blob/f722b737/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java index 9bb07a1..65bafbc 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java @@ -18,7 +18,6 @@ package org.apache.flink.yarn.messages; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; import org.apache.flink.yarn.YarnFlinkResourceManager; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -31,7 +30,7 @@ import java.util.List; * * NOTE: This message is not serializable, because the ContainerStatus object is not serializable. */ -public class ContainersComplete implements RequiresLeaderSessionID { +public class ContainersComplete { private final List<ContainerStatus> containers;
