[FLINK-4406] [cluster management] Implement job master registration at resource manager
[FLINK-4406] [cluster management] Skip new connection if new resource manager's address and leader id are both not changing [FLINK-4406] [cluster management] Verify registration response with leader id This closes #2565. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef9b4b45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef9b4b45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef9b4b45 Branch: refs/heads/flip-6 Commit: ef9b4b45ea96bb1b66726bd4d99f38bc9761032d Parents: 88deb11 Author: Kurt Young <[email protected]> Authored: Thu Sep 29 08:56:27 2016 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:46:56 2016 +0200 ---------------------------------------------------------------------- .../runtime/jobmaster/JobManagerRunner.java | 8 +- .../flink/runtime/jobmaster/JobMaster.java | 222 +++++++++++++++++-- .../runtime/jobmaster/JobMasterGateway.java | 17 +- .../jobmaster/JobMasterRegistrationSuccess.java | 18 +- .../JobMasterToResourceManagerConnection.java | 117 ---------- .../resourcemanager/ResourceManager.java | 2 +- .../jobmaster/JobManagerRunnerMockTest.java | 25 ++- 7 files changed, 239 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index bc2bf9a..6944d85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -63,9 +63,6 @@ public class JobManagerRunner implements 
LeaderContender, OnCompletionActions { private final JobMaster jobManager; - /** Leader session id when granted leadership */ - private UUID leaderSessionID; - /** flag marking the runner as shut down */ private volatile boolean shutdown; @@ -93,7 +90,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { this.executionContext = rpcService.getExecutor(); this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory(); this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID()); - this.leaderSessionID = null; this.jobManager = new JobMaster( jobGraph, configuration, rpcService, haServices, @@ -232,7 +228,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { // The operation may be blocking, but since this runner is idle before it been granted the leadership, // it's okay that job manager wait for the operation complete leaderElectionService.confirmLeaderSessionID(leaderSessionID); - this.leaderSessionID = leaderSessionID; // Double check the leadership after we confirm that, there is a small chance that multiple // job managers schedule the same job after if they try to recover at the same time. 
@@ -242,7 +237,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID()); jobFinishedByOther(); } else { - jobManager.getSelf().startJob(); + jobManager.getSelf().startJob(leaderSessionID); } } } @@ -259,7 +254,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { log.info("JobManager for job {} ({}) was revoked leadership at {}.", jobGraph.getName(), jobGraph.getJobID(), getAddress()); - leaderSessionID = null; jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader.")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index b52a23c..1e01c55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; @@ -34,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; 
import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -47,18 +49,26 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.registration.RegisteredRpcConnection; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.slf4j.Logger; + import scala.concurrent.ExecutionContext$; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -76,9 +86,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class JobMaster extends RpcEndpoint<JobMasterGateway> { - /** Gateway to connected resource manager, null iff not connected */ - private ResourceManagerGateway resourceManager = null; - /** Logical representation of the job */ private final JobGraph jobGraph; @@ -123,6 +130,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private MetricGroup jobMetrics; + private volatile UUID leaderSessionID; + + // --------- 
resource manager -------- + + /** Leader retriever service used to locate ResourceManager's address */ + private LeaderRetrievalService resourceManagerLeaderRetriever; + + /** Connection with ResourceManager, null if not located address yet or we close it initiative */ + private volatile ResourceManagerConnection resourceManagerConnection; + + // ------------------------------------------------------------------------ + public JobMaster( JobGraph jobGraph, Configuration configuration, @@ -151,10 +170,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.jobCompletionActions = checkNotNull(jobCompletionActions); } - public ResourceManagerGateway getResourceManager() { - return resourceManager; - } - //---------------------------------------------------------------------------------------------- // Lifecycle management //---------------------------------------------------------------------------------------------- @@ -196,7 +211,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { .getRestartStrategy(); if (restartStrategyConfiguration != null) { restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration); - } else { + } + else { restartStrategy = restartStrategyFactory.createRestartStrategy(); } @@ -216,6 +232,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e); } + try { + resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); + } catch (Exception e) { + log.error("Could not get the resource manager leader retriever.", e); + throw new JobSubmissionException(jobGraph.getJobID(), + "Could not get the resource manager leader retriever.", e); + } } catch (Throwable t) { log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); @@ -223,7 +246,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { 
if (t instanceof JobSubmissionException) { throw (JobSubmissionException) t; - } else { + } + else { throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " + jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t); } @@ -240,8 +264,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { super.shutDown(); suspendJob(new Exception("JobManager is shutting down.")); + + disposeCommunicationWithResourceManager(); } + + //---------------------------------------------------------------------------------------------- // RPC methods //---------------------------------------------------------------------------------------------- @@ -251,8 +279,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * being recovered. After this, we will begin to schedule the job. */ @RpcMethod - public void startJob() { - log.info("Starting job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + public void startJob(final UUID leaderSessionID) { + log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID); + + this.leaderSessionID = leaderSessionID; if (executionGraph != null) { executionGraph = new ExecutionGraph( @@ -267,7 +297,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { jobGraph.getClasspaths(), userCodeLoader, jobMetrics); - } else { + } + else { // TODO: update last active time in JobInfo } @@ -343,7 +374,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final CheckpointStatsTracker checkpointStatsTracker; if (isStatsDisabled) { checkpointStatsTracker = new DisabledCheckpointStatsTracker(); - } else { + } + else { int historySize = configuration.getInteger( ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); @@ -397,6 +429,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } */ + // job is good to go, try to locate resource manager's address + 
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); } catch (Throwable t) { log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); @@ -406,7 +440,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final Throwable rt; if (t instanceof JobExecutionException) { rt = (JobExecutionException) t; - } else { + } + else { rt = new JobExecutionException(jobGraph.getJobID(), "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); } @@ -439,10 +474,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { */ @RpcMethod public void suspendJob(final Throwable cause) { + leaderSessionID = null; + if (executionGraph != null) { executionGraph.suspend(cause); executionGraph = null; } + + disposeCommunicationWithResourceManager(); } /** @@ -457,14 +496,90 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return Acknowledge.get(); } - /** - * Triggers the registration of the job master at the resource manager. 
- * - * @param address Address of the resource manager - */ - @RpcMethod - public void registerAtResourceManager(final String address) { - //TODO:: register at the RM + //---------------------------------------------------------------------------------------------- + // Internal methods + // ---------------------------------------------------------------------------------------------- + private void handleFatalError(final Throwable cause) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); + shutDown(); + jobCompletionActions.onFatalError(cause); + } + }); + } + + private void notifyOfNewResourceManagerLeader( + final String resourceManagerAddress, final UUID resourceManagerLeaderId) + { + // IMPORTANT: executed by main thread to avoid concurrence + runAsync(new Runnable() { + @Override + public void run() { + if (resourceManagerConnection != null) { + if (resourceManagerAddress != null) { + if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) + && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) + { + // both address and leader id are not changed, we can keep the old connection + return; + } + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", + resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + } + else { + log.info("Current ResourceManager {} lost leader status.
Waiting for new ResourceManager leader.", + resourceManagerConnection.getTargetAddress()); + } + } + + closeResourceManagerConnection(); + + if (resourceManagerAddress != null) { + log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); + resourceManagerConnection = new ResourceManagerConnection( + log, jobGraph.getJobID(), leaderSessionID, + resourceManagerAddress, resourceManagerLeaderId, executionContext); + resourceManagerConnection.start(); + } + } + }); + } + + private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + // TODO - add tests for comment in https://github.com/apache/flink/pull/2565 + // verify the response with current connection + if (resourceManagerConnection != null + && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { + log.info("JobManager successfully registered at ResourceManager, leader id: {}.", + success.getResourceManagerLeaderId()); + } + } + }); + } + + private void disposeCommunicationWithResourceManager() { + // 1. stop the leader retriever so we will not receiving updates anymore + try { + resourceManagerLeaderRetriever.stop(); + } catch (Exception e) { + log.warn("Failed to stop resource manager leader retriever."); + } + + // 2. 
close current connection with ResourceManager if exists + closeResourceManagerConnection(); + } + + private void closeResourceManagerConnection() { + if (resourceManagerConnection != null) { + resourceManagerConnection.close(); + resourceManagerConnection = null; + } } //---------------------------------------------------------------------------------------------- @@ -494,4 +609,67 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } return ret; } + + //---------------------------------------------------------------------------------------------- + // Utility classes + //---------------------------------------------------------------------------------------------- + + private class ResourceManagerLeaderListener implements LeaderRetrievalListener { + @Override + public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { + notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); + } + + @Override + public void handleError(final Exception exception) { + handleFatalError(exception); + } + } + + private class ResourceManagerConnection + extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> + { + private final JobID jobID; + + private final UUID jobManagerLeaderID; + + ResourceManagerConnection( + final Logger log, + final JobID jobID, + final UUID jobManagerLeaderID, + final String resourceManagerAddress, + final UUID resourceManagerLeaderID, + final Executor executor) + { + super(log, resourceManagerAddress, resourceManagerLeaderID, executor); + this.jobID = checkNotNull(jobID); + this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID); + } + + @Override + protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { + return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>( + log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, + getTargetAddress(), getTargetLeaderId()) + { + @Override 
+ protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId, + long timeoutMillis) throws Exception + { + Time timeout = Time.milliseconds(timeoutMillis); + return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout); + } + }; + } + + @Override + protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { + onResourceManagerRegistrationSuccess(success); + } + + @Override + protected void onRegistrationFailure(final Throwable failure) { + handleFatalError(failure); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index b281ea8..6587ccb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -23,19 +23,21 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import java.util.UUID; + /** * {@link JobMaster} rpc gateway interface */ public interface JobMasterGateway extends RpcGateway { /** - * Making this job begins to run. + * Starting the job under the given leader session ID. */ - void startJob(); + void startJob(final UUID leaderSessionID); /** - * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. Should re-submit - * the job before restarting it. + * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. + * Should re-submit the job before restarting it. 
* * @param cause The reason of why this job been suspended. */ @@ -48,11 +50,4 @@ public interface JobMasterGateway extends RpcGateway { * @return Future acknowledge of the task execution state update */ Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState); - - /** - * Triggers the registration of the job master at the resource manager. - * - * @param address Address of the resource manager - */ - void registerAtResourceManager(final String address); } http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java index 031c38e..4058452 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java @@ -20,6 +20,10 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.registration.RegistrationResponse; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Base class for responses from the ResourceManager to a registration attempt by a JobMaster. 
*/ @@ -29,8 +33,11 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { private final long heartbeatInterval; - public JobMasterRegistrationSuccess(long heartbeatInterval) { + private final UUID resourceManagerLeaderId; + + public JobMasterRegistrationSuccess(final long heartbeatInterval, final UUID resourceManagerLeaderId) { this.heartbeatInterval = heartbeatInterval; + this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); } /** @@ -42,8 +49,15 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { return heartbeatInterval; } + public UUID getResourceManagerLeaderId() { + return resourceManagerLeaderId; + } + @Override public String toString() { - return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')'; + return "JobMasterRegistrationSuccess{" + + "heartbeatInterval=" + heartbeatInterval + + ", resourceManagerLeaderId=" + resourceManagerLeaderId + + '}'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java deleted file mode 100644 index 71fce8c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.registration.RegisteredRpcConnection; -import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.registration.RetryingRegistration; -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.concurrent.Future; - -import org.slf4j.Logger; - -import java.util.UUID; -import java.util.concurrent.Executor; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The connection between a JobMaster and the ResourceManager. 
- */ -public class JobMasterToResourceManagerConnection - extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> { - - /** the JobMaster whose connection to the ResourceManager this represents */ - private final JobMaster jobMaster; - - private final JobID jobID; - - private final UUID jobMasterLeaderId; - - public JobMasterToResourceManagerConnection( - Logger log, - JobID jobID, - JobMaster jobMaster, - UUID jobMasterLeaderId, - String resourceManagerAddress, - UUID resourceManagerLeaderId, - Executor executor) { - - super(log, resourceManagerAddress, resourceManagerLeaderId, executor); - this.jobMaster = checkNotNull(jobMaster); - this.jobID = checkNotNull(jobID); - this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId); - } - - @Override - protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { - return new JobMasterToResourceManagerConnection.ResourceManagerRegistration( - log, jobMaster.getRpcService(), - getTargetAddress(), getTargetLeaderId(), - jobMaster.getAddress(),jobID, jobMasterLeaderId); - } - - @Override - protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) { - } - - @Override - protected void onRegistrationFailure(Throwable failure) { - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static class ResourceManagerRegistration - extends RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> { - - private final String jobMasterAddress; - - private final JobID jobID; - - private final UUID jobMasterLeaderId; - - ResourceManagerRegistration( - Logger log, - RpcService rpcService, - String targetAddress, - UUID leaderId, - String jobMasterAddress, - JobID jobID, - UUID jobMasterLeaderId) { - - super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); - 
this.jobMasterAddress = checkNotNull(jobMasterAddress); - this.jobID = checkNotNull(jobID); - this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId); - } - - @Override - protected Future<RegistrationResponse> invokeRegistration( - ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { - - Time timeout = Time.milliseconds(timeoutMillis); - return gateway.registerJobMaster(leaderId, jobMasterLeaderId,jobMasterAddress, jobID, timeout); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 190a4de..f695de4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -215,7 +215,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> if (existingGateway != null) { log.info("Replacing gateway for registered JobID {}.", jobID); } - return new JobMasterRegistrationSuccess(5000); + return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId); } } }, getMainThreadExecutor()); http://git-wip-us.apache.org/repos/asf/flink/blob/ef9b4b45/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index dc3b5fd..bfe5f55 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -142,8 +142,9 @@ public class JobManagerRunnerMockTest { public void testJobFinished() throws Exception { runner.start(); - runner.grantLeadership(UUID.randomUUID()); - verify(jobManagerGateway).startJob(); + UUID leaderSessionID = UUID.randomUUID(); + runner.grantLeadership(leaderSessionID); + verify(jobManagerGateway).startJob(leaderSessionID); assertTrue(!jobCompletion.isJobFinished()); // runner been told by JobManager that job is finished @@ -160,8 +161,9 @@ public class JobManagerRunnerMockTest { public void testJobFailed() throws Exception { runner.start(); - runner.grantLeadership(UUID.randomUUID()); - verify(jobManagerGateway).startJob(); + UUID leaderSessionID = UUID.randomUUID(); + runner.grantLeadership(leaderSessionID); + verify(jobManagerGateway).startJob(leaderSessionID); assertTrue(!jobCompletion.isJobFinished()); // runner been told by JobManager that job is failed @@ -177,8 +179,9 @@ public class JobManagerRunnerMockTest { public void testLeadershipRevoked() throws Exception { runner.start(); - runner.grantLeadership(UUID.randomUUID()); - verify(jobManagerGateway).startJob(); + UUID leaderSessionID = UUID.randomUUID(); + runner.grantLeadership(leaderSessionID); + verify(jobManagerGateway).startJob(leaderSessionID); assertTrue(!jobCompletion.isJobFinished()); runner.revokeLeadership(); @@ -190,16 +193,18 @@ public class JobManagerRunnerMockTest { public void testRegainLeadership() throws Exception { runner.start(); - runner.grantLeadership(UUID.randomUUID()); - verify(jobManagerGateway).startJob(); + UUID leaderSessionID = UUID.randomUUID(); + runner.grantLeadership(leaderSessionID); + verify(jobManagerGateway).startJob(leaderSessionID); assertTrue(!jobCompletion.isJobFinished()); runner.revokeLeadership(); 
verify(jobManagerGateway).suspendJob(any(Throwable.class)); assertFalse(runner.isShutdown()); - runner.grantLeadership(UUID.randomUUID()); - verify(jobManagerGateway, times(2)).startJob(); + UUID leaderSessionID2 = UUID.randomUUID(); + runner.grantLeadership(leaderSessionID2); + verify(jobManagerGateway, times(2)).startJob(leaderSessionID2); } private static class TestingOnCompletionActions implements OnCompletionActions {
