[FLINK-4555] Wait for ResourceManager to cleanly unregister application. This ensures that the ResourceManager has enough time to unregister the application before shutting down.
This closes #2514 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e4b7ebd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e4b7ebd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e4b7ebd Branch: refs/heads/master Commit: 1e4b7ebd4aa7dba9aa87122fec9db561df198160 Parents: 40c978b Author: Maximilian Michels <[email protected]> Authored: Wed Sep 21 18:45:37 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Sat Sep 24 13:45:50 2016 +0200 ---------------------------------------------------------------------- .../runtime/clusterframework/FlinkResourceManager.java | 8 +++++--- .../org/apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1e4b7ebd/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 7ea286d..911c1f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -33,16 +33,17 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; -import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; -import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; import org.apache.flink.runtime.clusterframework.messages.RemoveResource; import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -253,6 +254,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva else if (message instanceof StopCluster) { StopCluster msg = (StopCluster) message; shutdownCluster(msg.finalStatus(), msg.message()); + sender().tell(decorateMessage(StopClusterSuccessful.getInstance()), ActorRef.noSender()); } // --- miscellaneous messages http://git-wip-us.apache.org/repos/asf/flink/blob/1e4b7ebd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index fd96f86..a733943 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1024,8 +1024,15 @@ class JobManager( // send resource manager the ok currentResourceManager match { case Some(rm) => - // inform rm - rm ! decorateMessage(msg) + try { + // inform rm and wait for it to confirm + val waitTime = FiniteDuration(5, TimeUnit.SECONDS) + val answer = (rm ? decorateMessage(msg))(waitTime) + Await.ready(answer, waitTime) + } catch { + case e: TimeoutException => + case e: InterruptedException => + } case None => // ResourceManager not available // we choose not to wait here beacuse it might block the shutdown forever
