Repository: flink Updated Branches: refs/heads/master c6b39e4f2 -> 690ab2c31
[FLINK-9890][Distributed Coordination] Remove obsolete class ResourceManagerConfiguration The configuration values are effectively not used. This commit removes the class and all its usages. This closes #6368. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/690ab2c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/690ab2c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/690ab2c3 Branch: refs/heads/master Commit: 690ab2c311c703922b3d18eec177e09e1688dca8 Parents: c6b39e4 Author: gyao <[email protected]> Authored: Wed Jul 18 16:23:59 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Sun Jul 22 22:22:56 2018 +0200 ---------------------------------------------------------------------- .../StandaloneJobClusterEntryPoint.java | 3 - .../entrypoint/MesosJobClusterEntrypoint.java | 3 - .../MesosSessionClusterEntrypoint.java | 3 - .../clusterframework/MesosResourceManager.java | 3 - .../MesosResourceManagerTest.java | 8 -- .../StandaloneSessionClusterEntrypoint.java | 3 - .../jobmaster/JobMasterRegistrationSuccess.java | 16 +--- .../resourcemanager/ResourceManager.java | 7 -- .../ResourceManagerConfiguration.java | 78 -------------------- .../resourcemanager/ResourceManagerRunner.java | 3 - .../StandaloneResourceManager.java | 2 - .../TaskExecutorRegistrationSuccess.java | 19 ++--- .../clusterframework/ResourceManagerTest.java | 11 --- .../flink/runtime/jobmaster/JobMasterTest.java | 1 - .../resourcemanager/ResourceManagerHATest.java | 6 +- .../ResourceManagerJobMasterTest.java | 4 - .../ResourceManagerTaskExecutorTest.java | 6 +- .../resourcemanager/ResourceManagerTest.java | 2 - .../resourcemanager/TestingResourceManager.java | 2 - .../utils/TestingResourceManagerGateway.java | 7 -- .../taskexecutor/TaskExecutorITCase.java | 5 -- .../runtime/taskexecutor/TaskExecutorTest.java | 16 ++-- .../testutils/TestingResourceManager.java | 3 +- .../apache/flink/yarn/YarnResourceManager.java | 3 - .../entrypoint/YarnJobClusterEntrypoint.java | 3 - .../YarnSessionClusterEntrypoint.java | 3 - .../flink/yarn/YarnResourceManagerTest.java | 8 -- 27 files changed, 15 insertions(+), 213 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java ---------------------------------------------------------------------- diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 57f7ca2..f4550e8 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; @@ -109,7 +108,6 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { - final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( resourceManagerRuntimeServicesConfiguration, @@ -120,7 +118,6 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { rpcService, ResourceManager.RESOURCE_MANAGER_NAME, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java index cf661cb..d9876ec 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -125,7 +124,6 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { - final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( rmServicesConfiguration, @@ -136,7 +134,6 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { rpcService, ResourceManager.RESOURCE_MANAGER_NAME, resourceId, - rmConfiguration, highAvailabilityServices, heartbeatServices, rmRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java index 5dea936..3213f65 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -115,7 +114,6 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { - final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( rmServicesConfiguration, @@ -126,7 +124,6 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { rpcService, ResourceManager.RESOURCE_MANAGER_NAME, resourceId, - rmConfiguration, highAvailabilityServices, heartbeatServices, rmRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index ea10ff8..e24214d 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -53,7 +53,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -144,7 +143,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -162,7 +160,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN rpcService, resourceManagerEndpointId, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 9fa8c0e..5af3fa0 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -59,7 +59,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions; @@ -161,7 +160,6 @@ public class MesosResourceManagerTest extends TestLogger { RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -179,7 +177,6 @@ public class MesosResourceManagerTest extends TestLogger { rpcService, resourceManagerEndpointId, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, @@ -239,7 +236,6 @@ public class MesosResourceManagerTest extends TestLogger { MockMesosServices mesosServices; // RM - ResourceManagerConfiguration rmConfiguration; ResourceID rmResourceID; static final String RM_ADDRESS = "resourceManager"; TestingMesosResourceManager resourceManager; @@ -284,16 +280,12 @@ public class MesosResourceManagerTest extends TestLogger { Option.<String>empty(), Collections.<String>emptyList()); // resource manager - rmConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); rmResourceID = ResourceID.generate(); resourceManager = new TestingMesosResourceManager( rpcService, RM_ADDRESS, rmResourceID, - rmConfiguration, rmServices.highAvailabilityServices, rmServices.heartbeatServices, rmServices.slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java index d56725c..9943936 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; @@ -58,7 +57,6 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { - final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( resourceManagerRuntimeServicesConfiguration, @@ -69,7 +67,6 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java index 94ecfd2..b5c2d6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java @@ -31,30 +31,17 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { private static final long serialVersionUID = 5577641250204140415L; - private final long heartbeatInterval; - private final ResourceManagerId resourceManagerId; private final ResourceID resourceManagerResourceId; public JobMasterRegistrationSuccess( - final long heartbeatInterval, final ResourceManagerId resourceManagerId, final ResourceID resourceManagerResourceId) { - this.heartbeatInterval = heartbeatInterval; this.resourceManagerId = checkNotNull(resourceManagerId); this.resourceManagerResourceId = checkNotNull(resourceManagerResourceId); } - /** - * Gets the interval in which the ResourceManager will heartbeat the JobMaster. - * - * @return the interval in which the ResourceManager will heartbeat the JobMaster - */ - public long getHeartbeatInterval() { - return heartbeatInterval; - } - public ResourceManagerId getResourceManagerId() { return resourceManagerId; } @@ -66,8 +53,7 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { @Override public String toString() { return "JobMasterRegistrationSuccess{" + - "heartbeatInterval=" + heartbeatInterval + - ", resourceManagerLeaderId=" + resourceManagerId + + "resourceManagerId=" + resourceManagerId + ", resourceManagerResourceId=" + resourceManagerResourceId + '}'; } http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 6b104ed..453ec8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -101,9 +101,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> /** Unique id of the resource manager. */ private final ResourceID resourceId; - /** Configuration of the resource manager. */ - private final ResourceManagerConfiguration resourceManagerConfiguration; - /** All currently registered JobMasterGateways scoped by JobID. */ private final Map<JobID, JobManagerRegistration> jobManagerRegistrations; @@ -146,7 +143,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -158,7 +154,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> super(rpcService, resourceManagerEndpointId); this.resourceId = checkNotNull(resourceId); - this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.slotManager = checkNotNull(slotManager); this.metricRegistry = checkNotNull(metricRegistry); @@ -668,7 +663,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> }); return new JobMasterRegistrationSuccess( - resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(), getFencingToken(), resourceId); } @@ -726,7 +720,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> return new TaskExecutorRegistrationSuccess( registration.getInstanceID(), resourceId, - resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(), clusterInformation); } } http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java deleted file mode 100644 index 0216789..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.ConfigurationException; -import org.apache.flink.util.Preconditions; -import scala.concurrent.duration.Duration; - -/** - * Resource manager configuration - */ -public class ResourceManagerConfiguration { - - private final Time timeout; - private final Time heartbeatInterval; - - public ResourceManagerConfiguration( - Time timeout, - Time heartbeatInterval) { - this.timeout = Preconditions.checkNotNull(timeout, "timeout"); - this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval"); - } - - public Time getTimeout() { - return timeout; - } - - public Time getHeartbeatInterval() { - return heartbeatInterval; - } - - // -------------------------------------------------------------------------- - // Static factory methods - // -------------------------------------------------------------------------- - - public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { - final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT); - final Time timeout; - - try { - timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); - } catch (NumberFormatException e) { - throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + AkkaOptions.ASK_TIMEOUT + '.', e); - } - - final String strHeartbeatInterval = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL); - final Time heartbeatInterval; - - try { - heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis()); - } catch (NumberFormatException e) { - throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " + - "value " + AkkaOptions.WATCH_HEARTBEAT_INTERVAL + '.', e); - } - - return new ResourceManagerConfiguration(timeout, heartbeatInterval); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index ff9b4f0..072c2f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -65,8 +65,6 @@ public class ResourceManagerRunner implements FatalErrorHandler, AutoCloseableAs Preconditions.checkNotNull(heartbeatServices); Preconditions.checkNotNull(metricRegistry); - final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); - final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( @@ -78,7 +76,6 @@ public class ResourceManagerRunner implements FatalErrorHandler, AutoCloseableAs rpcService, resourceManagerEndpointId, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index d8e0e48..420b89f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -44,7 +44,6 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -56,7 +55,6 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { rpcService, resourceManagerEndpointId, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java index 1b1c1e5..6a65988 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java @@ -38,8 +38,6 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. private final ResourceID resourceManagerResourceId; - private final long heartbeatInterval; - private final ClusterInformation clusterInformation; /** @@ -47,17 +45,14 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. * * @param registrationId The ID that the ResourceManager assigned the registration. * @param resourceManagerResourceId The unique ID that identifies the ResourceManager. - * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor. * @param clusterInformation information about the cluster */ public TaskExecutorRegistrationSuccess( InstanceID registrationId, ResourceID resourceManagerResourceId, - long heartbeatInterval, ClusterInformation clusterInformation) { this.registrationId = Preconditions.checkNotNull(registrationId); this.resourceManagerResourceId = Preconditions.checkNotNull(resourceManagerResourceId); - this.heartbeatInterval = heartbeatInterval; this.clusterInformation = Preconditions.checkNotNull(clusterInformation); } @@ -76,13 +71,6 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. } /** - * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor. - */ - public long getHeartbeatInterval() { - return heartbeatInterval; - } - - /** * Gets the cluster information. */ public ClusterInformation getClusterInformation() { @@ -91,9 +79,12 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse. @Override public String toString() { - return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + resourceManagerResourceId + " / " + heartbeatInterval + ')'; + return "TaskExecutorRegistrationSuccess{" + + "registrationId=" + registrationId + + ", resourceManagerResourceId=" + resourceManagerResourceId + + ", clusterInformation=" + clusterInformation + + '}'; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 8898ef0..3c1f3da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -48,7 +48,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; @@ -496,10 +495,6 @@ public class ResourceManagerTest extends TestLogger { final TestingRpcService rpcService = new TestingRpcService(); rpcService.registerGateway(taskManagerAddress, taskExecutorGateway); - final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); - final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); @@ -523,7 +518,6 @@ public class ResourceManagerTest extends TestLogger { rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, resourceManagerResourceID, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, @@ -594,10 +588,6 @@ public class ResourceManagerTest extends TestLogger { final TestingRpcService rpcService = new TestingRpcService(); rpcService.registerGateway(jobMasterAddress, jobMasterGateway); - final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); - final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); final SettableLeaderRetrievalService jmLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); @@ -626,7 +616,6 @@ public class ResourceManagerTest extends TestLogger { rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, rmResourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 82fdc94..36da21a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -264,7 +264,6 @@ public class JobMasterTest extends TestLogger { final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( resourceManagerId, rmResourceId, - fastHeartbeatInterval, resourceManagerAddress, "localhost"); http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index f42c708..b8dee83 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.Test; @@ -64,10 +65,6 @@ public class ResourceManagerHATest extends TestLogger { HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); - ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration( Time.seconds(5L), new SlotManagerConfiguration( @@ -90,7 +87,6 @@ public class ResourceManagerHATest extends TestLogger { rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, rmResourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 532732d..3e630e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -269,9 +269,6 @@ public class ResourceManagerJobMasterTest extends TestLogger { HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, @@ -288,7 +285,6 @@ public class ResourceManagerJobMasterTest extends TestLogger { rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, rmResourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 540db69..535271a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -188,10 +188,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); - + SlotManager slotManager = new SlotManager( rpcService.getScheduledExecutor(), TestingUtils.infiniteTime(), @@ -209,7 +206,6 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, resourceManagerResourceID, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index f5b6d3a..a883784 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -68,7 +68,6 @@ public class ResourceManagerTest extends TestLogger { @Test public void testRequestTaskManagerInfo() throws Exception { final Configuration configuration = new Configuration(); - final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); final SlotManager slotManager = new SlotManager( rpcService.getScheduledExecutor(), @@ -88,7 +87,6 @@ public class ResourceManagerTest extends TestLogger { rpcService, ResourceManager.RESOURCE_MANAGER_NAME, ResourceID.generate(), - resourceManagerConfiguration, highAvailabilityServices, new HeartbeatServices(1000L, 10000L), slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 2bd976b..0b56231 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -41,7 +41,6 @@ public class TestingResourceManager extends ResourceManager<ResourceID> { RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -52,7 +51,6 @@ public class TestingResourceManager extends ResourceManager<ResourceID> { rpcService, resourceManagerEndpointId, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index daa2383..c38ea5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -63,8 +63,6 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { private final ResourceID ownResourceId; - private final long heartbeatInterval; - private final String address; private final String hostname; @@ -95,7 +93,6 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { this( ResourceManagerId.generate(), ResourceID.generate(), - 10000L, "localhost", "localhost"); } @@ -103,12 +100,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { public TestingResourceManagerGateway( ResourceManagerId resourceManagerId, ResourceID resourceId, - long heartbeatInterval, String address, String hostname) { this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId); this.ownResourceId = Preconditions.checkNotNull(resourceId); - this.heartbeatInterval = heartbeatInterval; this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); this.slotFutureReference = new AtomicReference<>(); @@ -174,7 +169,6 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { return CompletableFuture.completedFuture( new JobMasterRegistrationSuccess( - heartbeatInterval, resourceManagerId, ownResourceId)); } @@ -227,7 +221,6 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { new TaskExecutorRegistrationSuccess( new InstanceID(), ownResourceId, - heartbeatInterval, new ClusterInformation("localhost", 1234))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 61cfde5..9a864bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; @@ -110,9 +109,6 @@ public class TaskExecutorITCase extends TestLogger { testingHAServices.setJobMasterLeaderRetriever(jobId, new SettableLeaderRetrievalService(jmAddress, jobMasterId.toUUID())); TestingRpcService rpcService = new TestingRpcService(); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.milliseconds(500L), - Time.milliseconds(500L)); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( testingHAServices, rpcService.getScheduledExecutor(), @@ -142,7 +138,6 @@ public class TaskExecutorITCase extends TestLogger { rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, rmResourceId, - resourceManagerConfiguration, testingHAServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 9c1f2f2..168b251 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -311,14 +311,12 @@ public class TaskExecutorTest extends TestLogger { TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway( rmLeaderId, rmResourceId, - heartbeatInterval, rmAddress, rmAddress); final TaskExecutorRegistrationSuccess registrationResponse = new TaskExecutorRegistrationSuccess( new InstanceID(), rmResourceId, - heartbeatInterval, new ClusterInformation("localhost", 1234)); final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>(); @@ -436,7 +434,6 @@ public class TaskExecutorTest extends TestLogger { new TaskExecutorRegistrationSuccess( new InstanceID(), rmResourceId, - 10L, new ClusterInformation("localhost", 1234))); rmGateway.setRegisterTaskExecutorFunction(stringResourceIDIntegerHardwareDescriptionTuple4 -> { @@ -556,7 +553,7 @@ public class TaskExecutorTest extends TestLogger { when(rmGateway.registerTaskExecutor( anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess( - new InstanceID(), resourceManagerResourceId, 10L, new ClusterInformation("localhost", 1234)))); + new InstanceID(), resourceManagerResourceId, new ClusterInformation("localhost", 1234)))); rpc.registerGateway(resourceManagerAddress, rmGateway); @@ -620,11 +617,11 @@ public class TaskExecutorTest extends TestLogger { when(rmGateway1.registerTaskExecutor( anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture( - new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L, new ClusterInformation("localhost", 1234)))); + new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, new ClusterInformation("localhost", 1234)))); when(rmGateway2.registerTaskExecutor( anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture( - new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L, new ClusterInformation("localhost", 1234)))); + new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, new ClusterInformation("localhost", 1234)))); rpc.registerGateway(address1, rmGateway1); rpc.registerGateway(address2, rmGateway2); @@ -938,7 +935,7 @@ public class TaskExecutorTest extends TestLogger { resourceManagerGateway.setRegisterTaskExecutorFunction( stringResourceIDIntegerHardwareDescriptionTuple4 -> { registrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1); - return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L, new ClusterInformation("localhost", 1234))); + return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, new ClusterInformation("localhost", 1234))); } ); @@ -1271,7 +1268,6 @@ public class TaskExecutorTest extends TestLogger { final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway( ResourceManagerId.generate(), rmResourceID, - heartbeatInterval, rmAddress, rmAddress); @@ -1458,7 +1454,6 @@ public class TaskExecutorTest extends TestLogger { return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess( new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), - heartbeatInterval, new ClusterInformation("localhost", 1234))); } else { secondRegistration.trigger(); @@ -1558,7 +1553,7 @@ public class TaskExecutorTest extends TestLogger { try { final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234); - final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), heartbeatInterval, clusterInformation)); + final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), clusterInformation)); final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1); testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> { @@ -1657,7 +1652,6 @@ public class TaskExecutorTest extends TestLogger { new TaskExecutorRegistrationSuccess( new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), - 1000L, new ClusterInformation("foobar", 1234) )); final CountDownLatch numberRegistrations = new CountDownLatch(2); http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java index 17995a3..3ab0699 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.testutils; -import akka.actor.ActorRef; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; @@ -28,6 +27,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testingUtils.TestingMessages; +import akka.actor.ActorRef; + import java.util.Collection; import java.util.HashSet; import java.util.Set; http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 49385e5..876e858 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -122,7 +121,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme ResourceID resourceId, Configuration flinkConfig, Map<String, String> env, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -135,7 +133,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme rpcService, resourceManagerEndpointId, resourceId, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java index 8cc3579..482e2b7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -93,7 +92,6 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint { FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { - final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( rmServicesConfiguration, @@ -106,7 +104,6 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint { resourceId, configuration, System.getenv(), - rmConfiguration, highAvailabilityServices, heartbeatServices, rmRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java index 232467a..6c92861 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -81,7 +80,6 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint { FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { - final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( rmServicesConfiguration, @@ -94,7 +92,6 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint { resourceId, configuration, System.getenv(), - rmConfiguration, highAvailabilityServices, heartbeatServices, rmRuntimeServices.getSlotManager(), http://git-wip-us.apache.org/repos/asf/flink/blob/690ab2c3/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 8c6d7f7..eb8e968 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; @@ -154,7 +153,6 @@ public class YarnResourceManagerTest extends TestLogger { ResourceID resourceId, Configuration flinkConfig, Map<String, String> env, - ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, @@ -171,7 +169,6 @@ public class YarnResourceManagerTest extends TestLogger { resourceId, flinkConfig, env, - resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, @@ -220,7 +217,6 @@ public class YarnResourceManagerTest extends TestLogger { final MockResourceManagerRuntimeServices rmServices; // RM - final ResourceManagerConfiguration rmConfiguration; final ResourceID rmResourceID; static final String RM_ADDRESS = "resourceManager"; final TestingYarnResourceManager resourceManager; @@ -248,9 +244,6 @@ public class YarnResourceManagerTest extends TestLogger { rmServices = new MockResourceManagerRuntimeServices(); // resource manager - rmConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); rmResourceID = ResourceID.generate(); resourceManager = new TestingYarnResourceManager( @@ -259,7 +252,6 @@ public class YarnResourceManagerTest extends TestLogger { rmResourceID, flinkConfig, env, - rmConfiguration, rmServices.highAvailabilityServices, rmServices.heartbeatServices, rmServices.slotManager,
