This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b29a14dc9a445a1818a07c24d9a133a1b8c1a271
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Wed Apr 8 17:05:35 2020 +0300

    [FLINK-16745][clients] Parse JobManagerProcessSpec from Configuration into 
ClusterSpecification
---
 .../AbstractContainerizedClusterClientFactory.java | 13 +++---
 .../client/deployment/ClusterSpecification.java    | 46 +++++++++++++++++++++-
 .../flink/configuration/ConfigurationUtils.java    | 19 ---------
 .../runtime/jobmanager/JobManagerProcessUtils.java |  8 +++-
 4 files changed, 59 insertions(+), 27 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java
index 46af058..83172fd 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java
@@ -20,9 +20,11 @@ package org.apache.flink.client.deployment;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -36,9 +38,10 @@ public abstract class 
AbstractContainerizedClusterClientFactory<ClusterID> imple
        public ClusterSpecification getClusterSpecification(Configuration 
configuration) {
                checkNotNull(configuration);
 
-               final int jobManagerMemoryMB = ConfigurationUtils
-                       .getJobManagerHeapMemory(configuration)
-                       .getMebiBytes();
+               final JobManagerProcessSpec jobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(
+                       
JobManagerProcessUtils.getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
+                               configuration,
+                               JobManagerOptions.TOTAL_PROCESS_MEMORY));
 
                final int taskManagerMemoryMB = TaskExecutorProcessUtils
                        
.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
@@ -49,7 +52,7 @@ public abstract class 
AbstractContainerizedClusterClientFactory<ClusterID> imple
                int slotsPerTaskManager = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
                return new ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(jobManagerMemoryMB)
+                       .setMasterProcessSpec(jobManagerProcessSpec)
                        .setTaskManagerMemoryMB(taskManagerMemoryMB)
                        .setSlotsPerTaskManager(slotsPerTaskManager)
                        .createClusterSpecification();
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 51bd522..9daa35a 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -18,20 +18,35 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
+
 /**
  * Description of the cluster to start by the {@link ClusterDescriptor}.
  */
 public final class ClusterSpecification {
+       private final JobManagerProcessSpec masterProcessSpec;
        private final int masterMemoryMB;
        private final int taskManagerMemoryMB;
        private final int slotsPerTaskManager;
 
-       private ClusterSpecification(int masterMemoryMB, int 
taskManagerMemoryMB, int slotsPerTaskManager) {
+       private ClusterSpecification(
+                       JobManagerProcessSpec masterProcessSpec,
+                       int masterMemoryMB,
+                       int taskManagerMemoryMB,
+                       int slotsPerTaskManager) {
+               this.masterProcessSpec = masterProcessSpec;
                this.masterMemoryMB = masterMemoryMB;
                this.taskManagerMemoryMB = taskManagerMemoryMB;
                this.slotsPerTaskManager = slotsPerTaskManager;
        }
 
+       public JobManagerProcessSpec getMasterProcessSpec() {
+               return masterProcessSpec;
+       }
+
        public int getMasterMemoryMB() {
                return masterMemoryMB;
        }
@@ -57,10 +72,18 @@ public final class ClusterSpecification {
         * Builder for the {@link ClusterSpecification} instance.
         */
        public static class ClusterSpecificationBuilder {
-               private int masterMemoryMB = 768;
+               private static final JobManagerProcessSpec 
DEFAULT_MASTER_PROCESS_SPEC = createDefaultJobManagerProcessSpec(768);
+
+               private JobManagerProcessSpec masterProcessSpec;
+               private int masterMemoryMB = -1;
                private int taskManagerMemoryMB = 1024;
                private int slotsPerTaskManager = 1;
 
+               public ClusterSpecificationBuilder 
setMasterProcessSpec(JobManagerProcessSpec masterProcessSpec) {
+                       this.masterProcessSpec = masterProcessSpec;
+                       return this;
+               }
+
                public ClusterSpecificationBuilder setMasterMemoryMB(int 
masterMemoryMB) {
                        this.masterMemoryMB = masterMemoryMB;
                        return this;
@@ -77,10 +100,29 @@ public final class ClusterSpecification {
                }
 
                public ClusterSpecification createClusterSpecification() {
+                       deriveAndVerifyMasterSpec();
                        return new ClusterSpecification(
+                               masterProcessSpec,
                                masterMemoryMB,
                                taskManagerMemoryMB,
                                slotsPerTaskManager);
                }
+
+               private void deriveAndVerifyMasterSpec() {
+                       if (masterProcessSpec == null) {
+                               masterProcessSpec = masterMemoryMB <= 0 ?
+                                       DEFAULT_MASTER_PROCESS_SPEC : 
createDefaultJobManagerProcessSpec(masterMemoryMB);
+                       }
+                       if (masterMemoryMB <= 0) {
+                               masterMemoryMB = 
masterProcessSpec.getTotalProcessMemorySize().getMebiBytes();
+                       }
+                       int jmTotalProcessSizeMb = 
masterProcessSpec.getTotalFlinkMemorySize().getMebiBytes();
+                       Preconditions.checkArgument(
+                               masterMemoryMB >= jmTotalProcessSizeMb,
+                               "Total job manager memory size %dMb cannot be 
less than " +
+                                       "its configured or derived total 
process size %dMb",
+                               masterMemoryMB,
+                               jmTotalProcessSizeMb);
+               }
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 0408a37..4fbc512 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -42,25 +42,6 @@ public class ConfigurationUtils {
        private static final String[] EMPTY = new String[0];
 
        /**
-        * Get job manager's heap memory. This method will check the new key
-        * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
-        * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for 
backwards compatibility.
-        *
-        * @param configuration the configuration object
-        * @return the memory size of job manager's heap memory.
-        */
-       public static MemorySize getJobManagerHeapMemory(Configuration 
configuration) {
-               if 
(configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
-                       return 
configuration.get(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-               } else if 
(configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) 
{
-                       return 
MemorySize.ofMebiBytes(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB));
-               } else {
-                       //use default value
-                       return 
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue();
-               }
-       }
-
-       /**
         * @return extracted {@link 
MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code 
Optional.empty()} if
         * {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
index 62f61bf..82c2490 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
 import 
org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
-import 
org.apache.flink.runtime.util.config.memory.MemoryBackwardsCompatibilityUtils;
 import org.apache.flink.runtime.util.config.memory.LegacyMemoryOptions;
+import 
org.apache.flink.runtime.util.config.memory.MemoryBackwardsCompatibilityUtils;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import 
org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory;
@@ -78,4 +78,10 @@ public class JobManagerProcessUtils {
                        ConfigOption<MemorySize> configOption) {
                return 
LEGACY_MEMORY_UTILS.getConfWithLegacyHeapSizeMappedToNewConfigOption(configuration,
 configOption);
        }
+
+       public static JobManagerProcessSpec 
createDefaultJobManagerProcessSpec(int totalProcessMemoryMb) {
+               Configuration configuration = new Configuration();
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalProcessMemoryMb));
+               return processSpecFromConfig(configuration);
+       }
 }

Reply via email to