xintongsong commented on a change in pull request #11445: [FLINK-16615] 
Introduce data structures and utilities to calculate Job Manager memory 
components
URL: https://github.com/apache/flink/pull/11445#discussion_r394816955
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
 ##########
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.util.MemoryProcessUtils;
+import 
org.apache.flink.runtime.util.MemoryProcessUtils.JvmMetaspaceAndOverhead;
+import 
org.apache.flink.runtime.util.MemoryProcessUtils.JvmMetaspaceAndOverheadOptions;
+import org.apache.flink.runtime.util.MemoryProcessUtils.LegacyHeapOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.flink.runtime.util.MemoryProcessUtils.deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory;
+import static 
org.apache.flink.runtime.util.MemoryProcessUtils.deriveJvmMetaspaceAndOverheadWithTotalProcessMemory;
+import static 
org.apache.flink.runtime.util.MemoryProcessUtils.getMemorySizeFromConfig;
+
+/**
+ * JobManager utils to calculate {@link JobManagerProcessSpec} and JVM args.
+ */
+public final class JobManagerProcessUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerProcessUtils.class);
+
+       static final JvmMetaspaceAndOverheadOptions 
JM_JVM_METASPACE_AND_OVERHEAD_OPTIONS =
+               new JvmMetaspaceAndOverheadOptions(
+                       JobManagerOptions.TOTAL_PROCESS_MEMORY,
+                       JobManagerOptions.JVM_METASPACE,
+                       JobManagerOptions.JVM_OVERHEAD_MIN,
+                       JobManagerOptions.JVM_OVERHEAD_MAX,
+                       JobManagerOptions.JVM_OVERHEAD_FRACTION
+               );
+
+       @SuppressWarnings("deprecation")
+       static final LegacyHeapOptions JM_LEGACY_HEAP_OPTIONS =
+               new LegacyHeapOptions(
+                       "FLINK_JM_HEAP",
+                       JobManagerOptions.JOB_MANAGER_HEAP_MEMORY,
+                       JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB
+               );
+
+       public static JobManagerProcessSpec processSpecFromConfig(Configuration 
config) {
+               if (config.contains(JobManagerOptions.JVM_HEAP_MEMORY)) {
+                       // jvm heap memory is configured, use these to derive 
total Flink and process memory
+                       return deriveProcessSpecWithExplicitJvmHeap(config);
+               } else if 
(config.contains(JobManagerOptions.TOTAL_FLINK_MEMORY)) {
+                       // jvm heap memory is not configured, total Flink 
memory is configured,
+                       // derive from total flink memory
+                       return deriveProcessSpecWithTotalFlinkMemory(config);
+               } else if 
(config.contains(JobManagerOptions.TOTAL_PROCESS_MEMORY)) {
+                       // total Flink memory is not configured, total process 
memory is configured,
+                       // derive from total process memory
+                       return deriveProcessSpecWithTotalProcessMemory(config);
+               }
+
+               throw new IllegalConfigurationException(String.format(
+                       "Either JVM Heap Memory size (%s) or Total Flink Memory 
size (%s), or Total Process Memory size (%s) " +
+                               "need to be configured explicitly.",
+                       JobManagerOptions.JVM_HEAP_MEMORY.key(),
+                       JobManagerOptions.TOTAL_FLINK_MEMORY.key(),
+                       JobManagerOptions.TOTAL_PROCESS_MEMORY.key()));
+       }
+
+       private static JobManagerProcessSpec 
deriveProcessSpecWithExplicitJvmHeap(Configuration config) {
+               // derive flink internal memory from explicitly configure jvm 
heap memory size
+
+               MemorySize jvmHeapMemorySize = getMemorySizeFromConfig(config, 
JobManagerOptions.JVM_HEAP_MEMORY);
+               MemorySize offHeapMemorySize = getMemorySizeFromConfig(config, 
JobManagerOptions.OFF_HEAP_MEMORY);
+               MemorySize derivedTotalFlinkMemorySize = 
jvmHeapMemorySize.add(offHeapMemorySize);
+
+               if (config.contains(JobManagerOptions.TOTAL_FLINK_MEMORY)) {
+                       // derive network memory from total flink memory, and 
check against network min/max
+                       MemorySize totalFlinkMemorySize = 
getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_FLINK_MEMORY);
+                       if (derivedTotalFlinkMemorySize.getBytes() != 
totalFlinkMemorySize.getBytes()) {
+                               throw new 
IllegalConfigurationException(String.format(
+                                       "Sum of the configured JVM Heap Memory 
(%s) and the configured or default Off-heap Memory (%s) " +
+                                               "exceeds the configured Total 
Flink Memory (%s). Please, make the configuration consistent " +
+                                               "or configure only one option: 
either JVM Heap or Total Flink Memory.",
+                                       
jvmHeapMemorySize.toHumanReadableString(),
+                                       
offHeapMemorySize.toHumanReadableString(),
+                                       
totalFlinkMemorySize.toHumanReadableString()));
+                       }
+               }
+
+               JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = 
deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
+                       config,
+                       derivedTotalFlinkMemorySize,
+                       JM_JVM_METASPACE_AND_OVERHEAD_OPTIONS);
+
+               return createJobManagerProcessSpec(
+                       config,
+                       new FlinkInternalMemory(jvmHeapMemorySize, 
offHeapMemorySize),
+                       jvmMetaspaceAndOverhead);
+       }
+
+       private static JobManagerProcessSpec 
deriveProcessSpecWithTotalFlinkMemory(Configuration config) {
+               MemorySize totalFlinkMemorySize = 
getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_FLINK_MEMORY);
+               FlinkInternalMemory internalMemory = 
deriveInternalMemoryFromTotalFlinkMemory(config, totalFlinkMemorySize);
+               JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = 
deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
+                       config,
+                       totalFlinkMemorySize,
+                       JM_JVM_METASPACE_AND_OVERHEAD_OPTIONS);
+
+               return createJobManagerProcessSpec(config, internalMemory, 
jvmMetaspaceAndOverhead);
+       }
+
+       private static JobManagerProcessSpec 
deriveProcessSpecWithTotalProcessMemory(Configuration config) {
+               JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
+                       
deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(config, 
JM_JVM_METASPACE_AND_OVERHEAD_OPTIONS);
+               MemorySize totalProcessMemorySize = 
getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_PROCESS_MEMORY);
 
 Review comment:
   The total process memory size is fetched twice, inside and after 
`deriveJvmMetaspaceAndOverheadWithTotalProcessMemory`.
   
   It might be better to fetch it once, and just pass it as an argument into 
`deriveJvmMetaspaceAndOverheadWithTotalProcessMemory`, to avoid the risk of 
inconsistency. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to