This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 634a634 [FLINK-13579] Only set managed memory when starting an active RM
634a634 is described below
commit 634a634bb21b50649b3a819bf73f179d5cbe7931
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Aug 5 11:20:22 2019 +0200
[FLINK-13579] Only set managed memory when starting an active RM
Introduce ActiveResourceManagerFactory, which encapsulates the logic for setting
the configuration values that are relevant to the active ResourceManager
implementations (e.g. managed memory).
Let existing active ResourceManager factory implementations extend
ActiveResourceManagerFactory.
Add ActiveResourceManagerFactoryTest to ensure that the configuration values
relevant to an active ResourceManager are set.
Add StandaloneResourceManagerFactoryTest to ensure that a standalone
ResourceManager can be started if the TaskManager memory size is configured to
be less than the containerized heap cutoff minimum.
This closes #9357.
---
.../MesosResourceManagerFactory.java | 5 +-
...tDispatcherResourceManagerComponentFactory.java | 4 +-
.../ActiveResourceManagerFactory.java | 97 ++++++++++++++++++++
.../flink/runtime/util/ResourceManagerUtil.java | 46 ----------
.../ActiveResourceManagerFactoryTest.java | 101 +++++++++++++++++++++
.../StandaloneResourceManagerFactoryTest.java | 70 ++++++++++++++
.../apache/flink/yarn/YarnConfigurationITCase.java | 4 +-
.../yarn/entrypoint/YarnJobClusterEntrypoint.java | 2 +-
.../entrypoint/YarnResourceManagerFactory.java | 14 ++-
.../entrypoint/YarnSessionClusterEntrypoint.java | 2 +-
10 files changed, 287 insertions(+), 58 deletions(-)
diff --git
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
index 534b0a5..5b76740 100644
---
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
+++
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -41,7 +42,7 @@ import javax.annotation.Nullable;
/**
* {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
*/
-public class MesosResourceManagerFactory implements
ResourceManagerFactory<RegisteredMesosWorkerNode> {
+public class MesosResourceManagerFactory extends
ActiveResourceManagerFactory<RegisteredMesosWorkerNode> {
@Nonnull
private final MesosServices mesosServices;
@@ -63,7 +64,7 @@ public class MesosResourceManagerFactory implements
ResourceManagerFactory<Regis
}
@Override
- public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(
+ public ResourceManager<RegisteredMesosWorkerNode>
createActiveResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 8b2d9c6..163b039 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -49,7 +49,6 @@ import
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.util.ResourceManagerUtil;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -168,9 +167,8 @@ public abstract class
AbstractDispatcherResourceManagerComponentFactory<T extend
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
- Configuration resourceManagerConfig =
ResourceManagerUtil.getResourceManagerConfiguration(configuration);
resourceManager =
resourceManagerFactory.createResourceManager(
- resourceManagerConfig,
+ configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
new file mode 100644
index 0000000..9f90106
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.TaskManagerOptions;
+import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+
+import javax.annotation.Nullable;
+
+/**
+ * Resource manager factory which creates active {@link ResourceManager}
implementations.
+ *
+ * <p>The default implementation will call {@link
#createActiveResourceManagerConfiguration}
+ * to create a new configuration which is configured with active resource
manager relevant
+ * configuration options.
+ *
+ * @param <T> type of the {@link ResourceIDRetrievable}
+ */
+public abstract class ActiveResourceManagerFactory<T extends
ResourceIDRetrievable> implements ResourceManagerFactory<T> {
+
+ @Override
+ public ResourceManager<T> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler,
+ ClusterInformation clusterInformation,
+ @Nullable String webInterfaceUrl,
+ JobManagerMetricGroup jobManagerMetricGroup) throws
Exception {
+ return createActiveResourceManager(
+ createActiveResourceManagerConfiguration(configuration),
+ resourceId,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ metricRegistry,
+ fatalErrorHandler,
+ clusterInformation,
+ webInterfaceUrl,
+ jobManagerMetricGroup);
+ }
+
+ public static Configuration
createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
+ final int taskManagerMemoryMB =
ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
+ final long cutoffMB =
ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration,
taskManagerMemoryMB);
+ final long processMemoryBytes = (taskManagerMemoryMB -
cutoffMB) << 20; // megabytes to bytes
+ final long managedMemoryBytes =
TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration,
processMemoryBytes);
+
+ final Configuration resourceManagerConfig = new
Configuration(originalConfiguration);
+
resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE,
managedMemoryBytes + "b");
+
+ return resourceManagerConfig;
+ }
+
+ protected abstract ResourceManager<T> createActiveResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler,
+ ClusterInformation clusterInformation,
+ @Nullable String webInterfaceUrl,
+ JobManagerMetricGroup jobManagerMetricGroup) throws Exception;
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
deleted file mode 100644
index 7a65336..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-
-/**
- * Utils for ResourceManager.
- */
-public class ResourceManagerUtil {
-
- public static Configuration
getResourceManagerConfiguration(Configuration flinkConfig) {
- final int taskManagerMemoryMB =
ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
- final long cutoffMB =
ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig,
taskManagerMemoryMB);
- final long processMemoryBytes = (taskManagerMemoryMB -
cutoffMB) << 20; // megabytes to bytes
- final long managedMemoryBytes =
TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig,
processMemoryBytes);
-
- final Configuration resourceManagerConfig = new
Configuration(flinkConfig);
-
resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE,
managedMemoryBytes + "b");
-
- return resourceManagerConfig;
- }
-
- private ResourceManagerUtil() {
- }
-}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
new file mode 100644
index 0000000..b1874e0
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link ActiveResourceManagerFactory}.
+ */
+public class ActiveResourceManagerFactoryTest extends TestLogger {
+
+ /**
+ * Test which ensures that the {@link ActiveResourceManagerFactory}
sets the correct managed
+ * memory when creating a resource manager.
+ */
+ @Test
+ public void
createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws
Exception {
+ final Configuration configuration = new Configuration();
+
+ final TestingActiveResourceManagerFactory
resourceManagerFactory = new TestingActiveResourceManagerFactory();
+
+ final TestingRpcService rpcService = new TestingRpcService();
+
+ try {
+ final ResourceManager<ResourceID> ignored =
resourceManagerFactory.createResourceManager(
+ configuration,
+ ResourceID.generate(),
+ rpcService,
+ new TestingHighAvailabilityServices(),
+ new TestingHeartbeatServices(),
+ NoOpMetricRegistry.INSTANCE,
+ new TestingFatalErrorHandler(),
+ new ClusterInformation("foobar", 1234),
+ null,
+
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+ } finally {
+ RpcUtils.terminateRpcService(rpcService,
Time.seconds(10L));
+ }
+ }
+
+ private static final class TestingActiveResourceManagerFactory extends
ActiveResourceManagerFactory<ResourceID> {
+
+ @Override
+ protected ResourceManager<ResourceID>
createActiveResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices
highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler,
+ ClusterInformation clusterInformation,
+ @Nullable String webInterfaceUrl,
+ JobManagerMetricGroup jobManagerMetricGroup) {
+
assertThat(configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE),
is(true));
+
+ return null;
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
new file mode 100644
index 0000000..9e64d48
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@link StandaloneResourceManagerFactory}.
+ */
+public class StandaloneResourceManagerFactoryTest extends TestLogger {
+
+ @Test
+ public void
createResourceManager_WithLessMemoryThanContainerizedHeapCutoffMin_ShouldSucceed()
throws Exception {
+ final StandaloneResourceManagerFactory resourceManagerFactory =
StandaloneResourceManagerFactory.INSTANCE;
+
+ final TestingRpcService rpcService = new TestingRpcService();
+ try {
+ final Configuration configuration = new Configuration();
+
configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, new
MemorySize(128 * 1024 * 1024).toString());
+
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN,
600);
+
+ final ResourceManager<ResourceID> ignored =
resourceManagerFactory.createResourceManager(
+ configuration,
+ ResourceID.generate(),
+ rpcService,
+ new TestingHighAvailabilityServices(),
+ new TestingHeartbeatServices(),
+ NoOpMetricRegistry.INSTANCE,
+ new TestingFatalErrorHandler(),
+ new ClusterInformation("foobar", 1234),
+ null,
+
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+ } finally {
+ RpcUtils.terminateRpcService(rpcService,
Time.seconds(10L));
+ }
+ }
+
+}
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 63ff2b2..a567a6d 100644
---
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -39,7 +40,6 @@ import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.ResourceManagerUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -217,7 +217,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
}
private static int calculateManagedMemorySizeMB(Configuration
configuration) {
- Configuration resourceManagerConfig =
ResourceManagerUtil.getResourceManagerConfiguration(configuration);
+ Configuration resourceManagerConfig =
ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
return
MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
}
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 42e666e..15de5a8 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -64,7 +64,7 @@ public class YarnJobClusterEntrypoint extends
JobClusterEntrypoint {
@Override
protected DispatcherResourceManagerComponentFactory<?>
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
- YarnResourceManagerFactory.INSTANCE,
+ YarnResourceManagerFactory.getInstance(),
FileJobGraphRetriever.createFrom(configuration));
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index 24bce10..312bb41 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -39,11 +40,18 @@ import javax.annotation.Nullable;
/**
* {@link ResourceManagerFactory} implementation which creates a {@link
YarnResourceManager}.
*/
-public enum YarnResourceManagerFactory implements
ResourceManagerFactory<YarnWorkerNode> {
- INSTANCE;
+public class YarnResourceManagerFactory extends
ActiveResourceManagerFactory<YarnWorkerNode> {
+
+ private static final YarnResourceManagerFactory INSTANCE = new
YarnResourceManagerFactory();
+
+ private YarnResourceManagerFactory() {}
+
+ public static YarnResourceManagerFactory getInstance() {
+ return INSTANCE;
+ }
@Override
- public ResourceManager<YarnWorkerNode> createResourceManager(
+ public ResourceManager<YarnWorkerNode> createActiveResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 0f4656e..c22c548 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -61,7 +61,7 @@ public class YarnSessionClusterEntrypoint extends
SessionClusterEntrypoint {
@Override
protected DispatcherResourceManagerComponentFactory<?>
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
- return new
SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
+ return new
SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.getInstance());
}
public static void main(String[] args) {