[
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206091#comment-15206091
]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r56960595
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
@@ -312,59 +323,125 @@ class JobManager(
leaderSessionID = None
- case RegisterTaskManager(
- connectionInfo,
- hardwareInformation,
- numberOfSlots) =>
+ case msg: RegisterResourceManager =>
+ log.debug(s"Resource manager registration: $msg")
+
+ // ditch current resource manager (if any)
+ currentResourceManager = Option(msg.resourceManager())
+
+ val taskManagerResources =
instanceManager.getAllRegisteredInstances.asScala.map(
+ instance => instance.getResourceId).toList.asJava
+
+ // confirm registration and send known task managers with their
resource ids
+ sender ! decorateMessage(new RegisterResourceManagerSuccessful(self,
taskManagerResources))
+
+ case msg: DisconnectResourceManager =>
+ log.debug(s"Resource manager disconnect: $msg")
+
+ currentResourceManager match {
+ case Some(rm) if rm.equals(msg.resourceManager()) =>
+ // we should ditch the current resource manager
+ log.debug(s"Disconnecting resource manager $rm.")
+ // send the old one a disconnect message
+ rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+ currentResourceManager = None
+ case None =>
+ // not connected, thus ignoring this message
+ log.warn(s"No resource manager ${msg.resourceManager()}
connected. Can't disconnect.")
+ }
+
+ case msg @ RegisterTaskManager(
+ resourceId,
+ connectionInfo,
+ hardwareInformation,
+ numberOfSlots) =>
+ // we are being informed by the ResourceManager that a new task
manager is available
+ log.debug(s"RegisterTaskManager: $msg")
val taskManager = sender()
+ currentResourceManager match {
+ case Some(rm) =>
+ val future = (rm ? decorateMessage(new
RegisterResource(taskManager, msg)))(timeout)
+ future.onComplete {
+ case scala.util.Success(response) =>
+ // the resource manager is available and answered
+ self ! response
+ case scala.util.Failure(t) =>
+ // slow or unreachable resource manager, register anyway and
let the rm reconnect
+ self ! decorateMessage(new
RegisterResourceSuccessful(taskManager, msg))
+ self ! decorateMessage(new DisconnectResourceManager(rm))
+ }(context.dispatcher)
+
+ case None =>
+ log.info("Task Manager Registration but not connected to
ResourceManager")
+ // ResourceManager not yet available
+ // sending task manager information later upon ResourceManager
registration
+ self ! decorateMessage(new
RegisterResourceSuccessful(taskManager, msg))
+ }
+
+ case msg: RegisterResourceSuccessful =>
+
+ val originalMsg = msg.getRegistrationMessage
+ val taskManager = msg.getTaskManager
+
+ // ResourceManager knows about the resource, now let's try to
register TaskManager
if (instanceManager.isRegistered(taskManager)) {
val instanceID =
instanceManager.getRegisteredInstance(taskManager).getId
- // IMPORTANT: Send the response to the "sender", which is not the
- // TaskManager actor, but the ask future!
- sender() ! decorateMessage(
+ taskManager ! decorateMessage(
AlreadyRegistered(
instanceID,
- libraryCacheManager.getBlobServerPort)
- )
- }
- else {
+ libraryCacheManager.getBlobServerPort))
+ } else {
try {
val instanceID = instanceManager.registerTaskManager(
taskManager,
- connectionInfo,
- hardwareInformation,
- numberOfSlots,
+ originalMsg.resourceId,
+ originalMsg.connectionInfo,
+ originalMsg.resources,
+ originalMsg.numberOfSlots,
leaderSessionID.orNull)
- // IMPORTANT: Send the response to the "sender", which is not the
- // TaskManager actor, but the ask future!
- sender() ! decorateMessage(
- AcknowledgeRegistration(
- instanceID,
- libraryCacheManager.getBlobServerPort)
- )
+ taskManager ! decorateMessage(
+ AcknowledgeRegistration(instanceID,
libraryCacheManager.getBlobServerPort))
// to be notified when the taskManager is no longer reachable
context.watch(taskManager)
- }
- catch {
+ } catch {
// registerTaskManager throws an IllegalStateException if it is
already shut down
// let the actor crash and restart itself in this case
case e: Exception =>
log.error("Failed to register TaskManager at instance
manager", e)
- // IMPORTANT: Send the response to the "sender", which is not
the
- // TaskManager actor, but the ask future!
- sender() ! decorateMessage(
+ taskManager ! decorateMessage(
RefuseRegistration(
- ExceptionUtils.stringifyException(e))
- )
+ ExceptionUtils.stringifyException(e)))
}
}
+ case msg: RegisterResourceFailed =>
+
+ val taskManager = msg.getTaskManager
+ val resourceId = msg.getResourceID
+ log.warn(s"TaskManager's resource id $resourceId is not registered
with ResourceManager. " +
+ s"Refusing registration.")
+
+ taskManager ! decorateMessage(
+ RefuseRegistration(
+ ExceptionUtils.stringifyException(new IllegalStateException(
--- End diff --
What do you mean by different class loaders? I think it should be
perfectly fine to have a
```
case class RefuseRegistration(t: Throwable)
```
because all `Throwable`s inserted here should be known to the system class
loader.
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)