[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204782#comment-15204782 ]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r56869070
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
@@ -886,10 +957,42 @@ class JobManager(
if (instanceManager.isRegistered(taskManager)) {
log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
- instanceManager.unregisterTaskManager(taskManager, false)
+ instanceManager.unregisterTaskManager(taskManager, false)
context.unwatch(taskManager)
}
+ case msg: StopCluster =>
+
+ log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " +
+ s"and diagnostics: ${msg.message()}")
+
+ val respondTo = sender()
+
+ // stop all task managers
+ instanceManager.getAllRegisteredInstances.asScala foreach {
+ instance =>
+ instance.getActorGateway.tell(msg)
+ }
+
+ // send resource manager the ok
+ currentResourceManager match {
+ case Some(rm) =>
+
+ // inform rm
+ rm ! decorateMessage(msg)
+
+ respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+ // trigger shutdown
+ shutdown()
+
+ case None =>
+ // retry
+ context.system.scheduler.scheduleOnce(
+ 2 seconds, self, decorateMessage(msg)
+ )(context.dispatcher)
--- End diff --
You're right. There should be an upper bound for the number of retries. If
there was never an RM, then it behaves the same.
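A minimal, Akka-free sketch of what a bounded retry for the `None` branch could look like, assuming the JobManager tracks how many times it has re-sent `StopCluster` to itself. The names `BoundedStopRetry`, `StopAction`, `decide`, `simulate` and the `maxRetries` parameter are hypothetical and not part of Flink; in the actual handler the retry would still be driven by `context.system.scheduler.scheduleOnce`, and how the attempt count travels with the re-sent message is left open here. The sketch only models the per-attempt decision.

```scala
// Hypothetical model of the StopCluster retry decision (not Flink API).
// Real code would re-send the message via scheduleOnce; here we only
// decide what to do on each attempt.
object BoundedStopRetry {

  /** Outcome of handling one StopCluster attempt. */
  sealed trait StopAction
  /** An RM is connected: inform it, ack the sender, shut down. */
  case object NotifyResourceManagerAndShutdown extends StopAction
  /** No RM yet and budget left: re-send the message to self later. */
  final case class ScheduleRetry(nextAttempt: Int) extends StopAction
  /** Retry budget exhausted: shut down without an RM acknowledgement. */
  case object ShutdownWithoutResourceManager extends StopAction

  /** Decide the action for the given 0-based attempt. */
  def decide(rmConnected: Boolean, attempt: Int, maxRetries: Int): StopAction =
    if (rmConnected) NotifyResourceManagerAndShutdown
    else if (attempt < maxRetries) ScheduleRetry(attempt + 1)
    else ShutdownWithoutResourceManager

  /**
   * Replays the decision loop, given whether an RM is connected at each
   * attempt, and returns the actions taken in order. Always terminates,
   * because the attempt counter grows until it reaches maxRetries.
   */
  def simulate(rmConnectedAt: Int => Boolean, maxRetries: Int): List[StopAction] = {
    @annotation.tailrec
    def loop(attempt: Int, acc: List[StopAction]): List[StopAction] =
      decide(rmConnectedAt(attempt), attempt, maxRetries) match {
        case retry @ ScheduleRetry(next) => loop(next, retry :: acc)
        case terminal                    => (terminal :: acc).reverse
      }
    loop(0, Nil)
  }
}
```

With `simulate(_ => false, 3)` (no RM ever registers), the loop schedules three retries and then shuts down anyway, instead of rescheduling every 2 seconds indefinitely.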
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)