[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206850#comment-15206850 ]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r57031901
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -312,59 +323,121 @@ class JobManager(
leaderSessionID = None
- case RegisterTaskManager(
- connectionInfo,
- hardwareInformation,
- numberOfSlots) =>
+ case msg: RegisterResourceManager =>
+ log.debug(s"Resource manager registration: $msg")
+
+ // ditch current resource manager (if any)
+ currentResourceManager = Option(msg.resourceManager())
+
+ val taskManagerResources =
instanceManager.getAllRegisteredInstances.asScala.map(
+ instance => instance.getResourceId).toList.asJava
+
+ // confirm registration and send known task managers with their
resource ids
+ sender ! decorateMessage(new RegisterResourceManagerSuccessful(self,
taskManagerResources))
+
+ case msg: DisconnectResourceManager =>
+ log.debug(s"Resource manager disconnect: $msg")
+
+ currentResourceManager match {
+ case Some(rm) if rm.equals(msg.resourceManager()) =>
+ // we should ditch the current resource manager
+ log.debug(s"Disconnecting resource manager $rm.")
+ // send the old one a disconnect message
+ rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+ currentResourceManager = None
+ case None =>
+ // not connected, thus ignoring this message
+ log.warn(s"No resource manager ${msg.resourceManager()}
connected. Can't disconnect.")
+ }
+
+ case msg @ RegisterTaskManager(
+ resourceId,
+ connectionInfo,
+ hardwareInformation,
+ numberOfSlots) =>
+ // we are being informed by the ResourceManager that a new task
manager is available
+ log.debug(s"RegisterTaskManager: $msg")
val taskManager = sender()
+ currentResourceManager match {
+ case Some(rm) =>
+ val future = (rm ? decorateMessage(new
RegisterResource(taskManager, msg)))(timeout)
+ future.onComplete {
+ case scala.util.Success(response) =>
+ // the resource manager is available and answered
+ self ! response
+ case scala.util.Failure(t) =>
--- End diff --
Done (doesn't change the diff).
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)