This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 634a634  [FLINK-13579] Only set managed memory when starting an active 
RM
634a634 is described below

commit 634a634bb21b50649b3a819bf73f179d5cbe7931
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Aug 5 11:20:22 2019 +0200

    [FLINK-13579] Only set managed memory when starting an active RM
    
    Introduce ActiveResourceManagerFactory which encapsulates the logic to set 
relevant
    configuration values for the active ResourceManager implementations (e.g. 
managed
    memory).
    
    Let existing active ResourceManager factory implementations extend 
ActiveResourceManagerFactory.
    
    Add ActiveResourceManagerFactoryTest to ensure that active ResourceManager 
relevant
    configuration values are set.
    
    Add StandaloneResourceManagerFactoryTest to ensure that a standalone 
ResourceManager can be
    started if the memory size is configured to be less than the containered 
min cutoff size.
    
    This closes #9357.
---
 .../MesosResourceManagerFactory.java               |   5 +-
 ...tDispatcherResourceManagerComponentFactory.java |   4 +-
 .../ActiveResourceManagerFactory.java              |  97 ++++++++++++++++++++
 .../flink/runtime/util/ResourceManagerUtil.java    |  46 ----------
 .../ActiveResourceManagerFactoryTest.java          | 101 +++++++++++++++++++++
 .../StandaloneResourceManagerFactoryTest.java      |  70 ++++++++++++++
 .../apache/flink/yarn/YarnConfigurationITCase.java |   4 +-
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |   2 +-
 .../entrypoint/YarnResourceManagerFactory.java     |  14 ++-
 .../entrypoint/YarnSessionClusterEntrypoint.java   |   2 +-
 10 files changed, 287 insertions(+), 58 deletions(-)

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
index 534b0a5..5b76740 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -41,7 +42,7 @@ import javax.annotation.Nullable;
 /**
  * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
  */
-public class MesosResourceManagerFactory implements 
ResourceManagerFactory<RegisteredMesosWorkerNode> {
+public class MesosResourceManagerFactory extends 
ActiveResourceManagerFactory<RegisteredMesosWorkerNode> {
 
        @Nonnull
        private final MesosServices mesosServices;
@@ -63,7 +64,7 @@ public class MesosResourceManagerFactory implements 
ResourceManagerFactory<Regis
        }
 
        @Override
-       public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(
+       public ResourceManager<RegisteredMesosWorkerNode> 
createActiveResourceManager(
                        Configuration configuration,
                        ResourceID resourceId,
                        RpcService rpcService,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 8b2d9c6..163b039 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.util.ResourceManagerUtil;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -168,9 +167,8 @@ public abstract class 
AbstractDispatcherResourceManagerComponentFactory<T extend
                                hostname,
                                
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
-                       Configuration resourceManagerConfig = 
ResourceManagerUtil.getResourceManagerConfiguration(configuration);
                        resourceManager = 
resourceManagerFactory.createResourceManager(
-                               resourceManagerConfig,
+                               configuration,
                                ResourceID.generate(),
                                rpcService,
                                highAvailabilityServices,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
new file mode 100644
index 0000000..9f90106
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+
+import javax.annotation.Nullable;
+
+/**
+ * Resource manager factory which creates active {@link ResourceManager} 
implementations.
+ *
+ * <p>The default implementation will call {@link 
#createActiveResourceManagerConfiguration}
+ * to create a new configuration which is configured with active resource 
manager relevant
+ * configuration options.
+ *
+ * @param <T> type of the {@link ResourceIDRetrievable}
+ */
+public abstract class ActiveResourceManagerFactory<T extends 
ResourceIDRetrievable> implements ResourceManagerFactory<T> {
+
+       @Override
+       public ResourceManager<T> createResourceManager(
+                       Configuration configuration,
+                       ResourceID resourceId,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry,
+                       FatalErrorHandler fatalErrorHandler,
+                       ClusterInformation clusterInformation,
+                       @Nullable String webInterfaceUrl,
+                       JobManagerMetricGroup jobManagerMetricGroup) throws 
Exception {
+               return createActiveResourceManager(
+                       createActiveResourceManagerConfiguration(configuration),
+                       resourceId,
+                       rpcService,
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       metricRegistry,
+                       fatalErrorHandler,
+                       clusterInformation,
+                       webInterfaceUrl,
+                       jobManagerMetricGroup);
+       }
+
+       public static Configuration 
createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
+               final int taskManagerMemoryMB = 
ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
+               final long cutoffMB = 
ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, 
taskManagerMemoryMB);
+               final long processMemoryBytes = (taskManagerMemoryMB - 
cutoffMB) << 20; // megabytes to bytes
+               final long managedMemoryBytes = 
TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, 
processMemoryBytes);
+
+               final Configuration resourceManagerConfig = new 
Configuration(originalConfiguration);
+               
resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
managedMemoryBytes + "b");
+
+               return resourceManagerConfig;
+       }
+
+       protected abstract ResourceManager<T> createActiveResourceManager(
+               Configuration configuration,
+               ResourceID resourceId,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               FatalErrorHandler fatalErrorHandler,
+               ClusterInformation clusterInformation,
+               @Nullable String webInterfaceUrl,
+               JobManagerMetricGroup jobManagerMetricGroup) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
deleted file mode 100644
index 7a65336..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-
-/**
- * Utils for ResourceManager.
- */
-public class ResourceManagerUtil {
-
-       public static Configuration 
getResourceManagerConfiguration(Configuration flinkConfig) {
-               final int taskManagerMemoryMB = 
ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
-               final long cutoffMB = 
ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig, 
taskManagerMemoryMB);
-               final long processMemoryBytes = (taskManagerMemoryMB - 
cutoffMB) << 20; // megabytes to bytes
-               final long managedMemoryBytes = 
TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig, 
processMemoryBytes);
-
-               final Configuration resourceManagerConfig = new 
Configuration(flinkConfig);
-               
resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
managedMemoryBytes + "b");
-
-               return resourceManagerConfig;
-       }
-
-       private ResourceManagerUtil() {
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
new file mode 100644
index 0000000..b1874e0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link ActiveResourceManagerFactory}.
+ */
+public class ActiveResourceManagerFactoryTest extends TestLogger {
+
+       /**
+        * Test which ensures that the {@link ActiveResourceManagerFactory} 
sets the correct managed
+        * memory when creating a resource manager.
+        */
+       @Test
+       public void 
createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws 
Exception {
+               final Configuration configuration = new Configuration();
+
+               final TestingActiveResourceManagerFactory 
resourceManagerFactory = new TestingActiveResourceManagerFactory();
+
+               final TestingRpcService rpcService = new TestingRpcService();
+
+               try {
+                       final ResourceManager<ResourceID> ignored = 
resourceManagerFactory.createResourceManager(
+                               configuration,
+                               ResourceID.generate(),
+                               rpcService,
+                               new TestingHighAvailabilityServices(),
+                               new TestingHeartbeatServices(),
+                               NoOpMetricRegistry.INSTANCE,
+                               new TestingFatalErrorHandler(),
+                               new ClusterInformation("foobar", 1234),
+                               null,
+                               
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+               } finally {
+                       RpcUtils.terminateRpcService(rpcService, 
Time.seconds(10L));
+               }
+       }
+
+       private static final class TestingActiveResourceManagerFactory extends 
ActiveResourceManagerFactory<ResourceID> {
+
+               @Override
+               protected ResourceManager<ResourceID> 
createActiveResourceManager(
+                               Configuration configuration,
+                               ResourceID resourceId,
+                               RpcService rpcService,
+                               HighAvailabilityServices 
highAvailabilityServices,
+                               HeartbeatServices heartbeatServices,
+                               MetricRegistry metricRegistry,
+                               FatalErrorHandler fatalErrorHandler,
+                               ClusterInformation clusterInformation,
+                               @Nullable String webInterfaceUrl,
+                               JobManagerMetricGroup jobManagerMetricGroup) {
+                       
assertThat(configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE), 
is(true));
+
+                       return null;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
new file mode 100644
index 0000000..9e64d48
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@link StandaloneResourceManagerFactory}.
+ */
+public class StandaloneResourceManagerFactoryTest extends TestLogger {
+
+       @Test
+       public void 
createResourceManager_WithLessMemoryThanContainerizedHeapCutoffMin_ShouldSucceed()
 throws Exception {
+               final StandaloneResourceManagerFactory resourceManagerFactory = 
StandaloneResourceManagerFactory.INSTANCE;
+
+               final TestingRpcService rpcService = new TestingRpcService();
+               try {
+                       final Configuration configuration = new Configuration();
+                       
configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, new 
MemorySize(128 * 1024 * 1024).toString());
+                       
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 
600);
+
+                       final ResourceManager<ResourceID> ignored = 
resourceManagerFactory.createResourceManager(
+                               configuration,
+                               ResourceID.generate(),
+                               rpcService,
+                               new TestingHighAvailabilityServices(),
+                               new TestingHeartbeatServices(),
+                               NoOpMetricRegistry.INSTANCE,
+                               new TestingFatalErrorHandler(),
+                               new ClusterInformation("foobar", 1234),
+                               null,
+                               
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+               } finally {
+                       RpcUtils.terminateRpcService(rpcService, 
Time.seconds(10L));
+               }
+       }
+
+}
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 63ff2b2..a567a6d 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -39,7 +40,6 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.ResourceManagerUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -217,7 +217,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
        }
 
        private static int calculateManagedMemorySizeMB(Configuration 
configuration) {
-               Configuration resourceManagerConfig = 
ResourceManagerUtil.getResourceManagerConfiguration(configuration);
+               Configuration resourceManagerConfig = 
ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
                return 
MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
        }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 42e666e..15de5a8 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -64,7 +64,7 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
        @Override
        protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
                return new JobDispatcherResourceManagerComponentFactory(
-                       YarnResourceManagerFactory.INSTANCE,
+                       YarnResourceManagerFactory.getInstance(),
                        FileJobGraphRetriever.createFrom(configuration));
        }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index 24bce10..312bb41 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -39,11 +40,18 @@ import javax.annotation.Nullable;
 /**
  * {@link ResourceManagerFactory} implementation which creates a {@link 
YarnResourceManager}.
  */
-public enum YarnResourceManagerFactory implements 
ResourceManagerFactory<YarnWorkerNode> {
-       INSTANCE;
+public class YarnResourceManagerFactory extends 
ActiveResourceManagerFactory<YarnWorkerNode> {
+
+       private static final YarnResourceManagerFactory INSTANCE = new 
YarnResourceManagerFactory();
+
+       private YarnResourceManagerFactory() {}
+
+       public static YarnResourceManagerFactory getInstance() {
+               return INSTANCE;
+       }
 
        @Override
-       public ResourceManager<YarnWorkerNode> createResourceManager(
+       public ResourceManager<YarnWorkerNode> createActiveResourceManager(
                        Configuration configuration,
                        ResourceID resourceId,
                        RpcService rpcService,
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 0f4656e..c22c548 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -61,7 +61,7 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
 
        @Override
        protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
-               return new 
SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
+               return new 
SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.getInstance());
        }
 
        public static void main(String[] args) {

Reply via email to