[FLINK-4354] [heartbeat] Implement heartbeat logic between TaskManager and ResourceManager
This closes #3591. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd90672f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd90672f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd90672f Branch: refs/heads/table-retraction Commit: fd90672f9ccf7a0e02e5eb9c6251dc3d451ce8ba Parents: d20fb09 Author: Zhijiang <[email protected]> Authored: Wed Mar 22 15:12:33 2017 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Mar 23 13:58:44 2017 +0100 ---------------------------------------------------------------------- .../heartbeat/TestingHeartbeatServices.java | 52 ++++++++ .../flink/runtime/jobmaster/JobMaster.java | 4 +- .../flink/runtime/minicluster/MiniCluster.java | 5 +- .../RegistrationConnectionListener.java | 40 ++++++ .../resourcemanager/ResourceManager.java | 117 ++++++++++++++---- .../resourcemanager/ResourceManagerGateway.java | 16 +++ .../resourcemanager/ResourceManagerRunner.java | 9 +- .../StandaloneResourceManager.java | 5 + .../runtime/taskexecutor/TaskExecutor.java | 106 ++++++++++++++-- .../taskexecutor/TaskExecutorGateway.java | 14 +++ .../TaskExecutorRegistrationSuccess.java | 16 ++- ...TaskExecutorToResourceManagerConnection.java | 21 +++- .../clusterframework/ResourceManagerTest.java | 108 +++++++++++++++++ .../flink/runtime/jobmaster/JobMasterTest.java | 38 +----- .../resourcemanager/ResourceManagerHATest.java | 7 ++ .../ResourceManagerJobMasterTest.java | 7 ++ .../ResourceManagerTaskExecutorTest.java | 7 ++ .../slotmanager/SlotProtocolTest.java | 17 +++ .../taskexecutor/TaskExecutorITCase.java | 3 + .../runtime/taskexecutor/TaskExecutorTest.java | 121 +++++++++++++++++-- .../yarn/YarnFlinkApplicationMasterRunner.java | 21 ++-- .../apache/flink/yarn/YarnResourceManager.java | 8 +- 22 files changed, 640 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java new file mode 100644 index 0000000..e628db5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +public class TestingHeartbeatServices extends HeartbeatServices { + + private final ScheduledExecutor scheduledExecutorToUse; + + public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) { + super(heartbeatInterval, heartbeatTimeout); + + this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse); + } + + @Override + public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender( + ResourceID resourceId, + HeartbeatListener<I, O> heartbeatListener, + ScheduledExecutor scheduledExecutor, + Logger log) { + + return new HeartbeatManagerSenderImpl<>( + heartbeatInterval, + heartbeatTimeout, + resourceId, + heartbeatListener, + org.apache.flink.runtime.concurrent.Executors.directExecutor(), + scheduledExecutorToUse, + log); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 243b57f..81fc541 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @Override public void notifyHeartbeatTimeout(ResourceID resourceID) { - log.info("Task manager with id {} timed out.", resourceID); + log.info("Task manager with id {} heartbeat timed out.", resourceID); getSelf().disconnectTaskManager( resourceID, - 
new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out.")); + new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out.")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 25c4aba..2cfba7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -241,7 +241,7 @@ public class MiniCluster { // bring up the ResourceManager(s) LOG.info("Starting {} ResourceManger(s)", numResourceManagers); resourceManagerRunners = startResourceManagers( - configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices); + configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices); // bring up the TaskManager(s) for the mini cluster LOG.info("Starting {} TaskManger(s)", numTaskManagers); @@ -508,6 +508,7 @@ public class MiniCluster { protected ResourceManagerRunner[] startResourceManagers( Configuration configuration, HighAvailabilityServices haServices, + HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, int numResourceManagers, RpcService[] resourceManagerRpcServices) throws Exception { @@ -517,9 +518,11 @@ public class MiniCluster { for (int i = 0; i < numResourceManagers; i++) { resourceManagerRunners[i] = new ResourceManagerRunner( + ResourceID.generate(), configuration, resourceManagerRpcServices[i], haServices, + heartbeatServices, metricRegistry); resourceManagerRunners[i].start(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java new file mode 100644 index 0000000..360f982 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.registration; + +/** + * Classes which want to be notified about the registration result by the {@link RegisteredRpcConnection} + * have to implement this interface. + */ +public interface RegistrationConnectionListener<Success extends RegistrationResponse.Success> { + + /** + * This method is called by the {@link RegisteredRpcConnection} when the registration is success. + * + * @param success The concrete response information for successful registration. 
+ */ + void onRegistrationSuccess(Success success); + + /** + * This method is called by the {@link RegisteredRpcConnection} when the registration fails. + * + * @param failure The exception which causes the registration failure. + */ + void onRegistrationFailure(Throwable failure); +} http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 1430a49..9a7a790 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -30,6 +30,10 @@ import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.heartbeat.HeartbeatListener; +import org.apache.flink.runtime.heartbeat.HeartbeatManager; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.instance.InstanceID; @@ -64,6 +68,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -81,6 +86,9 @@ public abstract class ResourceManager<WorkerType extends Serializable> 
extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender { + /** Unique id of the resource manager */ + private final ResourceID resourceId; + /** Configuration of the resource manager */ private final ResourceManagerConfiguration resourceManagerConfiguration; @@ -96,6 +104,9 @@ public abstract class ResourceManager<WorkerType extends Serializable> /** High availability services for leader retrieval and election. */ private final HighAvailabilityServices highAvailabilityServices; + /** The heartbeat manager with task managers. */ + private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager; + /** The factory to construct the SlotManager. */ private final SlotManagerFactory slotManagerFactory; @@ -118,9 +129,11 @@ public abstract class ResourceManager<WorkerType extends Serializable> private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners; public ResourceManager( + ResourceID resourceId, RpcService rpcService, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, SlotManagerFactory slotManagerFactory, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, @@ -128,6 +141,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> super(rpcService); + this.resourceId = checkNotNull(resourceId); this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.slotManagerFactory = checkNotNull(slotManagerFactory); @@ -135,6 +149,12 @@ public abstract class ResourceManager<WorkerType extends Serializable> this.jobLeaderIdService = checkNotNull(jobLeaderIdService); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); + this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender( + resourceId, + new TaskManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); + 
this.jobManagerRegistrations = new HashMap<>(4); this.taskExecutors = new HashMap<>(8); this.leaderSessionId = null; @@ -178,6 +198,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> public void shutDown() throws Exception { Exception exception = null; + taskManagerHeartbeatManager.stop(); + try { super.shutDown(); } catch (Exception e) { @@ -326,7 +348,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> * * @param resourceManagerLeaderId The fencing token for the ResourceManager leader * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers + * @param taskExecutorResourceId The resource ID of the TaskExecutor that registers * * @return The response by the ResourceManager. */ @@ -334,7 +356,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> public Future<RegistrationResponse> registerTaskExecutor( final UUID resourceManagerLeaderId, final String taskExecutorAddress, - final ResourceID resourceID, + final ResourceID taskExecutorResourceId, final SlotReport slotReport) { if (leaderSessionId.equals(resourceManagerLeaderId)) { @@ -342,25 +364,37 @@ public abstract class ResourceManager<WorkerType extends Serializable> return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { @Override - public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) { + public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) { if (throwable != null) { return new RegistrationResponse.Decline(throwable.getMessage()); } else { - WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(resourceID); + WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId); if (oldRegistration != null) { // TODO :: suggest old taskExecutor to stop itself - 
log.info("Replacing old instance of worker for ResourceID {}", resourceID); + log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId); } - WorkerType newWorker = workerStarted(resourceID); + WorkerType newWorker = workerStarted(taskExecutorResourceId); WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(taskExecutorGateway, newWorker); - taskExecutors.put(resourceID, registration); - slotManager.registerTaskExecutor(resourceID, registration, slotReport); + taskExecutors.put(taskExecutorResourceId, registration); + slotManager.registerTaskExecutor(taskExecutorResourceId, registration, slotReport); + + taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() { + @Override + public void receiveHeartbeat(ResourceID resourceID, Void payload) { + // the task manager will not request heartbeat, so this method will never be called currently + } + + @Override + public void requestHeartbeat(ResourceID resourceID, Void payload) { + taskExecutorGateway.heartbeatFromResourceManager(resourceID); + } + }); return new TaskExecutorRegistrationSuccess( - registration.getInstanceID(), + registration.getInstanceID(), resourceId, resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds()); } } @@ -368,7 +402,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> } else { log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " + "not equal the received leader session ID {}", - resourceID, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId); + taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId); return FlinkCompletableFuture.<RegistrationResponse>completed( new RegistrationResponse.Decline("Discard registration because the leader id " + @@ -377,6 +411,16 @@ public abstract class ResourceManager<WorkerType extends Serializable> } } + @RpcMethod + public void 
heartbeatFromTaskManager(final ResourceID resourceID) { + taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null); + } + + @RpcMethod + public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) { + closeTaskManagerConnection(resourceId, cause); + } + /** * Requests a slot from the resource manager. * @@ -716,24 +760,24 @@ public abstract class ResourceManager<WorkerType extends Serializable> * This method should be called by the framework once it detects that a currently registered * task executor has failed. * - * @param resourceID Id of the worker that has failed. - * @param message An informational message that explains why the worker failed. + * @param resourceID Id of the TaskManager that has failed. + * @param cause The exception which cause the TaskManager failed. */ - public void notifyWorkerFailed(final ResourceID resourceID, final String message) { - runAsync(new Runnable() { - @Override - public void run() { - WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID); + public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) { + taskManagerHeartbeatManager.unmonitorTarget(resourceID); - if (workerRegistration != null) { - log.info("Task manager {} failed because {}.", resourceID, message); - // TODO :: suggest failed task executor to stop itself - slotManager.notifyTaskManagerFailure(resourceID); - } else { - log.debug("Could not find a registered task manager with the process id {}.", resourceID); - } - } - }); + WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID); + + if (workerRegistration != null) { + log.info("Task manager {} failed because {}.", resourceID, cause); + + // TODO :: suggest failed task executor to stop itself + slotManager.notifyTaskManagerFailure(resourceID); + + workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause); + } else { + log.debug("Could not find a registered task manager 
with the process id {}.", resourceID); + } } // ------------------------------------------------------------------------ @@ -827,5 +871,26 @@ public abstract class ResourceManager<WorkerType extends Serializable> onFatalErrorAsync(error); } } + + private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + + @Override + public void notifyHeartbeatTimeout(ResourceID resourceID) { + log.info("The heartbeat of TaskManager with id {} timed out.", resourceID); + + closeTaskManagerConnection(resourceID, new TimeoutException( + "Task manager with id " + resourceID + " heartbeat timed out.")); + } + + @Override + public void reportPayload(ResourceID resourceID, Void payload) { + // nothing to do since there is no payload + } + + @Override + public Future<Void> retrievePayload() { + return FlinkCompletableFuture.completed(null); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 8235ea7..7741e0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -130,4 +130,20 @@ public interface ResourceManagerGateway extends RpcGateway { * @return The future to the number of registered TaskManagers. 
*/ Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId); + + /** + * Sends the heartbeat to resource manager from task manager + * + * @param resourceID unique id of the task manager + */ + void heartbeatFromTaskManager(final ResourceID resourceID); + + /** + * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the + * {@link ResourceManager}. + * + * @param resourceID identifying the TaskManager to disconnect + * @param cause for the disconnection of the TaskManager + */ + void disconnectTaskManager(ResourceID resourceID, Exception cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index 73b27b5..d07e373 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -43,14 +45,18 @@ public class ResourceManagerRunner implements FatalErrorHandler { private final ResourceManager<?> resourceManager; public ResourceManagerRunner( + final ResourceID resourceId, final Configuration configuration, final RpcService rpcService, final HighAvailabilityServices 
highAvailabilityServices, + final HeartbeatServices heartbeatServices, final MetricRegistry metricRegistry) throws Exception { + Preconditions.checkNotNull(resourceId); Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(rpcService); Preconditions.checkNotNull(highAvailabilityServices); + Preconditions.checkNotNull(heartbeatServices); Preconditions.checkNotNull(metricRegistry); final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); @@ -63,9 +69,11 @@ public class ResourceManagerRunner implements FatalErrorHandler { rpcService.getScheduledExecutor()); this.resourceManager = new StandaloneResourceManager( + resourceId, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, resourceManagerRuntimeServices.getSlotManagerFactory(), metricRegistry, resourceManagerRuntimeServices.getJobLeaderIdService(), @@ -87,7 +95,6 @@ public class ResourceManagerRunner implements FatalErrorHandler { private void shutDownInternally() throws Exception { Exception exception = null; synchronized (lock) { - try { resourceManager.shutDown(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 73c8a2d..e2d6538 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager; import 
org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; @@ -37,17 +38,21 @@ import org.apache.flink.runtime.rpc.RpcService; public class StandaloneResourceManager extends ResourceManager<ResourceID> { public StandaloneResourceManager( + ResourceID resourceId, RpcService rpcService, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, SlotManagerFactory slotManagerFactory, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( + resourceId, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, slotManagerFactory, metricRegistry, jobLeaderIdService, http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 83c225f..f3e1ff3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import 
org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationConnectionListener; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -135,6 +136,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { /** The heartbeat manager for job manager in the task manager */ private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; + /** The heartbeat manager for resource manager in the task manager */ + private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; + /** The fatal error handler to use in case of a fatal error */ private final FatalErrorHandler fatalErrorHandler; @@ -206,6 +210,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), log); + + this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager( + getResourceID(), + new ResourceManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); } // ------------------------------------------------------------------------ @@ -247,6 +257,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { jobManagerHeartbeatManager.stop(); + resourceManagerHeartbeatManager.stop(); + ioManager.shutdown(); memoryManager.shutdown(); @@ -497,6 +509,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { jobManagerHeartbeatManager.requestHeartbeat(resourceID, null); } + @RpcMethod + public void heartbeatFromResourceManager(ResourceID resourceID) { + resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); + } + // ---------------------------------------------------------------------- // Checkpointing RPCs // ---------------------------------------------------------------------- @@ -619,11 +636,20 @@ public class 
TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId); } + // ---------------------------------------------------------------------- + // Disconnection RPCs + // ---------------------------------------------------------------------- + @RpcMethod public void disconnectJobManager(JobID jobId, Exception cause) { closeJobManagerConnection(jobId, cause); } + @RpcMethod + public void disconnectResourceManager(Exception cause) { + closeResourceManagerConnection(cause); + } + // ====================================================================== // Internal methods // ====================================================================== @@ -665,11 +691,25 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { newLeaderAddress, newLeaderId, getMainThreadExecutor(), - new ForwardingFatalErrorHandler()); + new ResourceManagerRegistrationListener()); resourceManagerConnection.start(); } } + private void closeResourceManagerConnection(Exception cause) { + log.info("Close ResourceManager connection for {}.", cause); + + if (isConnectedToResourceManager()) { + resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId()); + + ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + resourceManagerConnection.close(); + resourceManagerConnection = null; + + resourceManagerGateway.disconnectTaskManager(getResourceID(), cause); + } + } + // ------------------------------------------------------------------------ // Internal job manager connection methods // ------------------------------------------------------------------------ @@ -747,10 +787,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { offerSlotsToJobManager(jobId); } else { log.warn("Slot offering to JobManager failed. 
Freeing the slots " + - "and returning them to the ResourceManager.", throwable); + "and returning them to the ResourceManager.", throwable); // We encountered an exception. Free the slots and return them to the RM. - for (SlotOffer reservedSlot: reservedSlots) { + for (SlotOffer reservedSlot : reservedSlots) { freeSlot(reservedSlot.getAllocationId(), throwable); } } @@ -1137,11 +1177,32 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - private final class ForwardingFatalErrorHandler implements FatalErrorHandler { + private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> { @Override - public void onFatalError(Throwable exception) { - onFatalErrorAsync(exception); + public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) { + final ResourceID resourceManagerId = success.getResourceManagerId(); + + // monitor the resource manager as heartbeat target + resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() { + @Override + public void receiveHeartbeat(ResourceID resourceID, Void payload) { + if (isConnectedToResourceManager()) { + ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + resourceManagerGateway.heartbeatFromTaskManager(resourceID); + } + } + + @Override + public void requestHeartbeat(ResourceID resourceID, Void payload) { + // request heartbeat will never be called on the task manager side + } + }); + } + + @Override + public void onRegistrationFailure(Throwable failure) { + onFatalErrorAsync(failure); } } @@ -1216,15 +1277,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { runAsync(new Runnable() { @Override public void run() { - log.info("The JobManager connection {} has timed out.", resourceID); + log.info("Job manager with id {} heartbeat timed out.", resourceID); if (jobManagerConnections.containsKey(resourceID)) { 
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID); if (jobManagerConnection != null) { closeJobManagerConnection( jobManagerConnection.getJobID(), - new TimeoutException("The heartbeat of JobManager with id " + - resourceID + " timed out.")); + new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out.")); } } } @@ -1241,4 +1301,32 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { return FlinkCompletableFuture.completed(null); } } + + private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + + @Override + public void notifyHeartbeatTimeout(final ResourceID resourceID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("Resource manager with id {} heartbeat timed out.", resourceID); + + if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) { + closeResourceManagerConnection( + new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out.")); + } + } + }); + } + + @Override + public void reportPayload(ResourceID resourceID, Void payload) { + // nothing to do since the payload is of type Void + } + + @Override + public Future<Void> retrievePayload() { + return FlinkCompletableFuture.completed(null); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 2dcc3a4..2bbf0e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -141,10 +141,24 
@@ public interface TaskExecutorGateway extends RpcGateway { void heartbeatFromJobManager(ResourceID heartbeatOrigin); /** + * Heartbeat request from the resource manager + * + * @param heartbeatOrigin unique id of the resource manager + */ + void heartbeatFromResourceManager(ResourceID heartbeatOrigin); + + /** * Disconnects the given JobManager from the TaskManager. * * @param jobId JobID for which the JobManager was the leader * @param cause for the disconnection from the JobManager */ void disconnectJobManager(JobID jobId, Exception cause); + + /** + * Disconnects the ResourceManager from the TaskManager. + * + * @param cause for the disconnection from the ResourceManager + */ + void disconnectResourceManager(Exception cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java index b357f52..4b61f68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -33,16 +34,20 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. 
private final InstanceID registrationId; + private final ResourceID resourceManagerResourceId; + private final long heartbeatInterval; /** * Create a new {@code TaskExecutorRegistrationSuccess} message. * * @param registrationId The ID that the ResourceManager assigned the registration. + * @param resourceManagerResourceId The unique ID that identifies the ResourceManager. * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor. */ - public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) { + public TaskExecutorRegistrationSuccess(InstanceID registrationId, ResourceID resourceManagerResourceId, long heartbeatInterval) { this.registrationId = registrationId; + this.resourceManagerResourceId = resourceManagerResourceId; this.heartbeatInterval = heartbeatInterval; } @@ -54,6 +59,13 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. } /** + * Gets the unique ID that identifies the ResourceManager. + */ + public ResourceID getResourceManagerId() { + return resourceManagerResourceId; + } + + /** * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor. */ public long getHeartbeatInterval() { @@ -62,7 +74,7 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. 
@Override public String toString() { - return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')'; + return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + resourceManagerResourceId + " / " + heartbeatInterval + ')'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 6e3e39b..775482c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.registration.RegisteredRpcConnection; -import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.registration.RegistrationConnectionListener; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; @@ -51,10 +51,12 @@ public class TaskExecutorToResourceManagerConnection private final SlotReport slotReport; - private final FatalErrorHandler fatalErrorHandler; + private final RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener; private InstanceID registrationId; + private ResourceID resourceManagerResourceId; + public TaskExecutorToResourceManagerConnection( Logger log, 
RpcService rpcService, @@ -64,7 +66,7 @@ public class TaskExecutorToResourceManagerConnection String resourceManagerAddress, UUID resourceManagerLeaderId, Executor executor, - FatalErrorHandler fatalErrorHandler) { + RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener) { super(log, resourceManagerAddress, resourceManagerLeaderId, executor); @@ -72,7 +74,7 @@ public class TaskExecutorToResourceManagerConnection this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress); this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId); this.slotReport = Preconditions.checkNotNull(slotReport); - this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); + this.registrationListener = Preconditions.checkNotNull(registrationListener); } @@ -94,13 +96,15 @@ public class TaskExecutorToResourceManagerConnection getTargetAddress(), success.getRegistrationId()); registrationId = success.getRegistrationId(); + resourceManagerResourceId = success.getResourceManagerId(); + registrationListener.onRegistrationSuccess(success); } @Override protected void onRegistrationFailure(Throwable failure) { log.info("Failed to register at resource manager {}.", getTargetAddress(), failure); - fatalErrorHandler.onFatalError(failure); + registrationListener.onRegistrationFailure(failure); } /** @@ -111,6 +115,13 @@ public class TaskExecutorToResourceManagerConnection return registrationId; } + /** + * Gets the unique ID of the ResourceManager, which is returned upon successful registration. 
+ */ + public ResourceID getResourceManagerId() { + return resourceManagerResourceId; + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index ca8a07a..e7f2439 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -30,21 +31,47 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource; import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; +import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.ArgumentCaptor; import scala.Option; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * General tests for the resource manager component. 
@@ -335,4 +362,85 @@ public class ResourceManagerTest { }}; }}; } + + @Test + public void testHeartbeatTimeoutWithTaskExecutor() throws Exception { + final String taskManagerAddress = "tm"; + final ResourceID taskManagerResourceID = new ResourceID(taskManagerAddress); + final ResourceID resourceManagerResourceID = ResourceID.generate(); + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + + final TestingSerialRpcService rpcService = new TestingSerialRpcService(); + rpcService.registerGateway(taskManagerAddress, taskExecutorGateway); + + final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L)); + + final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + + final long heartbeatInterval = 1L; + final long heartbeatTimeout = 5L; + final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); + + final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); + final MetricRegistry metricRegistry = mock(MetricRegistry.class); + final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + try { + final StandaloneResourceManager resourceManager = new StandaloneResourceManager( + resourceManagerResourceID, + rpcService, + resourceManagerConfiguration, + highAvailabilityServices, + heartbeatServices, + slotManagerFactory, + metricRegistry, + jobLeaderIdService, + testingFatalErrorHandler); + + resourceManager.start(); + + final UUID rmLeaderSessionId 
= UUID.randomUUID(); + rmLeaderElectionService.isLeader(rmLeaderSessionId); + + final SlotReport slotReport = new SlotReport(); + // a successful registration should start monitoring the task executor as a heartbeat target and schedule periodic heartbeat requests + Future<RegistrationResponse> successfulFuture = + resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport); + RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + assertTrue(response instanceof TaskExecutorRegistrationSuccess); + + ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor, times(1)).scheduleAtFixedRate( + heartbeatRunnableCaptor.capture(), + eq(0L), + eq(heartbeatInterval), + eq(TimeUnit.MILLISECONDS)); + + Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue(); + + ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); + + Runnable timeoutRunnable = timeoutRunnableCaptor.getValue(); + + // run the first heartbeat request + heartbeatRunnable.run(); + + verify(taskExecutorGateway, times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID)); + + // run the timeout runnable to simulate a heartbeat timeout + timeoutRunnable.run(); + + verify(taskExecutorGateway).disconnectResourceManager(any(TimeoutException.class)); + + } finally { + rpcService.stopService(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 43536b6..73da244 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -26,10 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; -import org.apache.flink.runtime.heartbeat.HeartbeatListener; -import org.apache.flink.runtime.heartbeat.HeartbeatManager; -import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl; -import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.*; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -38,14 +35,12 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.TestingFatalErrorHandler; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; import java.net.InetAddress; import java.net.URL; @@ -108,10 +103,9 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler, new FlinkUserCodeClassLoader(new URL[0])); - // also start the heartbeat manager in job manager jobMaster.start(jmLeaderId); - // register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time + // register task manager 
will trigger monitor heartbeat target, schedule heartbeat request at interval time jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId); ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -145,32 +139,4 @@ public class JobMasterTest extends TestLogger { rpc.stopService(); } } - - private static class TestingHeartbeatServices extends HeartbeatServices { - - private final ScheduledExecutor scheduledExecutorToUse; - - public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) { - super(heartbeatInterval, heartbeatTimeout); - - this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse); - } - - @Override - public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender( - ResourceID resourceId, - HeartbeatListener<I, O> heartbeatListener, - ScheduledExecutor scheduledExecutor, - Logger log) { - - return new HeartbeatManagerSenderImpl<>( - heartbeatInterval, - heartbeatTimeout, - resourceId, - heartbeatListener, - org.apache.flink.runtime.concurrent.Executors.directExecutor(), - scheduledExecutorToUse, - log); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 1aa799b..39594df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -40,12 +42,15 @@ public class ResourceManagerHATest { @Test public void testGrantAndRevokeLeadership() throws Exception { + ResourceID rmResourceId = ResourceID.generate(); RpcService rpcService = new TestingSerialRpcService(); TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), Time.seconds(5L)); @@ -63,9 +68,11 @@ public class ResourceManagerHATest { final ResourceManager resourceManager = new StandaloneResourceManager( + rmResourceId, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, slotManagerFactory, metricRegistry, resourceManagerRuntimeServices.getJobLeaderIdService(), http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 9a68eca..0401f9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -196,10 +198,13 @@ public class ResourceManagerJobMasterTest { JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService, FatalErrorHandler fatalErrorHandler) throws Exception { + ResourceID rmResourceId = ResourceID.generate(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), Time.seconds(5L)); @@ -211,9 +216,11 @@ public class ResourceManagerJobMasterTest { Time.minutes(5L)); ResourceManager resourceManager = new StandaloneResourceManager( + rmResourceId, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, slotManagerFactory, metricRegistry, jobLeaderIdService, http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 0a1addb..7c811d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -52,6 +53,8 @@ public class ResourceManagerTaskExecutorTest { private ResourceID taskExecutorResourceID; + private ResourceID resourceManagerResourceID; + private StandaloneResourceManager resourceManager; private UUID leaderSessionId; @@ -63,6 +66,7 @@ public class ResourceManagerTaskExecutorTest { rpcService = new TestingSerialRpcService(); taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + resourceManagerResourceID = ResourceID.generate(); TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); testingFatalErrorHandler = new TestingFatalErrorHandler(); resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler); @@ -144,6 +148,7 @@ public class ResourceManagerTaskExecutorTest { private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception { TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices(); + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( @@ -158,9 +163,11 @@ public class ResourceManagerTaskExecutorTest { StandaloneResourceManager resourceManager = new StandaloneResourceManager( + resourceManagerResourceID, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, slotManagerFactory, metricRegistry, jobLeaderIdService, http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index ea660f8..28ed697 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -98,6 +99,7 @@ public class SlotProtocolTest 
extends TestLogger { final String rmAddress = "/rm1"; final String jmAddress = "/jm1"; final JobID jobID = new JobID(); + final ResourceID rmResourceId = new ResourceID(rmAddress); testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); @@ -117,11 +119,16 @@ public class SlotProtocolTest extends TestLogger { Time.seconds(5L)); final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); + + final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + SpiedResourceManager resourceManager = new SpiedResourceManager( + rmResourceId, testRpcService, resourceManagerConfiguration, testingHaServices, + heartbeatServices, slotManagerFactory, mock(MetricRegistry.class), jobLeaderIdService, @@ -198,6 +205,7 @@ public class SlotProtocolTest extends TestLogger { final String jmAddress = "/jm1"; final String tmAddress = "/tm1"; final JobID jobID = new JobID(); + final ResourceID rmResourceId = new ResourceID(rmAddress); testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); @@ -224,11 +232,16 @@ public class SlotProtocolTest extends TestLogger { Time.seconds(5L)); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); + + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + ResourceManager<ResourceID> resourceManager = Mockito.spy(new StandaloneResourceManager( + rmResourceId, testRpcService, resourceManagerConfiguration, testingHaServices, + heartbeatServices, slotManagerFactory, mock(MetricRegistry.class), jobLeaderIdService, @@ -302,17 +315,21 @@ public class SlotProtocolTest extends TestLogger { private int startNewWorkerCalled = 0; public SpiedResourceManager( + ResourceID resourceId, RpcService rpcService, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, SlotManagerFactory slotManagerFactory, MetricRegistry metricRegistry, JobLeaderIdService 
jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( + resourceId, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, slotManagerFactory, metricRegistry, jobLeaderIdService, http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index f6c2dce..4e76486 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -87,6 +87,7 @@ public class TaskExecutorITCase { final String rmAddress = "rm"; final String jmAddress = "jm"; final UUID jmLeaderId = UUID.randomUUID(); + final ResourceID rmResourceId = new ResourceID(rmAddress); final JobID jobId = new JobID(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); @@ -119,9 +120,11 @@ public class TaskExecutorITCase { final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager( + rmResourceId, rpcService, resourceManagerConfiguration, testingHAServices, + heartbeatServices, slotManagerFactory, metricRegistry, jobLeaderIdService, http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 67196aa..d1f6e2e 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -213,9 +213,104 @@ public class TaskExecutorTest extends TestLogger { } @Test + public void testHeartbeatTimeoutWithResourceManager() throws Exception { + final String rmAddress = "rm"; + final String tmAddress = "tm"; + final ResourceID rmResourceId = new ResourceID(rmAddress); + final ResourceID tmResourceId = new ResourceID(tmAddress); + final UUID rmLeaderId = UUID.randomUUID(); + + // register the mock resource manager gateway + ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); + when(rmGateway.registerTaskExecutor( + any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed( + new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L))); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + rpc.registerGateway(rmAddress, rmGateway); + + final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + haServices.setResourceManagerLeaderRetriever(testLeaderService); + + final TaskManagerConfiguration taskManagerConfiguration = mock(TaskManagerConfiguration.class); + when(taskManagerConfiguration.getNumberSlots()).thenReturn(1); + + final TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); + when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId); + + final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); + final SlotReport slotReport = new SlotReport(); + when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final long heartbeatTimeout 
= 10L; + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + when(heartbeatServices.createHeartbeatManager( + eq(taskManagerLocation.getResourceID()), + any(HeartbeatListener.class), + any(ScheduledExecutor.class), + any(Logger.class))).thenAnswer( + new Answer<HeartbeatManagerImpl<Void, Void>>() { + @Override + public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable { + return new HeartbeatManagerImpl<>( + heartbeatTimeout, + taskManagerLocation.getResourceID(), + (HeartbeatListener<Void, Void>)invocation.getArguments()[1], + (Executor)invocation.getArguments()[2], + (ScheduledExecutor)invocation.getArguments()[2], + (Logger)invocation.getArguments()[3]); + } + } + ); + + try { + final TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + heartbeatServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + mock(JobManagerTable.class), + mock(JobLeaderService.class), + testingFatalErrorHandler); + + taskManager.start(); + + // define a leader and see that a registration happens + testLeaderService.notifyListener(rmAddress, rmLeaderId); + + // a successful registration at the resource manager starts heartbeat monitoring between tm and rm + verify(rmGateway).registerTaskExecutor( + eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class)); + + // a heartbeat timeout should trigger the disconnection of the TaskManager from the ResourceManager + verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class)); + + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); + + } finally { + rpc.stopService(); + } + } + + @Test public void 
testImmediatelyRegistersIfLeaderIsKnown() throws Exception { final ResourceID resourceID = ResourceID.generate(); final String resourceManagerAddress = "/resource/manager/address/one"; + final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { @@ -223,7 +318,8 @@ public class TaskExecutorTest extends TestLogger { ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); when(rmGateway.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) - .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(new InstanceID(), 10L))); + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess( + new InstanceID(), resourceManagerResourceId, 10L))); TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); @@ -275,12 +371,14 @@ public class TaskExecutorTest extends TestLogger { @Test public void testTriggerRegistrationOnLeaderChange() throws Exception { - final ResourceID resourceID = ResourceID.generate(); + final ResourceID tmResourceID = ResourceID.generate(); final String address1 = "/resource/manager/address/one"; final String address2 = "/resource/manager/address/two"; final UUID leaderId1 = UUID.randomUUID(); final UUID leaderId2 = UUID.randomUUID(); + final ResourceID rmResourceId1 = new ResourceID(address1); + final ResourceID rmResourceId2 = new ResourceID(address2); final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { @@ -291,11 +389,11 @@ public class TaskExecutorTest extends TestLogger { when(rmGateway1.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) 
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed( - new TaskExecutorRegistrationSuccess(new InstanceID(), 10L))); + new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L))); when(rmGateway2.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed( - new TaskExecutorRegistrationSuccess(new InstanceID(), 10L))); + new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L))); rpc.registerGateway(address1, rmGateway1); rpc.registerGateway(address2, rmGateway2); @@ -313,7 +411,7 @@ public class TaskExecutorTest extends TestLogger { when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); - when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID); when(taskManagerLocation.getHostname()).thenReturn("foobar"); final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); @@ -350,7 +448,7 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address1, leaderId1); verify(rmGateway1).registerTaskExecutor( - eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); + eq(leaderId1), eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); // cancel the leader @@ -360,7 +458,7 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address2, leaderId2); verify(rmGateway2).registerTaskExecutor( - eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class)); + eq(leaderId2), eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); // check 
if a concurrent error occurred @@ -531,6 +629,7 @@ public class TaskExecutorTest extends TestLogger { final String resourceManagerAddress = "rm"; final UUID resourceManagerLeaderId = UUID.randomUUID(); + final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); final InstanceID registrationId = new InstanceID(); @@ -540,7 +639,7 @@ public class TaskExecutorTest extends TestLogger { any(String.class), eq(resourceId), any(SlotReport.class), - any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L))); final String jobManagerAddress = "jm"; final UUID jobManagerLeaderId = UUID.randomUUID(); @@ -638,6 +737,7 @@ public class TaskExecutorTest extends TestLogger { final String resourceManagerAddress = "rm"; final UUID resourceManagerLeaderId = UUID.randomUUID(); + final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); final String jobManagerAddress = "jm"; final UUID jobManagerLeaderId = UUID.randomUUID(); @@ -655,7 +755,7 @@ public class TaskExecutorTest extends TestLogger { any(String.class), eq(resourceId), any(SlotReport.class), - any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L))); final ResourceID jmResourceId = new ResourceID(jobManagerAddress); final int blobPort = 42; @@ -844,6 +944,7 @@ public class TaskExecutorTest extends TestLogger { final String resourceManagerAddress = "rm"; final UUID 
resourceManagerLeaderId = UUID.randomUUID(); + final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); final String jobManagerAddress = "jm"; final UUID jobManagerLeaderId = UUID.randomUUID(); @@ -862,7 +963,7 @@ public class TaskExecutorTest extends TestLogger { eq(resourceId), any(SlotReport.class), any(Time.class))).thenReturn( - FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L))); final ResourceID jmResourceId = new ResourceID(jobManagerAddress); final int blobPort = 42; http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index 7a0dbbe..ed672a3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -198,15 +198,18 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati haServices, commonRpcService.getScheduledExecutor()); - return new YarnResourceManager(config, - ENV, - commonRpcService, - resourceManagerConfiguration, - haServices, - resourceManagerRuntimeServices.getSlotManagerFactory(), - metricRegistry, - resourceManagerRuntimeServices.getJobLeaderIdService(), - this); + return new YarnResourceManager( + ResourceID.generate(), + config, + ENV, + commonRpcService, + resourceManagerConfiguration, + haServices, + heartbeatServices, + resourceManagerRuntimeServices.getSlotManagerFactory(), + metricRegistry, + 
resourceManagerRuntimeServices.getJobLeaderIdService(), + this); } private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception{ http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index ab96441..a308079 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; @@ -106,19 +107,23 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>(); public YarnResourceManager( + ResourceID resourceId, Configuration flinkConfig, Map<String, String> env, RpcService rpcService, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, SlotManagerFactory slotManagerFactory, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( + resourceId, rpcService, resourceManagerConfiguration, highAvailabilityServices, + heartbeatServices, 
slotManagerFactory, metricRegistry, jobLeaderIdService, @@ -231,7 +236,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements public void onContainersCompleted(List<ContainerStatus> list) { for (ContainerStatus container : list) { if (container.getExitStatus() < 0) { - notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics()); + closeTaskManagerConnection(new ResourceID( + container.getContainerId().toString()), new Exception(container.getDiagnostics())); } } }
