[FLINK-4351] [cluster management] JobManager handles TaskManager's registration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19cae3b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19cae3b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19cae3b Branch: refs/heads/flip-6 Commit: a19cae3b07963776c07c0aae7bee806004f59429 Parents: e91b82d Author: Kurt Young <ykt...@gmail.com> Authored: Sun Oct 16 23:00:57 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Oct 16 22:14:41 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 75 +++++++++++++++++--- .../runtime/jobmaster/JobMasterGateway.java | 15 ++-- .../runtime/taskexecutor/JobLeaderService.java | 55 ++++++-------- .../runtime/taskexecutor/TaskExecutor.java | 2 +- .../taskexecutor/TaskManagerServices.java | 2 +- .../runtime/taskexecutor/TaskExecutorTest.java | 10 ++- 6 files changed, 102 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 05c20d3..8cb9946 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; 
import org.apache.flink.core.io.InputSplitAssigner; @@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.Execution; @@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; @@ -89,8 +94,10 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Configuration of the JobManager */ private final Configuration configuration; + private final Time rpcTimeout; + /** Service to contend for and retrieve the leadership of JM and RM */ private final HighAvailabilityServices highAvailabilityServices; @@ -152,7 +161,7 @@ public class 
JobMaster extends RpcEndpoint<JobMasterGateway> { private volatile UUID leaderSessionID; - // --------- resource manager -------- + // --------- ResourceManager -------- /** Leader retriever service used to locate ResourceManager's address */ private LeaderRetrievalService resourceManagerLeaderRetriever; @@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Connection with ResourceManager, null if not located address yet or we close it initiative */ private ResourceManagerConnection resourceManagerConnection; + // --------- TaskManagers -------- + + private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers; // ------------------------------------------------------------------------ @@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.jobGraph = checkNotNull(jobGraph); this.configuration = checkNotNull(configuration); + this.rpcTimeout = rpcAskTimeout; this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.libraryCacheManager = checkNotNull(libraryCacheManager); this.executionContext = checkNotNull(executorService); @@ -243,6 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.slotPool = new SlotPool(executorService); this.allocationTimeout = Time.of(5, TimeUnit.SECONDS); + + this.registeredTaskManagers = new HashMap<>(4); } //---------------------------------------------------------------------------------------------- @@ -379,8 +394,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } closeResourceManagerConnection(); - // TODO: disconnect from all registered task managers - + for (ResourceID taskManagerId : registeredTaskManagers.keySet()) { + slotPool.releaseResource(taskManagerId); + } + registeredTaskManagers.clear(); } //---------------------------------------------------------------------------------------------- @@ -662,11 +679,53 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> { } @RpcMethod - public RegistrationResponse registerTaskManager( - final String taskManagerAddress, - final ResourceID taskManagerProcessId, - final UUID leaderId) { - throw new UnsupportedOperationException("Has to be implemented."); + public Future<RegistrationResponse> registerTaskManager( + final TaskManagerLocation taskManagerLocation, + final UUID leaderId) throws Exception + { + if (!JobMaster.this.leaderSessionID.equals(leaderId)) { + log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + + "leader session ID {} did not equal the received leader session ID {}.", + taskManagerLocation.getResourceID(), taskManagerLocation.addressString(), + JobMaster.this.leaderSessionID, leaderId); + throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID + + ", actual: " + leaderId); + } + + final ResourceID taskManagerId = taskManagerLocation.getResourceID(); + + if (registeredTaskManagers.containsKey(taskManagerId)) { + final RegistrationResponse response = new JMTMRegistrationSuccess( + taskManagerId, libraryCacheManager.getBlobServerPort()); + return FlinkCompletableFuture.completed(response); + } else { + return getRpcService().execute(new Callable<TaskExecutorGateway>() { + @Override + public TaskExecutorGateway call() throws Exception { + return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class) + .get(rpcTimeout.getSize(), rpcTimeout.getUnit()); + } + }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { + @Override + public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) { + if (throwable != null) { + return new RegistrationResponse.Decline(throwable.getMessage()); + } + + if (!JobMaster.this.leaderSessionID.equals(leaderId)) { + log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + + "leader session ID {} did not equal the received 
leader session ID {}.", + taskManagerLocation.getResourceID(), taskManagerLocation.addressString(), + JobMaster.this.leaderSessionID, leaderId); + return new RegistrationResponse.Decline("Invalid leader session id"); + } + + slotPool.registerResource(taskManagerId); + registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway)); + return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort()); + } + }, getMainThreadExecutor()); + } } //---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 0f155a4..4c85839 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.util.UUID; @@ -195,15 +196,13 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { /** * Register the task manager at the job manager. 
* - * @param taskManagerAddress address of the task manager - * @param taskManagerProcessId identifying the task manager - * @param leaderId identifying the job leader - * @param timeout for the rpc call + * @param taskManagerLocation location of the task manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call * @return Future registration response indicating whether the registration was successful or not */ Future<RegistrationResponse> registerTaskManager( - final String taskManagerAddress, - final ResourceID taskManagerProcessId, - final UUID leaderId, - @RpcTimeout final Time timeout); + final TaskManagerLocation taskManagerLocation, + final UUID leaderId, + @RpcTimeout final Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index e7f52e2..14d36ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; @@ -32,6 +31,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; import 
org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +53,8 @@ public class JobLeaderService { private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class); - /** Process id of the owning process */ - private final ResourceID ownerProcessId; + /** Self's location, used for the job manager connection */ + private final TaskManagerLocation ownLocation; /** The leader retrieval service and listener for each registered job */ private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices; @@ -62,9 +62,6 @@ public class JobLeaderService { /** Internal state of the service */ private volatile JobLeaderService.State state; - /** Address of the owner of this service. This address is used for the job manager connection */ - private String ownerAddress; - /** Rpc service to use for establishing connections */ private RpcService rpcService; @@ -74,14 +71,13 @@ public class JobLeaderService { /** Job leader listener listening for job leader changes */ private JobLeaderListener jobLeaderListener; - public JobLeaderService(ResourceID ownerProcessId) { - this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId); + public JobLeaderService(TaskManagerLocation location) { + this.ownLocation = Preconditions.checkNotNull(location); jobLeaderServices = new HashMap<>(4); state = JobLeaderService.State.CREATED; - ownerAddress = null; rpcService = null; highAvailabilityServices = null; jobLeaderListener = null; @@ -94,13 +90,11 @@ public class JobLeaderService { /** * Start the job leader service with the given services. 
* - * @param initialOwnerAddress to be used for establishing connections (source address) * @param initialRpcService to be used to create rpc connections * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs * @param initialJobLeaderListener listening for job leader changes */ public void start( - final String initialOwnerAddress, final RpcService initialRpcService, final HighAvailabilityServices initialHighAvailabilityServices, final JobLeaderListener initialJobLeaderListener) { @@ -110,7 +104,6 @@ public class JobLeaderService { } else { LOG.info("Start job leader service."); - this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress); this.rpcService = Preconditions.checkNotNull(initialRpcService); this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices); this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener); @@ -311,14 +304,13 @@ public class JobLeaderService { @Override protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() { return new JobLeaderService.JobManagerRetryingRegistration( - LOG, - rpcService, - "JobManager", - JobMasterGateway.class, - getTargetAddress(), - getTargetLeaderId(), - ownerAddress, - ownerProcessId); + LOG, + rpcService, + "JobManager", + JobMasterGateway.class, + getTargetAddress(), + getTargetLeaderId(), + ownLocation); } @Override @@ -349,10 +341,11 @@ public class JobLeaderService { /** * Retrying registration for the job manager <--> task manager connection. 
*/ - private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> { + private static final class JobManagerRetryingRegistration + extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> + { - private final String taskManagerAddress; - private final ResourceID taskManagerProcessId; + private final TaskManagerLocation taskManagerLocation; JobManagerRetryingRegistration( Logger log, @@ -361,22 +354,18 @@ public class JobLeaderService { Class<JobMasterGateway> targetType, String targetAddress, UUID leaderId, - String taskManagerAddress, - ResourceID taskManagerProcessId) { + TaskManagerLocation taskManagerLocation) { super(log, rpcService, targetName, targetType, targetAddress, leaderId); - this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress); - this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId); + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); } @Override - protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { - return gateway.registerTaskManager( - taskManagerAddress, - taskManagerProcessId, - leaderId, - Time.milliseconds(timeoutMillis)); + protected Future<RegistrationResponse> invokeRegistration( + JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception + { + return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 3e3a544..1201281 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { taskSlotTable.start(new SlotActionsImpl()); // start the job leader service - jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); + jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 7575ba3..e8de1b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -207,7 +207,7 @@ public class TaskManagerServices { final JobManagerTable jobManagerTable = new JobManagerTable(); - final JobLeaderService jobLeaderService = new JobLeaderService(resourceID); + final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); return new TaskManagerServices( taskManagerLocation, http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 23c6833..2220f12 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -350,7 +350,7 @@ public class TaskExecutorTest extends TestLogger { final TimerService<AllocationID> timerService = mock(TimerService.class); final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService); final JobManagerTable jobManagerTable = new JobManagerTable(); - final JobLeaderService jobLeaderService = new JobLeaderService(resourceId); + final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); @@ -379,8 +379,7 @@ public class TaskExecutorTest extends TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( - any(String.class), - eq(resourceId), + eq(taskManagerLocation), eq(jobManagerLeaderId), any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); @@ -451,7 +450,7 @@ public class TaskExecutorTest extends TestLogger { final TimerService<AllocationID> timerService = mock(TimerService.class); final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService); final JobManagerTable jobManagerTable = new JobManagerTable(); - final JobLeaderService jobLeaderService = new JobLeaderService(resourceId); + final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final String resourceManagerAddress = "rm"; @@ -484,8 +483,7 @@ public class TaskExecutorTest extends 
TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( - any(String.class), - eq(resourceId), + eq(taskManagerLocation), eq(jobManagerLeaderId), any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));