[ https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15532836#comment-15532836 ]
ASF GitHub Bot commented on FLINK-4406:
---------------------------------------
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/2565#discussion_r81141545
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
         return Acknowledge.get();
     }
-    /**
-     * Triggers the registration of the job master at the resource manager.
-     *
-     * @param address Address of the resource manager
-     */
-    @RpcMethod
-    public void registerAtResourceManager(final String address) {
-        //TODO:: register at the RM
+    //----------------------------------------------------------------------------------------------
+    // Internal methods
+    //----------------------------------------------------------------------------------------------
+
+    private void handleFatalError(final Throwable cause) {
+        runAsync(new Runnable() {
+            @Override
+            public void run() {
+                log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
+                shutDown();
+                jobCompletionActions.onFatalError(cause);
+            }
+        });
+    }
+
+    private void notifyOfNewResourceManagerLeader(
+            final String resourceManagerAddress, final UUID resourceManagerLeaderId)
+    {
+        // IMPORTANT: executed by main thread to avoid concurrence
+        runAsync(new Runnable() {
+            @Override
+            public void run() {
+                if (resourceManagerConnection != null) {
+                    if (resourceManagerAddress != null) {
+                        if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+                                && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
+                        {
+                            // both address and leader id are not changed, we can keep the old connection
+                            return;
+                        }
+                        log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+                                resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+                    }
+                    else {
+                        log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+                                resourceManagerConnection.getTargetAddress());
+                    }
+                }
+
+                closeResourceManagerConnection();
+
+                if (resourceManagerAddress != null) {
+                    log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+                    resourceManagerConnection = new ResourceManagerConnection(
+                            log, jobGraph.getJobID(), leaderSessionID,
+                            resourceManagerAddress, resourceManagerLeaderId, executionContext);
+                    resourceManagerConnection.start();
+                }
+            }
+        });
+    }
+
+    private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
+        // IMPORTANT: executed by main thread to avoid concurrence
+        runAsync(new Runnable() {
+            @Override
+            public void run() {
+                // only process if we haven't been connected in the meantime
+                if (resourceManagerGateway == null) {
--- End diff --
Good catch, there are indeed some conflicts. Consider the following events:
a. register at rm1
b. rm1 notifies success
c. leader changes to rm2, register at rm2
d. rm2 notifies success
(a) and (c) happen in order, but (b) and (d) do not necessarily.
So if the order is a, c, b, d, we end up keeping the wrong gateway: (b) sets rm1's gateway while the gateway is still null, and (d) is then skipped by the `resourceManagerGateway == null` check.
It seems we should attach the ResourceManager's leader id to the response so we can verify it.
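A minimal sketch of the proposed check, assuming the success response carries the leader id of the ResourceManager that accepted the registration. The `RegistrationSuccess` type and every name below are hypothetical stand-ins, not Flink's actual classes, and the sketch also assumes that starting a registration at a new leader clears any previously adopted gateway. It replays the a, c, b, d order and shows that the stale response from rm1 is dropped, so rm2's gateway is kept.

```java
import java.util.UUID;

/**
 * Hypothetical sketch (not Flink's API): the registration-success response
 * echoes the leader id of the ResourceManager that accepted the registration,
 * and the JobMaster adopts the gateway only if that id matches the leader it
 * is currently registering with.
 */
public class LeaderIdCheckSketch {

    /** Hypothetical success response carrying the responding RM's leader id. */
    static final class RegistrationSuccess {
        final String gateway;   // stands in for a ResourceManagerGateway
        final UUID leaderId;    // leader id of the RM that sent this response

        RegistrationSuccess(String gateway, UUID leaderId) {
            this.gateway = gateway;
            this.leaderId = leaderId;
        }
    }

    private UUID targetLeaderId; // leader we are currently registering with
    private String gateway;      // adopted gateway, null until registered

    /** Events (a) and (c): start registering at a (new) leader. */
    void registerAt(UUID leaderId) {
        targetLeaderId = leaderId;
        gateway = null;          // assumption: switching leaders drops any old gateway
    }

    /** Events (b) and (d): a success notification, possibly arriving out of order. */
    void onRegistrationSuccess(RegistrationSuccess success) {
        if (!success.leaderId.equals(targetLeaderId)) {
            return;              // stale response from a previous leader: ignore it
        }
        if (gateway == null) {   // same guard as in the diff
            gateway = success.gateway;
        }
    }

    String currentGateway() {
        return gateway;
    }

    public static void main(String[] args) {
        UUID rm1 = UUID.randomUUID();
        UUID rm2 = UUID.randomUUID();
        LeaderIdCheckSketch jm = new LeaderIdCheckSketch();

        // Replay the problematic order a, c, b, d.
        jm.registerAt(rm1);                                                     // a
        jm.registerAt(rm2);                                                     // c
        jm.onRegistrationSuccess(new RegistrationSuccess("rm1-gateway", rm1));  // b
        jm.onRegistrationSuccess(new RegistrationSuccess("rm2-gateway", rm2));  // d

        System.out.println(jm.currentGateway()); // prints rm2-gateway
    }
}
```

Without the leader-id comparison, (b) would set rm1's gateway and (d) would be skipped by the `gateway == null` guard, which is the failure described in the comment.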
> Implement job master registration at resource manager
> -----------------------------------------------------
>
> Key: FLINK-4406
> URL: https://issues.apache.org/jira/browse/FLINK-4406
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Wenlong Lyu
> Assignee: Kurt Young
>
> Job Master needs to register at the Resource Manager when starting, then
> watch for leadership changes of the RM and trigger re-registration.
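The register-then-watch behaviour described in the issue, together with the keep-or-reconnect check in `notifyOfNewResourceManagerLeader` from the diff, can be sketched in isolation as below. This is a hypothetical, simplified model rather than Flink code: `Connection` stands in for `ResourceManagerConnection`, a registration is reduced to a counter, and everything runs on one thread instead of going through `runAsync`.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical, single-threaded model of the JobMaster reacting to
 * ResourceManager leader changes (not Flink's API). It keeps the current
 * connection if neither the address nor the leader id changed; otherwise it
 * drops the old connection and, if a new leader is known, registers there.
 */
public class RmLeaderWatcherSketch {

    /** Minimal stand-in for ResourceManagerConnection. */
    static final class Connection {
        final String targetAddress;
        final UUID targetLeaderId;

        Connection(String targetAddress, UUID targetLeaderId) {
            this.targetAddress = targetAddress;
            this.targetLeaderId = targetLeaderId;
        }
    }

    private Connection connection; // null while no leader is known
    private int registrations;     // number of registration attempts started

    /** Leader-change callback; address and leaderId are null when leadership is lost. */
    void onNewLeader(String address, UUID leaderId) {
        if (connection != null && address != null
                && address.equals(connection.targetAddress)
                && Objects.equals(leaderId, connection.targetLeaderId)) {
            return;                // same leader: keep the existing connection
        }
        connection = null;         // forget the old leader (a real client would close it)
        if (address != null) {
            connection = new Connection(address, leaderId);
            registrations++;       // (re-)register at the new leader
        }
    }

    int registrationCount() {
        return registrations;
    }

    String connectedAddress() {
        return connection == null ? null : connection.targetAddress;
    }

    public static void main(String[] args) {
        UUID id1 = UUID.randomUUID();
        UUID id2 = UUID.randomUUID();
        RmLeaderWatcherSketch jm = new RmLeaderWatcherSketch();

        jm.onNewLeader("rm1", id1); // first registration
        jm.onNewLeader("rm1", id1); // duplicate notification: connection kept
        jm.onNewLeader(null, null); // leadership lost: connection dropped
        jm.onNewLeader("rm2", id2); // new leader: re-register

        System.out.println(jm.registrationCount() + " " + jm.connectedAddress()); // prints 2 rm2
    }
}
```

Comparing both the address and the leader id matters because the same ResourceManager address can regain leadership under a new leader id, which still requires a fresh registration.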
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)