This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch flip116 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b29a14dc9a445a1818a07c24d9a133a1b8c1a271 Author: Andrey Zagrebin <[email protected]> AuthorDate: Wed Apr 8 17:05:35 2020 +0300 [FLINK-16745][clients] Parse JobManagerProcessSpec from Configuration into ClusterSpecification --- .../AbstractContainerizedClusterClientFactory.java | 13 +++--- .../client/deployment/ClusterSpecification.java | 46 +++++++++++++++++++++- .../flink/configuration/ConfigurationUtils.java | 19 --------- .../runtime/jobmanager/JobManagerProcessUtils.java | 8 +++- 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java index 46af058..83172fd 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java @@ -20,9 +20,11 @@ package org.apache.flink.client.deployment; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; +import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -36,9 +38,10 @@ public abstract class AbstractContainerizedClusterClientFactory<ClusterID> imple public ClusterSpecification getClusterSpecification(Configuration configuration) { checkNotNull(configuration); - final int jobManagerMemoryMB = ConfigurationUtils - .getJobManagerHeapMemory(configuration) - .getMebiBytes(); + final JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig( + JobManagerProcessUtils.getConfigurationWithLegacyHeapSizeMappedToNewConfigOption( + configuration, + JobManagerOptions.TOTAL_PROCESS_MEMORY)); final int taskManagerMemoryMB = TaskExecutorProcessUtils .processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( @@ -49,7 +52,7 @@ public abstract class AbstractContainerizedClusterClientFactory<ClusterID> imple int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); return new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(jobManagerMemoryMB) + .setMasterProcessSpec(jobManagerProcessSpec) .setTaskManagerMemoryMB(taskManagerMemoryMB) .setSlotsPerTaskManager(slotsPerTaskManager) .createClusterSpecification(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java index 51bd522..9daa35a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java @@ -18,20 +18,35 @@ package org.apache.flink.client.deployment; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec; + /** * Description of the cluster to start by the {@link ClusterDescriptor}. */ public final class ClusterSpecification { + private final JobManagerProcessSpec masterProcessSpec; private final int masterMemoryMB; private final int taskManagerMemoryMB; private final int slotsPerTaskManager; - private ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) { + private ClusterSpecification( + JobManagerProcessSpec masterProcessSpec, + int masterMemoryMB, + int taskManagerMemoryMB, + int slotsPerTaskManager) { + this.masterProcessSpec = masterProcessSpec; this.masterMemoryMB = masterMemoryMB; this.taskManagerMemoryMB = taskManagerMemoryMB; this.slotsPerTaskManager = slotsPerTaskManager; } + public JobManagerProcessSpec getMasterProcessSpec() { + return masterProcessSpec; + } + public int getMasterMemoryMB() { return masterMemoryMB; } @@ -57,10 +72,18 @@ public final class ClusterSpecification { * Builder for the {@link ClusterSpecification} instance. */ public static class ClusterSpecificationBuilder { - private int masterMemoryMB = 768; + private static final JobManagerProcessSpec DEFAULT_MASTER_PROCESS_SPEC = createDefaultJobManagerProcessSpec(768); + + private JobManagerProcessSpec masterProcessSpec; + private int masterMemoryMB = -1; private int taskManagerMemoryMB = 1024; private int slotsPerTaskManager = 1; + public ClusterSpecificationBuilder setMasterProcessSpec(JobManagerProcessSpec masterProcessSpec) { + this.masterProcessSpec = masterProcessSpec; + return this; + } + public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) { this.masterMemoryMB = masterMemoryMB; return this; @@ -77,10 +100,29 @@ public final class ClusterSpecification { } public ClusterSpecification createClusterSpecification() { + deriveAndVerifyMasterSpec(); return new ClusterSpecification( + masterProcessSpec, masterMemoryMB, taskManagerMemoryMB, slotsPerTaskManager); } + + private void deriveAndVerifyMasterSpec() { + if (masterProcessSpec == null) { + masterProcessSpec = masterMemoryMB <= 0 ? + DEFAULT_MASTER_PROCESS_SPEC : createDefaultJobManagerProcessSpec(masterMemoryMB); + } + if (masterMemoryMB <= 0) { + masterMemoryMB = masterProcessSpec.getTotalProcessMemorySize().getMebiBytes(); + } + int jmTotalProcessSizeMb = masterProcessSpec.getTotalFlinkMemorySize().getMebiBytes(); + Preconditions.checkArgument( + masterMemoryMB >= jmTotalProcessSizeMb, + "Total job manager memory size %dMb cannot be less than " + + "its configured or derived total process size %dMb", + masterMemoryMB, + jmTotalProcessSizeMb); + } } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 0408a37..4fbc512 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -42,25 +42,6 @@ public class ConfigurationUtils { private static final String[] EMPTY = new String[0]; /** - * Get job manager's heap memory. This method will check the new key - * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and - * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility. - * - * @param configuration the configuration object - * @return the memory size of job manager's heap memory. - */ - public static MemorySize getJobManagerHeapMemory(Configuration configuration) { - if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) { - return configuration.get(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); - } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) { - return MemorySize.ofMebiBytes(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB)); - } else { - //use default value - return JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue(); - } - } - - /** * @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code Optional.empty()} if * {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java index 62f61bf..82c2490 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java @@ -24,8 +24,8 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec; import org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions; -import org.apache.flink.runtime.util.config.memory.MemoryBackwardsCompatibilityUtils; import org.apache.flink.runtime.util.config.memory.LegacyMemoryOptions; +import org.apache.flink.runtime.util.config.memory.MemoryBackwardsCompatibilityUtils; import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions; import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory; @@ -78,4 +78,10 @@ public class JobManagerProcessUtils { ConfigOption<MemorySize> configOption) { return LEGACY_MEMORY_UTILS.getConfWithLegacyHeapSizeMappedToNewConfigOption(configuration, configOption); } + + public static JobManagerProcessSpec createDefaultJobManagerProcessSpec(int totalProcessMemoryMb) { + Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalProcessMemoryMb)); + return processSpecFromConfig(configuration); + } }
