[
https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15532810#comment-15532810
]
ASF GitHub Bot commented on FLINK-4406:
---------------------------------------
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/2565#discussion_r81139389
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
@@ -457,14 +505,94 @@ public Acknowledge
updateTaskExecutionState(TaskExecutionState taskExecutionStat
return Acknowledge.get();
}
- /**
- * Triggers the registration of the job master at the resource manager.
- *
- * @param address Address of the resource manager
- */
- @RpcMethod
- public void registerAtResourceManager(final String address) {
- //TODO:: register at the RM
+
//----------------------------------------------------------------------------------------------
+ // Internal methods
+ //
----------------------------------------------------------------------------------------------
+
+ private void handleFatalError(final Throwable cause) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("Fatal error occurred on JobManager,
cause: {}", cause.getMessage(), cause);
+ shutDown();
+ jobCompletionActions.onFatalError(cause);
+ }
+ });
+ }
+
+ private void notifyOfNewResourceManagerLeader(
+ final String resourceManagerAddress, final UUID
resourceManagerLeaderId)
+ {
+ // IMPORTANT: executed by main thread to avoid concurrence
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ if (resourceManagerConnection != null) {
+ if (resourceManagerAddress != null) {
+ if
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+ &&
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
+ {
+ // both address and
leader id are not changed, we can keep the old connection
+ return;
+ }
+ log.info("ResourceManager
leader changed from {} to {}. Registering at new leader.",
+
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+ }
+ else {
+ log.info("Current
ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+
resourceManagerConnection.getTargetAddress());
+ }
+ }
+
+ closeResourceManagerConnection();
+
+ if (resourceManagerAddress != null) {
+ log.info("Attempting to register at
ResourceManager {}", resourceManagerAddress);
+ resourceManagerConnection = new
ResourceManagerConnection(
+ log, jobGraph.getJobID(),
leaderSessionID,
+ resourceManagerAddress,
resourceManagerLeaderId, executionContext);
+ resourceManagerConnection.start();
+ }
+ }
+ });
+ }
+
+ private void onResourceManagerRegistrationSuccess(final
JobMasterRegistrationSuccess success) {
+ // IMPORTANT: executed by main thread to avoid concurrence
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ // only process if we haven't been connected in
the meantime
+ if (resourceManagerGateway == null) {
+ // double check the connection is still
effective
+ if (resourceManagerConnection != null) {
--- End diff --
This will not happen, because all changes to the connection are executed in
the main thread.
> Implement job master registration at resource manager
> -----------------------------------------------------
>
> Key: FLINK-4406
> URL: https://issues.apache.org/jira/browse/FLINK-4406
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Wenlong Lyu
> Assignee: Kurt Young
>
> The JobMaster needs to register with the ResourceManager when it starts, and then
> watch for leadership changes of the ResourceManager and trigger re-registration.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)