Repository: flink
Updated Branches:
  refs/heads/master c6b39e4f2 -> 690ab2c31


[FLINK-9890][Distributed Coordination] Remove obsolete class 
ResourceManagerConfiguration

The configuration values are effectively not used. This commit removes the class
and all its usages.

This closes #6368.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/690ab2c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/690ab2c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/690ab2c3

Branch: refs/heads/master
Commit: 690ab2c311c703922b3d18eec177e09e1688dca8
Parents: c6b39e4
Author: gyao <[email protected]>
Authored: Wed Jul 18 16:23:59 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Sun Jul 22 22:22:56 2018 +0200

----------------------------------------------------------------------
 .../StandaloneJobClusterEntryPoint.java         |  3 -
 .../entrypoint/MesosJobClusterEntrypoint.java   |  3 -
 .../MesosSessionClusterEntrypoint.java          |  3 -
 .../clusterframework/MesosResourceManager.java  |  3 -
 .../MesosResourceManagerTest.java               |  8 --
 .../StandaloneSessionClusterEntrypoint.java     |  3 -
 .../jobmaster/JobMasterRegistrationSuccess.java | 16 +---
 .../resourcemanager/ResourceManager.java        |  7 --
 .../ResourceManagerConfiguration.java           | 78 --------------------
 .../resourcemanager/ResourceManagerRunner.java  |  3 -
 .../StandaloneResourceManager.java              |  2 -
 .../TaskExecutorRegistrationSuccess.java        | 19 ++---
 .../clusterframework/ResourceManagerTest.java   | 11 ---
 .../flink/runtime/jobmaster/JobMasterTest.java  |  1 -
 .../resourcemanager/ResourceManagerHATest.java  |  6 +-
 .../ResourceManagerJobMasterTest.java           |  4 -
 .../ResourceManagerTaskExecutorTest.java        |  6 +-
 .../resourcemanager/ResourceManagerTest.java    |  2 -
 .../resourcemanager/TestingResourceManager.java |  2 -
 .../utils/TestingResourceManagerGateway.java    |  7 --
 .../taskexecutor/TaskExecutorITCase.java        |  5 --
 .../runtime/taskexecutor/TaskExecutorTest.java  | 16 ++--
 .../testutils/TestingResourceManager.java       |  3 +-
 .../apache/flink/yarn/YarnResourceManager.java  |  3 -
 .../entrypoint/YarnJobClusterEntrypoint.java    |  3 -
 .../YarnSessionClusterEntrypoint.java           |  3 -
 .../flink/yarn/YarnResourceManagerTest.java     |  8 --
 27 files changed, 15 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
----------------------------------------------------------------------
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 57f7ca2..f4550e8 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
@@ -109,7 +108,6 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
                        FatalErrorHandler fatalErrorHandler,
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        resourceManagerRuntimeServicesConfiguration,
@@ -120,7 +118,6 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
                        rpcService,
                        ResourceManager.RESOURCE_MANAGER_NAME,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        resourceManagerRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index cf661cb..d9876ec 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -125,7 +124,6 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
                        FatalErrorHandler fatalErrorHandler,
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        rmServicesConfiguration,
@@ -136,7 +134,6 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
                        rpcService,
                        ResourceManager.RESOURCE_MANAGER_NAME,
                        resourceId,
-                       rmConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        rmRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 5dea936..3213f65 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -115,7 +114,6 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        FatalErrorHandler fatalErrorHandler,
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        rmServicesConfiguration,
@@ -126,7 +124,6 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        rpcService,
                        ResourceManager.RESOURCE_MANAGER_NAME,
                        resourceId,
-                       rmConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        rmRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index ea10ff8..e24214d 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -53,7 +53,6 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -144,7 +143,6 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        RpcService rpcService,
                        String resourceManagerEndpointId,
                        ResourceID resourceId,
-                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
@@ -162,7 +160,6 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        rpcService,
                        resourceManagerEndpointId,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 9fa8c0e..5af3fa0 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -59,7 +59,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
@@ -161,7 +160,6 @@ public class MesosResourceManagerTest extends TestLogger {
                        RpcService rpcService,
                        String resourceManagerEndpointId,
                        ResourceID resourceId,
-                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
@@ -179,7 +177,6 @@ public class MesosResourceManagerTest extends TestLogger {
                                rpcService,
                                resourceManagerEndpointId,
                                resourceId,
-                               resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
                                slotManager,
@@ -239,7 +236,6 @@ public class MesosResourceManagerTest extends TestLogger {
                MockMesosServices mesosServices;
 
                // RM
-               ResourceManagerConfiguration rmConfiguration;
                ResourceID rmResourceID;
                static final String RM_ADDRESS = "resourceManager";
                TestingMesosResourceManager resourceManager;
@@ -284,16 +280,12 @@ public class MesosResourceManagerTest extends TestLogger {
                                Option.<String>empty(), 
Collections.<String>emptyList());
 
                        // resource manager
-                       rmConfiguration = new ResourceManagerConfiguration(
-                               Time.seconds(5L),
-                               Time.seconds(5L));
                        rmResourceID = ResourceID.generate();
                        resourceManager =
                                new TestingMesosResourceManager(
                                        rpcService,
                                        RM_ADDRESS,
                                        rmResourceID,
-                                       rmConfiguration,
                                        rmServices.highAvailabilityServices,
                                        rmServices.heartbeatServices,
                                        rmServices.slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index d56725c..9943936 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
@@ -58,7 +57,6 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
                        FatalErrorHandler fatalErrorHandler,
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        resourceManagerRuntimeServicesConfiguration,
@@ -69,7 +67,6 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
                        rpcService,
                        FlinkResourceManager.RESOURCE_MANAGER_NAME,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        resourceManagerRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index 94ecfd2..b5c2d6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -31,30 +31,17 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
 
        private static final long serialVersionUID = 5577641250204140415L;
 
-       private final long heartbeatInterval;
-
        private final ResourceManagerId resourceManagerId;
 
        private final ResourceID resourceManagerResourceId;
 
        public JobMasterRegistrationSuccess(
-                       final long heartbeatInterval,
                        final ResourceManagerId resourceManagerId,
                        final ResourceID resourceManagerResourceId) {
-               this.heartbeatInterval = heartbeatInterval;
                this.resourceManagerId = checkNotNull(resourceManagerId);
                this.resourceManagerResourceId = 
checkNotNull(resourceManagerResourceId);
        }
 
-       /**
-        * Gets the interval in which the ResourceManager will heartbeat the 
JobMaster.
-        *
-        * @return the interval in which the ResourceManager will heartbeat the 
JobMaster
-        */
-       public long getHeartbeatInterval() {
-               return heartbeatInterval;
-       }
-
        public ResourceManagerId getResourceManagerId() {
                return resourceManagerId;
        }
@@ -66,8 +53,7 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
        @Override
        public String toString() {
                return "JobMasterRegistrationSuccess{" +
-                       "heartbeatInterval=" + heartbeatInterval +
-                       ", resourceManagerLeaderId=" + resourceManagerId +
+                       "resourceManagerId=" + resourceManagerId +
                        ", resourceManagerResourceId=" + 
resourceManagerResourceId +
                        '}';
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6b104ed..453ec8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -101,9 +101,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        /** Unique id of the resource manager. */
        private final ResourceID resourceId;
 
-       /** Configuration of the resource manager. */
-       private final ResourceManagerConfiguration resourceManagerConfiguration;
-
        /** All currently registered JobMasterGateways scoped by JobID. */
        private final Map<JobID, JobManagerRegistration> 
jobManagerRegistrations;
 
@@ -146,7 +143,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        RpcService rpcService,
                        String resourceManagerEndpointId,
                        ResourceID resourceId,
-                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
@@ -158,7 +154,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                super(rpcService, resourceManagerEndpointId);
 
                this.resourceId = checkNotNull(resourceId);
-               this.resourceManagerConfiguration = 
checkNotNull(resourceManagerConfiguration);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.slotManager = checkNotNull(slotManager);
                this.metricRegistry = checkNotNull(metricRegistry);
@@ -668,7 +663,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                });
 
                return new JobMasterRegistrationSuccess(
-                       
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
                        getFencingToken(),
                        resourceId);
        }
@@ -726,7 +720,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        return new TaskExecutorRegistrationSuccess(
                                registration.getInstanceID(),
                                resourceId,
-                               
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
                                clusterInformation);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
deleted file mode 100644
index 0216789..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.duration.Duration;
-
-/**
- * Resource manager configuration
- */
-public class ResourceManagerConfiguration {
-
-       private final Time timeout;
-       private final Time heartbeatInterval;
-
-       public ResourceManagerConfiguration(
-                       Time timeout,
-                       Time heartbeatInterval) {
-               this.timeout = Preconditions.checkNotNull(timeout, "timeout");
-               this.heartbeatInterval = 
Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval");
-       }
-
-       public Time getTimeout() {
-               return timeout;
-       }
-
-       public Time getHeartbeatInterval() {
-               return heartbeatInterval;
-       }
-
-       // 
--------------------------------------------------------------------------
-       // Static factory methods
-       // 
--------------------------------------------------------------------------
-
-       public static ResourceManagerConfiguration 
fromConfiguration(Configuration configuration) throws ConfigurationException {
-               final String strTimeout = 
configuration.getString(AkkaOptions.ASK_TIMEOUT);
-               final Time timeout;
-
-               try {
-                       timeout = 
Time.milliseconds(Duration.apply(strTimeout).toMillis());
-               } catch (NumberFormatException e) {
-                       throw new ConfigurationException("Could not parse the 
resource manager's timeout " +
-                               "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
-               }
-
-               final String strHeartbeatInterval = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
-               final Time heartbeatInterval;
-
-               try {
-                       heartbeatInterval = 
Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
-               } catch (NumberFormatException e) {
-                       throw new ConfigurationException("Could not parse the 
resource manager's heartbeat interval " +
-                               "value " + AkkaOptions.WATCH_HEARTBEAT_INTERVAL 
+ '.', e);
-               }
-
-               return new ResourceManagerConfiguration(timeout, 
heartbeatInterval);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index ff9b4f0..072c2f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -65,8 +65,6 @@ public class ResourceManagerRunner implements 
FatalErrorHandler, AutoCloseableAs
                Preconditions.checkNotNull(heartbeatServices);
                Preconditions.checkNotNull(metricRegistry);
 
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
-
                final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 
                resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
@@ -78,7 +76,6 @@ public class ResourceManagerRunner implements 
FatalErrorHandler, AutoCloseableAs
                        rpcService,
                        resourceManagerEndpointId,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        resourceManagerRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index d8e0e48..420b89f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -44,7 +44,6 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
                        RpcService rpcService,
                        String resourceManagerEndpointId,
                        ResourceID resourceId,
-                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
@@ -56,7 +55,6 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
                        rpcService,
                        resourceManagerEndpointId,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
index 1b1c1e5..6a65988 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -38,8 +38,6 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
 
        private final ResourceID resourceManagerResourceId;
 
-       private final long heartbeatInterval;
-
        private final ClusterInformation clusterInformation;
 
        /**
@@ -47,17 +45,14 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
         *
         * @param registrationId The ID that the ResourceManager assigned the 
registration.
         * @param resourceManagerResourceId The unique ID that identifies the 
ResourceManager.
-        * @param heartbeatInterval The interval in which the ResourceManager 
will heartbeat the TaskExecutor.
         * @param clusterInformation information about the cluster
         */
        public TaskExecutorRegistrationSuccess(
                        InstanceID registrationId,
                        ResourceID resourceManagerResourceId,
-                       long heartbeatInterval,
                        ClusterInformation clusterInformation) {
                this.registrationId = 
Preconditions.checkNotNull(registrationId);
                this.resourceManagerResourceId = 
Preconditions.checkNotNull(resourceManagerResourceId);
-               this.heartbeatInterval = heartbeatInterval;
                this.clusterInformation = 
Preconditions.checkNotNull(clusterInformation);
        }
 
@@ -76,13 +71,6 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
        }
 
        /**
-        * Gets the interval in which the ResourceManager will heartbeat the 
TaskExecutor.
-        */
-       public long getHeartbeatInterval() {
-               return heartbeatInterval;
-       }
-
-       /**
         * Gets the cluster information.
         */
        public ClusterInformation getClusterInformation() {
@@ -91,9 +79,12 @@ public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.
 
        @Override
        public String toString() {
-               return "TaskExecutorRegistrationSuccess (" + registrationId + " 
/ " + resourceManagerResourceId + " / " + heartbeatInterval + ')';
+               return "TaskExecutorRegistrationSuccess{" +
+                       "registrationId=" + registrationId +
+                       ", resourceManagerResourceId=" + 
resourceManagerResourceId +
+                       ", clusterInformation=" + clusterInformation +
+                       '}';
        }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 8898ef0..3c1f3da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
@@ -496,10 +495,6 @@ public class ResourceManagerTest extends TestLogger {
                final TestingRpcService rpcService = new TestingRpcService();
                rpcService.registerGateway(taskManagerAddress, 
taskExecutorGateway);
 
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= new ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
-
                final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
@@ -523,7 +518,6 @@ public class ResourceManagerTest extends TestLogger {
                                rpcService,
                                FlinkResourceManager.RESOURCE_MANAGER_NAME,
                                resourceManagerResourceID,
-                               resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
                                slotManager,
@@ -594,10 +588,6 @@ public class ResourceManagerTest extends TestLogger {
                final TestingRpcService rpcService = new TestingRpcService();
                rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
 
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= new ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
-
                final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
                final SettableLeaderRetrievalService jmLeaderRetrievalService = 
new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
@@ -626,7 +616,6 @@ public class ResourceManagerTest extends TestLogger {
                                rpcService,
                                FlinkResourceManager.RESOURCE_MANAGER_NAME,
                                rmResourceId,
-                               resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
                                slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 82fdc94..36da21a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -264,7 +264,6 @@ public class JobMasterTest extends TestLogger {
                final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
                        resourceManagerId,
                        rmResourceId,
-                       fastHeartbeatInterval,
                        resourceManagerAddress,
                        "localhost");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index f42c708..b8dee83 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -64,10 +65,6 @@ public class ResourceManagerHATest extends TestLogger {
 
                HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
 
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
-
                ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = new 
ResourceManagerRuntimeServicesConfiguration(
                        Time.seconds(5L),
                        new SlotManagerConfiguration(
@@ -90,7 +87,6 @@ public class ResourceManagerHATest extends TestLogger {
                                rpcService,
                                FlinkResourceManager.RESOURCE_MANAGER_NAME,
                                rmResourceId,
-                               resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
                                resourceManagerRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 532732d..3e630e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -269,9 +269,6 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
 
                HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
 
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
                MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        highAvailabilityServices,
@@ -288,7 +285,6 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        rpcService,
                        FlinkResourceManager.RESOURCE_MANAGER_NAME,
                        rmResourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 540db69..535271a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -188,10 +188,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
-                       
+
                SlotManager slotManager = new SlotManager(
                        rpcService.getScheduledExecutor(),
                        TestingUtils.infiniteTime(),
@@ -209,7 +206,6 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                                rpcService,
                                FlinkResourceManager.RESOURCE_MANAGER_NAME,
                                resourceManagerResourceID,
-                               resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
                                slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index f5b6d3a..a883784 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -68,7 +68,6 @@ public class ResourceManagerTest extends TestLogger {
        @Test
        public void testRequestTaskManagerInfo() throws Exception {
                final Configuration configuration = new Configuration();
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
                final SlotManager slotManager = new SlotManager(
                        rpcService.getScheduledExecutor(),
@@ -88,7 +87,6 @@ public class ResourceManagerTest extends TestLogger {
                        rpcService,
                        ResourceManager.RESOURCE_MANAGER_NAME,
                        ResourceID.generate(),
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        new HeartbeatServices(1000L, 10000L),
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 2bd976b..0b56231 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -41,7 +41,6 @@ public class TestingResourceManager extends 
ResourceManager<ResourceID> {
                        RpcService rpcService,
                        String resourceManagerEndpointId,
                        ResourceID resourceId,
-                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
@@ -52,7 +51,6 @@ public class TestingResourceManager extends 
ResourceManager<ResourceID> {
                        rpcService,
                        resourceManagerEndpointId,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index daa2383..c38ea5d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -63,8 +63,6 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
 
        private final ResourceID ownResourceId;
 
-       private final long heartbeatInterval;
-
        private final String address;
 
        private final String hostname;
@@ -95,7 +93,6 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
                this(
                        ResourceManagerId.generate(),
                        ResourceID.generate(),
-                       10000L,
                        "localhost",
                        "localhost");
        }
@@ -103,12 +100,10 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
        public TestingResourceManagerGateway(
                        ResourceManagerId resourceManagerId,
                        ResourceID resourceId,
-                       long heartbeatInterval,
                        String address,
                        String hostname) {
                this.resourceManagerId = 
Preconditions.checkNotNull(resourceManagerId);
                this.ownResourceId = Preconditions.checkNotNull(resourceId);
-               this.heartbeatInterval = heartbeatInterval;
                this.address = Preconditions.checkNotNull(address);
                this.hostname = Preconditions.checkNotNull(hostname);
                this.slotFutureReference = new AtomicReference<>();
@@ -174,7 +169,6 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
 
                return CompletableFuture.completedFuture(
                        new JobMasterRegistrationSuccess(
-                               heartbeatInterval,
                                resourceManagerId,
                                ownResourceId));
        }
@@ -227,7 +221,6 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
                                new TaskExecutorRegistrationSuccess(
                                        new InstanceID(),
                                        ownResourceId,
-                                       heartbeatInterval,
                                        new ClusterInformation("localhost", 
1234)));
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 61cfde5..9a864bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
@@ -110,9 +109,6 @@ public class TaskExecutorITCase extends TestLogger {
                testingHAServices.setJobMasterLeaderRetriever(jobId, new 
SettableLeaderRetrievalService(jmAddress, jobMasterId.toUUID()));
 
                TestingRpcService rpcService = new TestingRpcService();
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
-                       Time.milliseconds(500L),
-                       Time.milliseconds(500L));
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        testingHAServices,
                        rpcService.getScheduledExecutor(),
@@ -142,7 +138,6 @@ public class TaskExecutorITCase extends TestLogger {
                        rpcService,
                        FlinkResourceManager.RESOURCE_MANAGER_NAME,
                        rmResourceId,
-                       resourceManagerConfiguration,
                        testingHAServices,
                        heartbeatServices,
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 9c1f2f2..168b251 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -311,14 +311,12 @@ public class TaskExecutorTest extends TestLogger {
                TestingResourceManagerGateway rmGateway = new 
TestingResourceManagerGateway(
                        rmLeaderId,
                        rmResourceId,
-                       heartbeatInterval,
                        rmAddress,
                        rmAddress);
 
                final TaskExecutorRegistrationSuccess registrationResponse = 
new TaskExecutorRegistrationSuccess(
                        new InstanceID(),
                        rmResourceId,
-                       heartbeatInterval,
                        new ClusterInformation("localhost", 1234));
 
                final CompletableFuture<ResourceID> 
taskExecutorRegistrationFuture = new CompletableFuture<>();
@@ -436,7 +434,6 @@ public class TaskExecutorTest extends TestLogger {
                        new TaskExecutorRegistrationSuccess(
                                new InstanceID(),
                                rmResourceId,
-                               10L,
                                new ClusterInformation("localhost", 1234)));
 
                
rmGateway.setRegisterTaskExecutorFunction(stringResourceIDIntegerHardwareDescriptionTuple4
 -> {
@@ -556,7 +553,7 @@ public class TaskExecutorTest extends TestLogger {
                when(rmGateway.registerTaskExecutor(
                                        anyString(), any(ResourceID.class), 
anyInt(), any(HardwareDescription.class), any(Time.class)))
                        .thenReturn(CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(
-                               new InstanceID(), resourceManagerResourceId, 
10L, new ClusterInformation("localhost", 1234))));
+                               new InstanceID(), resourceManagerResourceId, 
new ClusterInformation("localhost", 1234))));
 
                rpc.registerGateway(resourceManagerAddress, rmGateway);
 
@@ -620,11 +617,11 @@ public class TaskExecutorTest extends TestLogger {
                when(rmGateway1.registerTaskExecutor(
                                        anyString(), any(ResourceID.class), 
anyInt(), any(HardwareDescription.class), any(Time.class)))
                        .thenReturn(CompletableFuture.completedFuture(
-                               new TaskExecutorRegistrationSuccess(new 
InstanceID(), rmResourceId1, 10L, new ClusterInformation("localhost", 1234))));
+                               new TaskExecutorRegistrationSuccess(new 
InstanceID(), rmResourceId1, new ClusterInformation("localhost", 1234))));
                when(rmGateway2.registerTaskExecutor(
                                        anyString(), any(ResourceID.class), 
anyInt(), any(HardwareDescription.class), any(Time.class)))
                        .thenReturn(CompletableFuture.completedFuture(
-                               new TaskExecutorRegistrationSuccess(new 
InstanceID(), rmResourceId2, 10L, new ClusterInformation("localhost", 1234))));
+                               new TaskExecutorRegistrationSuccess(new 
InstanceID(), rmResourceId2, new ClusterInformation("localhost", 1234))));
 
                rpc.registerGateway(address1, rmGateway1);
                rpc.registerGateway(address2, rmGateway2);
@@ -938,7 +935,7 @@ public class TaskExecutorTest extends TestLogger {
                resourceManagerGateway.setRegisterTaskExecutorFunction(
                        stringResourceIDIntegerHardwareDescriptionTuple4 -> {
                 
registrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
-                return CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 
1000L, new ClusterInformation("localhost", 1234)));
+                return CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, new 
ClusterInformation("localhost", 1234)));
             }
                );
 
@@ -1271,7 +1268,6 @@ public class TaskExecutorTest extends TestLogger {
                final TestingResourceManagerGateway rmGateway = new 
TestingResourceManagerGateway(
                        ResourceManagerId.generate(),
                        rmResourceID,
-                       heartbeatInterval,
                        rmAddress,
                        rmAddress);
 
@@ -1458,7 +1454,6 @@ public class TaskExecutorTest extends TestLogger {
                                                return 
CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
                                                        new InstanceID(),
                                                        
testingResourceManagerGateway.getOwnResourceId(),
-                                                       heartbeatInterval,
                                                        new 
ClusterInformation("localhost", 1234)));
                                        } else {
                                                secondRegistration.trigger();
@@ -1558,7 +1553,7 @@ public class TaskExecutorTest extends TestLogger {
                try {
                        final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
                        final ClusterInformation clusterInformation = new 
ClusterInformation("foobar", 1234);
-                       final CompletableFuture<RegistrationResponse> 
registrationResponseFuture = CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 
heartbeatInterval, clusterInformation));
+                       final CompletableFuture<RegistrationResponse> 
registrationResponseFuture = CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 
clusterInformation));
                        final BlockingQueue<ResourceID> registrationQueue = new 
ArrayBlockingQueue<>(1);
 
                        
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
@@ -1657,7 +1652,6 @@ public class TaskExecutorTest extends TestLogger {
                                new TaskExecutorRegistrationSuccess(
                                        new InstanceID(),
                                        
testingResourceManagerGateway.getOwnResourceId(),
-                                       1000L,
                                        new ClusterInformation("foobar", 1234)  
));
 
                        final CountDownLatch numberRegistrations = new 
CountDownLatch(2);

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
index 17995a3..3ab0699 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.testutils;
 
-import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -28,6 +27,8 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingMessages;
 
+import akka.actor.ActorRef;
+
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 49385e5..876e858 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -122,7 +121,6 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                        ResourceID resourceId,
                        Configuration flinkConfig,
                        Map<String, String> env,
-                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
@@ -135,7 +133,6 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                        rpcService,
                        resourceManagerEndpointId,
                        resourceId,
-                       resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        slotManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 8cc3579..482e2b7 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -93,7 +92,6 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
                        FatalErrorHandler fatalErrorHandler,
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        rmServicesConfiguration,
@@ -106,7 +104,6 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
                        resourceId,
                        configuration,
                        System.getenv(),
-                       rmConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        rmRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 232467a..6c92861 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -81,7 +80,6 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        FatalErrorHandler fatalErrorHandler,
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        rmServicesConfiguration,
@@ -94,7 +92,6 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        resourceId,
                        configuration,
                        System.getenv(),
-                       rmConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
                        rmRuntimeServices.getSlotManager(),

http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 8c6d7f7..eb8e968 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -154,7 +153,6 @@ public class YarnResourceManagerTest extends TestLogger {
                                ResourceID resourceId,
                                Configuration flinkConfig,
                                Map<String, String> env,
-                               ResourceManagerConfiguration 
resourceManagerConfiguration,
                                HighAvailabilityServices 
highAvailabilityServices,
                                HeartbeatServices heartbeatServices,
                                SlotManager slotManager,
@@ -171,7 +169,6 @@ public class YarnResourceManagerTest extends TestLogger {
                                resourceId,
                                flinkConfig,
                                env,
-                               resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
                                slotManager,
@@ -220,7 +217,6 @@ public class YarnResourceManagerTest extends TestLogger {
                final MockResourceManagerRuntimeServices rmServices;
 
                // RM
-               final ResourceManagerConfiguration rmConfiguration;
                final ResourceID rmResourceID;
                static final String RM_ADDRESS = "resourceManager";
                final TestingYarnResourceManager resourceManager;
@@ -248,9 +244,6 @@ public class YarnResourceManagerTest extends TestLogger {
                        rmServices = new MockResourceManagerRuntimeServices();
 
                        // resource manager
-                       rmConfiguration = new ResourceManagerConfiguration(
-                                       Time.seconds(5L),
-                                       Time.seconds(5L));
                        rmResourceID = ResourceID.generate();
                        resourceManager =
                                        new TestingYarnResourceManager(
@@ -259,7 +252,6 @@ public class YarnResourceManagerTest extends TestLogger {
                                                        rmResourceID,
                                                        flinkConfig,
                                                        env,
-                                                       rmConfiguration,
                                                        
rmServices.highAvailabilityServices,
                                                        
rmServices.heartbeatServices,
                                                        rmServices.slotManager,

Reply via email to