[FLINK-4354] [heartbeat] Implement heartbeat logic between TaskManager and ResourceManager

This closes #3591.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd90672f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd90672f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd90672f

Branch: refs/heads/table-retraction
Commit: fd90672f9ccf7a0e02e5eb9c6251dc3d451ce8ba
Parents: d20fb09
Author: Zhijiang <[email protected]>
Authored: Wed Mar 22 15:12:33 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Thu Mar 23 13:58:44 2017 +0100

----------------------------------------------------------------------
 .../heartbeat/TestingHeartbeatServices.java     |  52 ++++++++
 .../flink/runtime/jobmaster/JobMaster.java      |   4 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   5 +-
 .../RegistrationConnectionListener.java         |  40 ++++++
 .../resourcemanager/ResourceManager.java        | 117 ++++++++++++++----
 .../resourcemanager/ResourceManagerGateway.java |  16 +++
 .../resourcemanager/ResourceManagerRunner.java  |   9 +-
 .../StandaloneResourceManager.java              |   5 +
 .../runtime/taskexecutor/TaskExecutor.java      | 106 ++++++++++++++--
 .../taskexecutor/TaskExecutorGateway.java       |  14 +++
 .../TaskExecutorRegistrationSuccess.java        |  16 ++-
 ...TaskExecutorToResourceManagerConnection.java |  21 +++-
 .../clusterframework/ResourceManagerTest.java   | 108 +++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  |  38 +-----
 .../resourcemanager/ResourceManagerHATest.java  |   7 ++
 .../ResourceManagerJobMasterTest.java           |   7 ++
 .../ResourceManagerTaskExecutorTest.java        |   7 ++
 .../slotmanager/SlotProtocolTest.java           |  17 +++
 .../taskexecutor/TaskExecutorITCase.java        |   3 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 121 +++++++++++++++++--
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  21 ++--
 .../apache/flink/yarn/YarnResourceManager.java  |   8 +-
 22 files changed, 640 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
new file mode 100644
index 0000000..e628db5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+public class TestingHeartbeatServices extends HeartbeatServices {
+
+       private final ScheduledExecutor scheduledExecutorToUse;
+
+       public TestingHeartbeatServices(long heartbeatInterval, long 
heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+               super(heartbeatInterval, heartbeatTimeout);
+
+               this.scheduledExecutorToUse = 
Preconditions.checkNotNull(scheduledExecutorToUse);
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor scheduledExecutor,
+               Logger log) {
+
+               return new HeartbeatManagerSenderImpl<>(
+                       heartbeatInterval,
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       
org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+                       scheduledExecutorToUse,
+                       log);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 243b57f..81fc541 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1043,11 +1043,11 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                @Override
                public void notifyHeartbeatTimeout(ResourceID resourceID) {
-                       log.info("Task manager with id {} timed out.", 
resourceID);
+                       log.info("Task manager with id {} heartbeat timed 
out.", resourceID);
 
                        getSelf().disconnectTaskManager(
                                resourceID,
-                               new TimeoutException("The heartbeat of 
TaskManager with id " + resourceID + " timed out."));
+                               new TimeoutException("Task manager with id " + 
resourceID + " heartbeat timed out."));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 25c4aba..2cfba7b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,7 +241,7 @@ public class MiniCluster {
                                // bring up the ResourceManager(s)
                                LOG.info("Starting {} ResourceManger(s)", 
numResourceManagers);
                                resourceManagerRunners = startResourceManagers(
-                                               configuration, haServices, 
metricRegistry, numResourceManagers, resourceManagerRpcServices);
+                                               configuration, haServices, 
heartbeatServices, metricRegistry, numResourceManagers, 
resourceManagerRpcServices);
 
                                // bring up the TaskManager(s) for the mini 
cluster
                                LOG.info("Starting {} TaskManger(s)", 
numTaskManagers);
@@ -508,6 +508,7 @@ public class MiniCluster {
        protected ResourceManagerRunner[] startResourceManagers(
                        Configuration configuration,
                        HighAvailabilityServices haServices,
+                       HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
                        int numResourceManagers,
                        RpcService[] resourceManagerRpcServices) throws 
Exception {
@@ -517,9 +518,11 @@ public class MiniCluster {
                for (int i = 0; i < numResourceManagers; i++) {
 
                        resourceManagerRunners[i] = new ResourceManagerRunner(
+                               ResourceID.generate(),
                                configuration,
                                resourceManagerRpcServices[i],
                                haServices,
+                               heartbeatServices,
                                metricRegistry);
 
                        resourceManagerRunners[i].start();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
new file mode 100644
index 0000000..360f982
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+/**
+ * Classes which want to be notified about the registration result by the 
{@link RegisteredRpcConnection}
+ * have to implement this interface.
+ */
+public interface RegistrationConnectionListener<Success extends 
RegistrationResponse.Success> {
+
+       /**
+        * This method is called by the {@link RegisteredRpcConnection} when the registration succeeds.
+        *
+        * @param success The concrete response information for successful 
registration.
+        */
+       void onRegistrationSuccess(Success success);
+
+       /**
+        * This method is called by the {@link RegisteredRpcConnection} when 
the registration fails.
+        *
+        * @param failure The exception which causes the registration failure.
+        */
+       void onRegistrationFailure(Throwable failure);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1430a49..9a7a790 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -30,6 +30,10 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -64,6 +68,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -81,6 +86,9 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                extends RpcEndpoint<ResourceManagerGateway>
                implements LeaderContender {
 
+       /** Unique id of the resource manager */
+       private final ResourceID resourceId;
+
        /** Configuration of the resource manager */
        private final ResourceManagerConfiguration resourceManagerConfiguration;
 
@@ -96,6 +104,9 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /** High availability services for leader retrieval and election. */
        private final HighAvailabilityServices highAvailabilityServices;
 
+       /** The heartbeat manager with task managers. */
+       private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+
        /** The factory to construct the SlotManager. */
        private final SlotManagerFactory slotManagerFactory;
 
@@ -118,9 +129,11 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private ConcurrentMap<String, InfoMessageListenerRpcGateway> 
infoMessageListeners;
 
        public ResourceManager(
+                       ResourceID resourceId,
                        RpcService rpcService,
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
                        SlotManagerFactory slotManagerFactory,
                        MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
@@ -128,6 +141,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                super(rpcService);
 
+               this.resourceId = checkNotNull(resourceId);
                this.resourceManagerConfiguration = 
checkNotNull(resourceManagerConfiguration);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.slotManagerFactory = checkNotNull(slotManagerFactory);
@@ -135,6 +149,12 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
+               this.taskManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
+                               resourceId,
+                               new TaskManagerHeartbeatListener(),
+                               rpcService.getScheduledExecutor(),
+                               log);
+
                this.jobManagerRegistrations = new HashMap<>(4);
                this.taskExecutors = new HashMap<>(8);
                this.leaderSessionId = null;
@@ -178,6 +198,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        public void shutDown() throws Exception {
                Exception exception = null;
 
+               taskManagerHeartbeatManager.stop();
+
                try {
                        super.shutDown();
                } catch (Exception e) {
@@ -326,7 +348,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         *
         * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
         * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        * @param taskExecutorResourceId  The resource ID of the TaskExecutor 
that registers
         *
         * @return The response by the ResourceManager.
         */
@@ -334,7 +356,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        public Future<RegistrationResponse> registerTaskExecutor(
                final UUID resourceManagerLeaderId,
                final String taskExecutorAddress,
-               final ResourceID resourceID,
+               final ResourceID taskExecutorResourceId,
                final SlotReport slotReport) {
 
                if (leaderSessionId.equals(resourceManagerLeaderId)) {
@@ -342,25 +364,37 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                        return taskExecutorGatewayFuture.handleAsync(new 
BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
                                @Override
-                               public RegistrationResponse 
apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+                               public RegistrationResponse apply(final 
TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
                                        if (throwable != null) {
                                                return new 
RegistrationResponse.Decline(throwable.getMessage());
                                        } else {
-                                               WorkerRegistration<WorkerType> 
oldRegistration = taskExecutors.remove(resourceID);
+                                               WorkerRegistration<WorkerType> 
oldRegistration = taskExecutors.remove(taskExecutorResourceId);
                                                if (oldRegistration != null) {
                                                        // TODO :: suggest old 
taskExecutor to stop itself
-                                                       log.info("Replacing old 
instance of worker for ResourceID {}", resourceID);
+                                                       log.info("Replacing old 
instance of worker for ResourceID {}", taskExecutorResourceId);
                                                }
 
-                                               WorkerType newWorker = 
workerStarted(resourceID);
+                                               WorkerType newWorker = 
workerStarted(taskExecutorResourceId);
                                                WorkerRegistration<WorkerType> 
registration =
                                                        new 
WorkerRegistration<>(taskExecutorGateway, newWorker);
 
-                                               taskExecutors.put(resourceID, 
registration);
-                                               
slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+                                               
taskExecutors.put(taskExecutorResourceId, registration);
+                                               
slotManager.registerTaskExecutor(taskExecutorResourceId, registration, 
slotReport);
+
+                                               
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new 
HeartbeatTarget<Void>() {
+                                                       @Override
+                                                       public void 
receiveHeartbeat(ResourceID resourceID, Void payload) {
+                                                               // the task 
manager will not request heartbeat, so this method will never be called 
currently
+                                                       }
+
+                                                       @Override
+                                                       public void 
requestHeartbeat(ResourceID resourceID, Void payload) {
+                                                               
taskExecutorGateway.heartbeatFromResourceManager(resourceID);
+                                                       }
+                                               });
 
                                                return new 
TaskExecutorRegistrationSuccess(
-                                                       
registration.getInstanceID(),
+                                                       
registration.getInstanceID(), resourceId,
                                                        
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
                                        }
                                }
@@ -368,7 +402,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                } else {
                        log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected leader session ID {} did " +
                                        "not equal the received leader session 
ID  {}",
-                               resourceID, taskExecutorAddress, 
leaderSessionId, resourceManagerLeaderId);
+                               taskExecutorResourceId, taskExecutorAddress, 
leaderSessionId, resourceManagerLeaderId);
 
                        return 
FlinkCompletableFuture.<RegistrationResponse>completed(
                                new RegistrationResponse.Decline("Discard 
registration because the leader id " +
@@ -377,6 +411,16 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
        }
 
+       @RpcMethod
+       public void heartbeatFromTaskManager(final ResourceID resourceID) {
+               taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+       }
+
+       @RpcMethod
+       public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
+               closeTaskManagerConnection(resourceId, cause);
+       }
+
        /**
         * Requests a slot from the resource manager.
         *
@@ -716,24 +760,24 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         * This method should be called by the framework once it detects that a 
currently registered
         * task executor has failed.
         *
-        * @param resourceID Id of the worker that has failed.
-        * @param message An informational message that explains why the worker 
failed.
+        * @param resourceID Id of the TaskManager that has failed.
+        * @param cause The exception which caused the TaskManager to fail.
         */
-       public void notifyWorkerFailed(final ResourceID resourceID, final 
String message) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               WorkerRegistration<WorkerType> 
workerRegistration = taskExecutors.remove(resourceID);
+       public void closeTaskManagerConnection(final ResourceID resourceID, 
final Exception cause) {
+               taskManagerHeartbeatManager.unmonitorTarget(resourceID);
 
-                               if (workerRegistration != null) {
-                                       log.info("Task manager {} failed 
because {}.", resourceID, message);
-                                       // TODO :: suggest failed task executor 
to stop itself
-                                       
slotManager.notifyTaskManagerFailure(resourceID);
-                               } else {
-                                       log.debug("Could not find a registered 
task manager with the process id {}.", resourceID);
-                               }
-                       }
-               });
+               WorkerRegistration<WorkerType> workerRegistration = 
taskExecutors.remove(resourceID);
+
+               if (workerRegistration != null) {
+                       log.info("Task manager {} failed because {}.", 
resourceID, cause);
+
+                       // TODO :: suggest failed task executor to stop itself
+                       slotManager.notifyTaskManagerFailure(resourceID);
+
+                       
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+               } else {
+                       log.debug("Could not find a registered task manager 
with the process id {}.", resourceID);
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -827,5 +871,26 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        onFatalErrorAsync(error);
                }
        }
+
+       private class TaskManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
+
+               @Override
+               public void notifyHeartbeatTimeout(ResourceID resourceID) {
+                       log.info("The heartbeat of TaskManager with id {} timed 
out.", resourceID);
+
+                       closeTaskManagerConnection(resourceID, new 
TimeoutException(
+                                       "Task manager with id " + resourceID + 
" heartbeat timed out."));
+               }
+
+               @Override
+               public void reportPayload(ResourceID resourceID, Void payload) {
+                       // nothing to do since there is no payload
+               }
+
+               @Override
+               public Future<Void> retrievePayload() {
+                       return FlinkCompletableFuture.completed(null);
+               }
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 8235ea7..7741e0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -130,4 +130,20 @@ public interface ResourceManagerGateway extends RpcGateway 
{
         * @return The future to the number of registered TaskManagers.
         */
        Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
+
+       /**
+        * Sends the heartbeat to the resource manager from the task manager.
+        *
+        * @param resourceID unique id of the task manager
+        */
+       void heartbeatFromTaskManager(final ResourceID resourceID);
+
+       /**
+        * Disconnects the given {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
+        * {@link ResourceManager}.
+        *
+        * @param resourceID identifying the TaskManager to disconnect
+        * @param cause for the disconnection of the TaskManager
+        */
+       void disconnectTaskManager(ResourceID resourceID, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 73b27b5..d07e373 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -43,14 +45,18 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
        private final ResourceManager<?> resourceManager;
 
        public ResourceManagerRunner(
+                       final ResourceID resourceId,
                        final Configuration configuration,
                        final RpcService rpcService,
                        final HighAvailabilityServices highAvailabilityServices,
+                       final HeartbeatServices heartbeatServices,
                        final MetricRegistry metricRegistry) throws Exception {
 
+               Preconditions.checkNotNull(resourceId);
                Preconditions.checkNotNull(configuration);
                Preconditions.checkNotNull(rpcService);
                Preconditions.checkNotNull(highAvailabilityServices);
+               Preconditions.checkNotNull(heartbeatServices);
                Preconditions.checkNotNull(metricRegistry);
 
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
@@ -63,9 +69,11 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
                        rpcService.getScheduledExecutor());
 
                this.resourceManager = new StandaloneResourceManager(
+                       resourceId,
                        rpcService,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
+                       heartbeatServices,
                        resourceManagerRuntimeServices.getSlotManagerFactory(),
                        metricRegistry,
                        resourceManagerRuntimeServices.getJobLeaderIdService(),
@@ -87,7 +95,6 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
        private void shutDownInternally() throws Exception {
                Exception exception = null;
                synchronized (lock) {
-
                        try {
                                resourceManager.shutDown();
                        } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 73c8a2d..e2d6538 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -37,17 +38,21 @@ import org.apache.flink.runtime.rpc.RpcService;
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
        public StandaloneResourceManager(
+                       ResourceID resourceId,
                        RpcService rpcService,
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
                        SlotManagerFactory slotManagerFactory,
                        MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(
+                       resourceId,
                        rpcService,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
+                       heartbeatServices,
                        slotManagerFactory,
                        metricRegistry,
                        jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 83c225f..f3e1ff3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -135,6 +136,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /** The heartbeat manager for job manager in the task manager */
        private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
+       /** The heartbeat manager for resource manager in the task manager */
+       private final HeartbeatManager<Void, Void> 
resourceManagerHeartbeatManager;
+
        /** The fatal error handler to use in case of a fatal error */
        private final FatalErrorHandler fatalErrorHandler;
 
@@ -206,6 +210,12 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        new JobManagerHeartbeatListener(),
                        rpcService.getScheduledExecutor(),
                        log);
+
+               this.resourceManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManager(
+                               getResourceID(),
+                               new ResourceManagerHeartbeatListener(),
+                               rpcService.getScheduledExecutor(),
+                               log);
        }
 
        // 
------------------------------------------------------------------------
@@ -247,6 +257,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                jobManagerHeartbeatManager.stop();
 
+               resourceManagerHeartbeatManager.stop();
+
                ioManager.shutdown();
 
                memoryManager.shutdown();
@@ -497,6 +509,11 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
        }
 
+       @RpcMethod
+       public void heartbeatFromResourceManager(ResourceID resourceID) {
+               resourceManagerHeartbeatManager.requestHeartbeat(resourceID, 
null);
+       }
+
        // 
----------------------------------------------------------------------
        // Checkpointing RPCs
        // 
----------------------------------------------------------------------
@@ -619,11 +636,20 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                return new 
TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationId);
        }
 
+       // 
----------------------------------------------------------------------
+       // Disconnection RPCs
+       // 
----------------------------------------------------------------------
+
        @RpcMethod
        public void disconnectJobManager(JobID jobId, Exception cause) {
                closeJobManagerConnection(jobId, cause);
        }
 
+       @RpcMethod
+       public void disconnectResourceManager(Exception cause) {
+               closeResourceManagerConnection(cause);
+       }
+
        // 
======================================================================
        //  Internal methods
        // 
======================================================================
@@ -665,11 +691,25 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                        newLeaderAddress,
                                        newLeaderId,
                                        getMainThreadExecutor(),
-                                       new ForwardingFatalErrorHandler());
+                                       new 
ResourceManagerRegistrationListener());
                        resourceManagerConnection.start();
                }
        }
 
+       private void closeResourceManagerConnection(Exception cause) {
+               log.info("Close ResourceManager connection for {}.", cause);
+
+               if (isConnectedToResourceManager()) {
+                       
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+
+                       ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+                       resourceManagerConnection.close();
+                       resourceManagerConnection = null;
+
+                       
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Internal job manager connection methods
        // 
------------------------------------------------------------------------
@@ -747,10 +787,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                                        
offerSlotsToJobManager(jobId);
                                                } else {
                                                        log.warn("Slot offering 
to JobManager failed. Freeing the slots " +
-                                                               "and returning 
them to the ResourceManager.", throwable);
+                                                                       "and 
returning them to the ResourceManager.", throwable);
 
                                                        // We encountered an 
exception. Free the slots and return them to the RM.
-                                                       for (SlotOffer 
reservedSlot: reservedSlots) {
+                                                       for (SlotOffer 
reservedSlot : reservedSlots) {
                                                                
freeSlot(reservedSlot.getAllocationId(), throwable);
                                                        }
                                                }
@@ -1137,11 +1177,32 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
-       private final class ForwardingFatalErrorHandler implements 
FatalErrorHandler {
+       private final class ResourceManagerRegistrationListener implements 
RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
 
                @Override
-               public void onFatalError(Throwable exception) {
-                       onFatalErrorAsync(exception);
+               public void 
onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+                       final ResourceID resourceManagerId = 
success.getResourceManagerId();
+
+                       // monitor the resource manager as heartbeat target
+                       
resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new 
HeartbeatTarget<Void>() {
+                               @Override
+                               public void receiveHeartbeat(ResourceID 
resourceID, Void payload) {
+                                       if (isConnectedToResourceManager()) {
+                                               ResourceManagerGateway 
resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+                                               
resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+                                       }
+                               }
+
+                               @Override
+                               public void requestHeartbeat(ResourceID 
resourceID, Void payload) {
+                                       // request heartbeat will never be 
called on the task manager side
+                               }
+                       });
+               }
+
+               @Override
+               public void onRegistrationFailure(Throwable failure) {
+                       onFatalErrorAsync(failure);
                }
        }
 
@@ -1216,15 +1277,14 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       log.info("The JobManager connection {} 
has timed out.", resourceID);
+                                       log.info("Job manager with id {} 
heartbeat timed out.", resourceID);
 
                                        if 
(jobManagerConnections.containsKey(resourceID)) {
                                                JobManagerConnection 
jobManagerConnection = jobManagerConnections.get(resourceID);
                                                if (jobManagerConnection != 
null) {
                                                        
closeJobManagerConnection(
                                                                
jobManagerConnection.getJobID(),
-                                                               new 
TimeoutException("The heartbeat of JobManager with id " +
-                                                                       
resourceID + " timed out."));
+                                                               new 
TimeoutException("Job manager with id " + resourceID + " heartbeat timed 
out."));
                                                }
                                        }
                                }
@@ -1241,4 +1301,32 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        return FlinkCompletableFuture.completed(null);
                }
        }
+
+       private class ResourceManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
+
+               @Override
+               public void notifyHeartbeatTimeout(final ResourceID resourceID) 
{
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("Resource manager with id {} 
heartbeat timed out.", resourceID);
+
+                                       if (isConnectedToResourceManager() && 
resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
+                                               closeResourceManagerConnection(
+                                                               new 
TimeoutException("Resource manager with id " + resourceID + " heartbeat timed 
out."));
+                                       }
+                               }
+                       });
+               }
+
+               @Override
+               public void reportPayload(ResourceID resourceID, Void payload) {
+                       // nothing to do since the payload is of type Void
+               }
+
+               @Override
+               public Future<Void> retrievePayload() {
+                       return FlinkCompletableFuture.completed(null);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2dcc3a4..2bbf0e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -141,10 +141,24 @@ public interface TaskExecutorGateway extends RpcGateway {
        void heartbeatFromJobManager(ResourceID heartbeatOrigin);
 
        /**
+        * Heartbeat request from the resource manager
+        *
+        * @param heartbeatOrigin unique id of the resource manager
+        */
+       void heartbeatFromResourceManager(ResourceID heartbeatOrigin);
+
+       /**
         * Disconnects the given JobManager from the TaskManager.
         *
         * @param jobId JobID for which the JobManager was the leader
         * @param cause for the disconnection from the JobManager
         */
        void disconnectJobManager(JobID jobId, Exception cause);
+
+       /**
+        * Disconnects the ResourceManager from the TaskManager.
+        *
+        * @param cause for the disconnection from the ResourceManager
+        */
+       void disconnectResourceManager(Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
index b357f52..4b61f68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
@@ -33,16 +34,20 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
 
        private final InstanceID registrationId;
 
+       private final ResourceID resourceManagerResourceId;
+
        private final long heartbeatInterval;
 
        /**
         * Create a new {@code TaskExecutorRegistrationSuccess} message.
         * 
         * @param registrationId     The ID that the ResourceManager assigned 
the registration.
+        * @param resourceManagerResourceId The unique ID that identifies the 
ResourceManager.
         * @param heartbeatInterval  The interval in which the ResourceManager 
will heartbeat the TaskExecutor.
         */
-       public TaskExecutorRegistrationSuccess(InstanceID registrationId, long 
heartbeatInterval) {
+       public TaskExecutorRegistrationSuccess(InstanceID registrationId, 
ResourceID resourceManagerResourceId, long heartbeatInterval) {
                this.registrationId = registrationId;
+               this.resourceManagerResourceId = resourceManagerResourceId;
                this.heartbeatInterval = heartbeatInterval;
        }
 
@@ -54,6 +59,13 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
        }
 
        /**
+        * Gets the unique ID that identifies the ResourceManager.
+        */
+       public ResourceID getResourceManagerId() {
+               return resourceManagerResourceId;
+       }
+
+       /**
         * Gets the interval in which the ResourceManager will heartbeat the 
TaskExecutor.
         */
        public long getHeartbeatInterval() {
@@ -62,7 +74,7 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
 
        @Override
        public String toString() {
-               return "TaskExecutorRegistrationSuccess (" + registrationId + " 
/ " + heartbeatInterval + ')';
+               return "TaskExecutorRegistrationSuccess (" + registrationId + " 
/ " + resourceManagerResourceId + " / " + heartbeatInterval + ')';
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 6e3e39b..775482c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -51,10 +51,12 @@ public class TaskExecutorToResourceManagerConnection
 
        private final SlotReport slotReport;
 
-       private final FatalErrorHandler fatalErrorHandler;
+       private final 
RegistrationConnectionListener<TaskExecutorRegistrationSuccess> 
registrationListener;
 
        private InstanceID registrationId;
 
+       private ResourceID resourceManagerResourceId;
+
        public TaskExecutorToResourceManagerConnection(
                        Logger log,
                        RpcService rpcService,
@@ -64,7 +66,7 @@ public class TaskExecutorToResourceManagerConnection
                        String resourceManagerAddress,
                        UUID resourceManagerLeaderId,
                        Executor executor,
-                       FatalErrorHandler fatalErrorHandler) {
+                       
RegistrationConnectionListener<TaskExecutorRegistrationSuccess> 
registrationListener) {
 
                super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
 
@@ -72,7 +74,7 @@ public class TaskExecutorToResourceManagerConnection
                this.taskManagerAddress = 
Preconditions.checkNotNull(taskManagerAddress);
                this.taskManagerResourceId = 
Preconditions.checkNotNull(taskManagerResourceId);
                this.slotReport = Preconditions.checkNotNull(slotReport);
-               this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
+               this.registrationListener = 
Preconditions.checkNotNull(registrationListener);
        }
 
 
@@ -94,13 +96,15 @@ public class TaskExecutorToResourceManagerConnection
                        getTargetAddress(), success.getRegistrationId());
 
                registrationId = success.getRegistrationId();
+               resourceManagerResourceId = success.getResourceManagerId();
+               registrationListener.onRegistrationSuccess(success);
        }
 
        @Override
        protected void onRegistrationFailure(Throwable failure) {
                log.info("Failed to register at resource manager {}.", 
getTargetAddress(), failure);
 
-               fatalErrorHandler.onFatalError(failure);
+               registrationListener.onRegistrationFailure(failure);
        }
 
        /**
@@ -111,6 +115,13 @@ public class TaskExecutorToResourceManagerConnection
                return registrationId;
        }
 
+       /**
+        * Gets the unique id of ResourceManager, that is returned when 
registration success.
+        */
+       public ResourceID getResourceManagerId() {
+               return resourceManagerResourceId;
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index ca8a07a..e7f2439 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -30,21 +31,47 @@ import 
org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import 
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * General tests for the resource manager component.
@@ -335,4 +362,85 @@ public class ResourceManagerTest {
                }};
                }};
        }
+
+       @Test
+       public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
+               final String taskManagerAddress = "tm";
+               final ResourceID taskManagerResourceID = new 
ResourceID(taskManagerAddress);
+               final ResourceID resourceManagerResourceID = 
ResourceID.generate();
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+
+               final TestingSerialRpcService rpcService = new 
TestingSerialRpcService();
+               rpcService.registerGateway(taskManagerAddress, 
taskExecutorGateway);
+
+               final ResourceManagerConfiguration resourceManagerConfiguration 
= new ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L));
+
+               final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
+               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+
+               final long heartbeatInterval = 1L;
+               final long heartbeatTimeout = 5L;
+               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
+               final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
+
+               final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
+               final MetricRegistry metricRegistry = 
mock(MetricRegistry.class);
+               final JobLeaderIdService jobLeaderIdService = 
mock(JobLeaderIdService.class);
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               try {
+                       final StandaloneResourceManager resourceManager = new 
StandaloneResourceManager(
+                               resourceManagerResourceID,
+                               rpcService,
+                               resourceManagerConfiguration,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               slotManagerFactory,
+                               metricRegistry,
+                               jobLeaderIdService,
+                               testingFatalErrorHandler);
+
+                       resourceManager.start();
+
+                       final UUID rmLeaderSessionId = UUID.randomUUID();
+                       rmLeaderElectionService.isLeader(rmLeaderSessionId);
+
+                       final SlotReport slotReport = new SlotReport();
+                       // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
+                       Future<RegistrationResponse> successfulFuture =
+                                       
resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, 
taskManagerResourceID, slotReport);
+                       RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
+                       assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
+
+                       ArgumentCaptor<Runnable> heartbeatRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+                               heartbeatRunnableCaptor.capture(),
+                               eq(0L),
+                               eq(heartbeatInterval),
+                               eq(TimeUnit.MILLISECONDS));
+
+                       Runnable heartbeatRunnable = 
heartbeatRunnableCaptor.getValue();
+
+                       ArgumentCaptor<Runnable> timeoutRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       
verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), 
eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+
+                       Runnable timeoutRunnable = 
timeoutRunnableCaptor.getValue();
+
+                       // run the first heartbeat request
+                       heartbeatRunnable.run();
+
+                       verify(taskExecutorGateway, 
times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
+
+                       // run the timeout runnable to simulate a heartbeat 
timeout
+                       timeoutRunnable.run();
+
+                       
verify(taskExecutorGateway).disconnectResourceManager(any(TimeoutException.class));
+
+               } finally {
+                       rpcService.stopService();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 43536b6..73da244 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,10 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManager;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.*;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -38,14 +35,12 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.URL;
@@ -108,10 +103,9 @@ public class JobMasterTest extends TestLogger {
                                testingFatalErrorHandler,
                                new FlinkUserCodeClassLoader(new URL[0]));
 
-                       // also start the heartbeat manager in job manager
                        jobMaster.start(jmLeaderId);
 
-                       // register task manager will trigger monitoring 
heartbeat target, schedule heartbeat request in interval time
+                       // registering the task manager starts monitoring it as a heartbeat target and schedules heartbeat requests at the configured interval
                        jobMaster.registerTaskManager(taskManagerAddress, 
taskManagerLocation, jmLeaderId);
 
                        ArgumentCaptor<Runnable> heartbeatRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
@@ -145,32 +139,4 @@ public class JobMasterTest extends TestLogger {
                        rpc.stopService();
                }
        }
-
-       private static class TestingHeartbeatServices extends HeartbeatServices 
{
-
-               private final ScheduledExecutor scheduledExecutorToUse;
-
-               public TestingHeartbeatServices(long heartbeatInterval, long 
heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
-                       super(heartbeatInterval, heartbeatTimeout);
-
-                       this.scheduledExecutorToUse = 
Preconditions.checkNotNull(scheduledExecutorToUse);
-               }
-
-               @Override
-               public <I, O> HeartbeatManager<I, O> 
createHeartbeatManagerSender(
-                       ResourceID resourceId,
-                       HeartbeatListener<I, O> heartbeatListener,
-                       ScheduledExecutor scheduledExecutor,
-                       Logger log) {
-
-                       return new HeartbeatManagerSenderImpl<>(
-                               heartbeatInterval,
-                               heartbeatTimeout,
-                               resourceId,
-                               heartbeatListener,
-                               
org.apache.flink.runtime.concurrent.Executors.directExecutor(),
-                               scheduledExecutorToUse,
-                               log);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 1aa799b..39594df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -40,12 +42,15 @@ public class ResourceManagerHATest {
 
        @Test
        public void testGrantAndRevokeLeadership() throws Exception {
+               ResourceID rmResourceId = ResourceID.generate();
                RpcService rpcService = new TestingSerialRpcService();
 
                TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
+               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
                        Time.seconds(5L));
@@ -63,9 +68,11 @@ public class ResourceManagerHATest {
 
                final ResourceManager resourceManager =
                        new StandaloneResourceManager(
+                               rmResourceId,
                                rpcService,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
+                               heartbeatServices,
                                slotManagerFactory,
                                metricRegistry,
                                
resourceManagerRuntimeServices.getJobLeaderIdService(),

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 9a68eca..0401f9e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -196,10 +198,13 @@ public class ResourceManagerJobMasterTest {
                        JobID jobID,
                        TestingLeaderRetrievalService 
jobMasterLeaderRetrievalService,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
+               ResourceID rmResourceId = ResourceID.generate();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
 
+               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
                        Time.seconds(5L));
@@ -211,9 +216,11 @@ public class ResourceManagerJobMasterTest {
                        Time.minutes(5L));
 
                ResourceManager resourceManager = new StandaloneResourceManager(
+                       rmResourceId,
                        rpcService,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
+                       heartbeatServices,
                        slotManagerFactory,
                        metricRegistry,
                        jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 0a1addb..7c811d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -52,6 +53,8 @@ public class ResourceManagerTaskExecutorTest {
 
        private ResourceID taskExecutorResourceID;
 
+       private ResourceID resourceManagerResourceID;
+
        private StandaloneResourceManager resourceManager;
 
        private UUID leaderSessionId;
@@ -63,6 +66,7 @@ public class ResourceManagerTaskExecutorTest {
                rpcService = new TestingSerialRpcService();
 
                taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+               resourceManagerResourceID = ResourceID.generate();
                TestingLeaderElectionService rmLeaderElectionService = new 
TestingLeaderElectionService();
                testingFatalErrorHandler = new TestingFatalErrorHandler();
                resourceManager = 
createAndStartResourceManager(rmLeaderElectionService, 
testingFatalErrorHandler);
@@ -144,6 +148,7 @@ public class ResourceManagerTaskExecutorTest {
 
        private StandaloneResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
@@ -158,9 +163,11 @@ public class ResourceManagerTaskExecutorTest {
 
                StandaloneResourceManager resourceManager =
                        new StandaloneResourceManager(
+                               resourceManagerResourceID,
                                rpcService,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
+                               heartbeatServices,
                                slotManagerFactory,
                                metricRegistry,
                                jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ea660f8..28ed697 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -98,6 +99,7 @@ public class SlotProtocolTest extends TestLogger {
                final String rmAddress = "/rm1";
                final String jmAddress = "/jm1";
                final JobID jobID = new JobID();
+               final ResourceID rmResourceId = new ResourceID(rmAddress);
 
                testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
 
@@ -117,11 +119,16 @@ public class SlotProtocolTest extends TestLogger {
                        Time.seconds(5L));
 
                final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
+
+               final HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+
                SpiedResourceManager resourceManager =
                        new SpiedResourceManager(
+                               rmResourceId,
                                testRpcService,
                                resourceManagerConfiguration,
                                testingHaServices,
+                               heartbeatServices,
                                slotManagerFactory,
                                mock(MetricRegistry.class),
                                jobLeaderIdService,
@@ -198,6 +205,7 @@ public class SlotProtocolTest extends TestLogger {
                final String jmAddress = "/jm1";
                final String tmAddress = "/tm1";
                final JobID jobID = new JobID();
+               final ResourceID rmResourceId = new ResourceID(rmAddress);
 
                testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
 
@@ -224,11 +232,16 @@ public class SlotProtocolTest extends TestLogger {
                        Time.seconds(5L));
 
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
+
+               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+
                ResourceManager<ResourceID> resourceManager =
                        Mockito.spy(new StandaloneResourceManager(
+                               rmResourceId,
                                testRpcService,
                                resourceManagerConfiguration,
                                testingHaServices,
+                               heartbeatServices,
                                slotManagerFactory,
                                mock(MetricRegistry.class),
                                jobLeaderIdService,
@@ -302,17 +315,21 @@ public class SlotProtocolTest extends TestLogger {
                private int startNewWorkerCalled = 0;
 
                public SpiedResourceManager(
+                               ResourceID resourceId,
                                RpcService rpcService,
                                ResourceManagerConfiguration 
resourceManagerConfiguration,
                                HighAvailabilityServices 
highAvailabilityServices,
+                               HeartbeatServices heartbeatServices,
                                SlotManagerFactory slotManagerFactory,
                                MetricRegistry metricRegistry,
                                JobLeaderIdService jobLeaderIdService,
                                FatalErrorHandler fatalErrorHandler) {
                        super(
+                               resourceId,
                                rpcService,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
+                               heartbeatServices,
                                slotManagerFactory,
                                metricRegistry,
                                jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index f6c2dce..4e76486 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -87,6 +87,7 @@ public class TaskExecutorITCase {
                final String rmAddress = "rm";
                final String jmAddress = "jm";
                final UUID jmLeaderId = UUID.randomUUID();
+               final ResourceID rmResourceId = new ResourceID(rmAddress);
                final JobID jobId = new JobID();
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
 
@@ -119,9 +120,11 @@ public class TaskExecutorITCase {
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
 
                ResourceManager<ResourceID> resourceManager = new 
StandaloneResourceManager(
+                       rmResourceId,
                        rpcService,
                        resourceManagerConfiguration,
                        testingHAServices,
+                       heartbeatServices,
                        slotManagerFactory,
                        metricRegistry,
                        jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 67196aa..d1f6e2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -213,9 +213,104 @@ public class TaskExecutorTest extends TestLogger {
        }
 
        @Test
+       public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+               final String rmAddress = "rm";
+               final String tmAddress = "tm";
+               final ResourceID rmResourceId = new ResourceID(rmAddress);
+               final ResourceID tmResourceId = new ResourceID(tmAddress);
+               final UUID rmLeaderId = UUID.randomUUID();
+
+               // register the mock resource manager gateway
+               ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
+               when(rmGateway.registerTaskExecutor(
+                       any(UUID.class), anyString(), any(ResourceID.class), 
any(SlotReport.class), any(Time.class)))
+                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+                                       new TaskExecutorRegistrationSuccess(new 
InstanceID(), rmResourceId, 10L)));
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               rpc.registerGateway(rmAddress, rmGateway);
+
+               final TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService();
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+               haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+               final TaskManagerConfiguration taskManagerConfiguration = 
mock(TaskManagerConfiguration.class);
+               when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
+
+               final TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
+               
when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
+
+               final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+               final SlotReport slotReport = new SlotReport();
+               
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               final long heartbeatTimeout = 10L;
+               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+               when(heartbeatServices.createHeartbeatManager(
+                       eq(taskManagerLocation.getResourceID()),
+                       any(HeartbeatListener.class),
+                       any(ScheduledExecutor.class),
+                       any(Logger.class))).thenAnswer(
+                       new Answer<HeartbeatManagerImpl<Void, Void>>() {
+                               @Override
+                               public HeartbeatManagerImpl<Void, Void> 
answer(InvocationOnMock invocation) throws Throwable {
+                                       return new HeartbeatManagerImpl<>(
+                                               heartbeatTimeout,
+                                               
taskManagerLocation.getResourceID(),
+                                               (HeartbeatListener<Void, 
Void>)invocation.getArguments()[1],
+                                               
(Executor)invocation.getArguments()[2],
+                                               
(ScheduledExecutor)invocation.getArguments()[2],
+                                               
(Logger)invocation.getArguments()[3]);
+                                       }
+                               }
+               );
+
+               try {
+                       final TaskExecutor taskManager = new TaskExecutor(
+                               taskManagerConfiguration,
+                               taskManagerLocation,
+                               rpc,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               mock(NetworkEnvironment.class),
+                               haServices,
+                               heartbeatServices,
+                               mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
+                               taskSlotTable,
+                               mock(JobManagerTable.class),
+                               mock(JobLeaderService.class),
+                               testingFatalErrorHandler);
+
+                       taskManager.start();
+
+                       // define a leader and see that a registration happens
+                       testLeaderService.notifyListener(rmAddress, rmLeaderId);
+
+                       // a successful registration at the resource manager starts heartbeat monitoring between the TaskManager and the ResourceManager
+                       verify(rmGateway).registerTaskExecutor(
+                                       eq(rmLeaderId), 
eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), 
any(Time.class));
+
+                       // a heartbeat timeout should disconnect the TaskManager from the ResourceManager
+                       verify(rmGateway, timeout(heartbeatTimeout * 
5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), 
any(TimeoutException.class));
+
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
+
+               } finally {
+                       rpc.stopService();
+               }
+       }
+
+       @Test
        public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
                final ResourceID resourceID = ResourceID.generate();
                final String resourceManagerAddress = 
"/resource/manager/address/one";
+               final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                try {
@@ -223,7 +318,8 @@ public class TaskExecutorTest extends TestLogger {
                        ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
                        when(rmGateway.registerTaskExecutor(
                                        any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-                               
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+                               
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
TaskExecutorRegistrationSuccess(
+                                       new InstanceID(), 
resourceManagerResourceId, 10L)));
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
@@ -275,12 +371,14 @@ public class TaskExecutorTest extends TestLogger {
 
        @Test
        public void testTriggerRegistrationOnLeaderChange() throws Exception {
-               final ResourceID resourceID = ResourceID.generate();
+               final ResourceID tmResourceID = ResourceID.generate();
 
                final String address1 = "/resource/manager/address/one";
                final String address2 = "/resource/manager/address/two";
                final UUID leaderId1 = UUID.randomUUID();
                final UUID leaderId2 = UUID.randomUUID();
+               final ResourceID rmResourceId1 = new ResourceID(address1);
+               final ResourceID rmResourceId2 = new ResourceID(address2);
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                try {
@@ -291,11 +389,11 @@ public class TaskExecutorTest extends TestLogger {
                        when(rmGateway1.registerTaskExecutor(
                                        any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
                                        
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
-                                               new 
TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+                                               new 
TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
                        when(rmGateway2.registerTaskExecutor(
                                        any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
                                        
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
-                                               new 
TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+                                               new 
TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
                        rpc.registerGateway(address1, rmGateway1);
                        rpc.registerGateway(address2, rmGateway2);
@@ -313,7 +411,7 @@ public class TaskExecutorTest extends TestLogger {
                        
when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new 
String[1]);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-                       
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+                       
when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID);
                        
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
                        final TaskSlotTable taskSlotTable = 
mock(TaskSlotTable.class);
@@ -350,7 +448,7 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address1, leaderId1);
 
                        verify(rmGateway1).registerTaskExecutor(
-                                       eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class));
+                                       eq(leaderId1), eq(taskManagerAddress), 
eq(tmResourceID), any(SlotReport.class), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
 
                        // cancel the leader 
@@ -360,7 +458,7 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address2, leaderId2);
 
                        verify(rmGateway2).registerTaskExecutor(
-                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), eq(slotReport), any(Time.class));
+                                       eq(leaderId2), eq(taskManagerAddress), 
eq(tmResourceID), eq(slotReport), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
 
                        // check if a concurrent error occurred
@@ -531,6 +629,7 @@ public class TaskExecutorTest extends TestLogger {
 
                final String resourceManagerAddress = "rm";
                final UUID resourceManagerLeaderId = UUID.randomUUID();
+               final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
 
                final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                final InstanceID registrationId = new InstanceID();
@@ -540,7 +639,7 @@ public class TaskExecutorTest extends TestLogger {
                        any(String.class),
                        eq(resourceId),
                        any(SlotReport.class),
-                       
any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
 TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
 TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 
1000L)));
 
                final String jobManagerAddress = "jm";
                final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -638,6 +737,7 @@ public class TaskExecutorTest extends TestLogger {
 
                final String resourceManagerAddress = "rm";
                final UUID resourceManagerLeaderId = UUID.randomUUID();
+               final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
 
                final String jobManagerAddress = "jm";
                final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -655,7 +755,7 @@ public class TaskExecutorTest extends TestLogger {
                        any(String.class),
                        eq(resourceId),
                        any(SlotReport.class),
-                       
any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
 TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
 TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 
1000L)));
 
                final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
                final int blobPort = 42;
@@ -844,6 +944,7 @@ public class TaskExecutorTest extends TestLogger {
 
                final String resourceManagerAddress = "rm";
                final UUID resourceManagerLeaderId = UUID.randomUUID();
+               final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
 
                final String jobManagerAddress = "jm";
                final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -862,7 +963,7 @@ public class TaskExecutorTest extends TestLogger {
                        eq(resourceId),
                        any(SlotReport.class),
                        any(Time.class))).thenReturn(
-                               
FlinkCompletableFuture.<RegistrationResponse>completed(new 
TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+                               
FlinkCompletableFuture.<RegistrationResponse>completed(new 
TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 
1000L)));
 
                final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
                final int blobPort = 42;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 7a0dbbe..ed672a3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -198,15 +198,18 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
                        haServices,
                        commonRpcService.getScheduledExecutor());
 
-               return new YarnResourceManager(config,
-                               ENV,
-                               commonRpcService,
-                               resourceManagerConfiguration,
-                               haServices,
-                               
resourceManagerRuntimeServices.getSlotManagerFactory(),
-                               metricRegistry,
-                               
resourceManagerRuntimeServices.getJobLeaderIdService(),
-                               this);
+               return new YarnResourceManager(
+                       ResourceID.generate(),
+                       config,
+                       ENV,
+                       commonRpcService,
+                       resourceManagerConfiguration,
+                       haServices,
+                       heartbeatServices,
+                       resourceManagerRuntimeServices.getSlotManagerFactory(),
+                       metricRegistry,
+                       resourceManagerRuntimeServices.getJobLeaderIdService(),
+                       this);
        }
 
        private JobManagerRunner createJobManagerRunner(Configuration config) 
throws Exception{

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ab96441..a308079 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
@@ -106,19 +107,23 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
        final private Map<ResourceProfile, Integer> resourcePriorities = new 
HashMap<>();
 
        public YarnResourceManager(
+                       ResourceID resourceId,
                        Configuration flinkConfig,
                        Map<String, String> env,
                        RpcService rpcService,
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
                        SlotManagerFactory slotManagerFactory,
                        MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(
+                       resourceId,
                        rpcService,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
+                       heartbeatServices,
                        slotManagerFactory,
                        metricRegistry,
                        jobLeaderIdService,
@@ -231,7 +236,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
        public void onContainersCompleted(List<ContainerStatus> list) {
                for (ContainerStatus container : list) {
                        if (container.getExitStatus() < 0) {
-                               notifyWorkerFailed(new 
ResourceID(container.getContainerId().toString()), container.getDiagnostics());
+                               closeTaskManagerConnection(new ResourceID(
+                                       container.getContainerId().toString()), 
new Exception(container.getDiagnostics()));
                        }
                }
        }

Reply via email to