xintongsong commented on a change in pull request #11445:
URL: https://github.com/apache/flink/pull/11445#discussion_r436513228



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.config.memory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Common utils to parse JVM process memory configuration for JM or TM.
+ *
+ * <p>The utility calculates all common process memory components from {@link 
CommonProcessMemorySpec}.
+ *
+ * <p>It is required to configure at least one subset of the following options 
and recommended to configure only one:
+ * <ul>
+ *     <li>{@link ProcessMemoryOptions#getRequiredFineGrainedOptions()}</li>
+ *     <li>{@link ProcessMemoryOptions#getTotalFlinkMemoryOption()}</li>
+ *     <li>{@link ProcessMemoryOptions#getTotalProcessMemoryOption()}</li>
+ * </ul>
+ * Otherwise the calculation fails.
+ *
+ * <p>The utility derives the Total Process Memory from the Total Flink Memory 
and JVM components and back.
+ * To perform the calculations, it uses the provided {@link 
ProcessMemoryOptions} which are different for different
+ * Flink processes: JM/TM.
+ *
+ * <p>The utility also calls the provided FlinkMemoryUtils to derive {@link 
FlinkMemory} components from
+ * {@link ProcessMemoryOptions#getRequiredFineGrainedOptions()} or from the 
Total Flink memory. The concrete
+ * {@link FlinkMemoryUtils} is implemented for the respective processes: 
JM/TM, according to the specific
+ * structure of their {@link FlinkMemory}.
+ *
+ * @param <FM> the Flink memory component structure
+ */
+public class ProcessMemoryUtils<FM extends FlinkMemory> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ProcessMemoryUtils.class);
+
+       private final ProcessMemoryOptions options;
+       private final FlinkMemoryUtils<FM> flinkMemoryUtils;
+
+       /**
+        * Creates the utility from the process-specific options and Flink memory helper.
+        *
+        * @param options the memory config options of the process; must not be null
+        * @param flinkMemoryUtils helper used to derive the {@link FlinkMemory} components; must not be null
+        */
+       public ProcessMemoryUtils(ProcessMemoryOptions options, FlinkMemoryUtils<FM> flinkMemoryUtils) {
+               this.options = Preconditions.checkNotNull(options);
+               this.flinkMemoryUtils = Preconditions.checkNotNull(flinkMemoryUtils);
+       }
+
+       public CommonProcessMemorySpec<FM> 
memoryProcessSpecFromConfig(Configuration config) {
+               if 
(options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
+                       // all internal memory options are configured, use 
these to derive total Flink and process memory
+                       return 
deriveProcessSpecWithExplicitInternalMemory(config);
+               } else if 
(config.contains(options.getTotalFlinkMemoryOption())) {
+                       // internal memory options are not configured, total 
Flink memory is configured,
+                       // derive from total flink memory
+                       return deriveProcessSpecWithTotalFlinkMemory(config);
+               } else if 
(config.contains(options.getTotalProcessMemoryOption())) {
+                       // total Flink memory is not configured, total process 
memory is configured,
+                       // derive from total process memory
+                       return deriveProcessSpecWithTotalProcessMemory(config);
+               }
+               return failBecauseRequiredOptionsNotConfigured();
+       }
+
+       /**
+        * Derives the spec when all required fine-grained options are configured: the Flink memory
+        * components come from those options, the JVM parts are derived from the resulting total Flink memory.
+        */
+       private CommonProcessMemorySpec<FM> deriveProcessSpecWithExplicitInternalMemory(Configuration config) {
+               FM flinkMemory = flinkMemoryUtils.deriveFromRequiredFineGrainedOptions(config);
+               MemorySize totalFlinkMemorySize = flinkMemory.getTotalFlinkMemorySize();
+               return new CommonProcessMemorySpec<>(
+                       flinkMemory,
+                       deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, totalFlinkMemorySize));
+       }
+
+       /**
+        * Derives the spec from the configured total Flink memory: both the Flink memory components
+        * and the JVM metaspace/overhead are computed from that single size.
+        */
+       private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalFlinkMemory(Configuration config) {
+               MemorySize configuredTotalFlinkMemory =
+                       getMemorySizeFromConfig(config, options.getTotalFlinkMemoryOption());
+               // Flink components are derived first, the JVM parts afterwards
+               FM flinkMemory = flinkMemoryUtils.deriveFromTotalFlinkMemory(config, configuredTotalFlinkMemory);
+               return new CommonProcessMemorySpec<>(
+                       flinkMemory,
+                       deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, configuredTotalFlinkMemory));
+       }
+
+       /**
+        * Derives the spec from the configured total process memory: JVM metaspace and overhead are
+        * determined first, and what remains of the process memory is used as the total Flink memory.
+        */
+       private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalProcessMemory(Configuration config) {
+               MemorySize totalProcessMemorySize =
+                       getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
+               JvmMetaspaceAndOverhead jvmParts =
+                       deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(config, totalProcessMemorySize);
+               // total Flink memory = total process memory - (metaspace + overhead)
+               MemorySize jvmPartsSize = jvmParts.getTotalJvmMetaspaceAndOverheadSize();
+               MemorySize remainingFlinkMemorySize = totalProcessMemorySize.subtract(jvmPartsSize);
+               return new CommonProcessMemorySpec<>(
+                       flinkMemoryUtils.deriveFromTotalFlinkMemory(config, remainingFlinkMemorySize),
+                       jvmParts);
+       }
+
+       /**
+        * Always throws. The return type only exists so that callers can write
+        * {@code return failBecauseRequiredOptionsNotConfigured();}.
+        *
+        * @throws IllegalConfigurationException always; the message names the keys of the three
+        *     alternative option sets of which at least one has to be configured
+        */
+       private CommonProcessMemorySpec<FM> failBecauseRequiredOptionsNotConfigured() {
+               String[] internalMemoryOptionKeys = options.getRequiredFineGrainedOptions().stream()
+                       .map(ConfigOption::key)
+                       .toArray(String[]::new);
+               // Print plain option keys for all three alternatives. Formatting a ConfigOption with %s
+               // would print its toString() form, which is inconsistent with the fine-grained keys above.
+               throw new IllegalConfigurationException(String.format(
+                       "Either required fine-grained memory (%s), or Total Flink Memory size (%s), or Total Process Memory size " +
+                               "(%s) need to be configured explicitly.",
+                       String.join(" and ", internalMemoryOptionKeys),
+                       options.getTotalFlinkMemoryOption().key(),
+                       options.getTotalProcessMemoryOption().key()));
+       }
+
+       /**
+        * Derives JVM metaspace and overhead for a given total process memory. The metaspace is read
+        * from the config; the overhead is the configured fraction of the total process memory,
+        * capped to the configured min/max.
+        *
+        * @throws IllegalConfigurationException if metaspace plus overhead exceed the total process memory
+        */
+       private JvmMetaspaceAndOverhead deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(
+                       Configuration config,
+                       MemorySize totalProcessMemorySize) {
+               MemorySize metaspace = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
+               RangeFraction overheadRangeFraction = getJvmOverheadRangeFraction(config);
+               MemorySize overhead = deriveWithFraction("jvm overhead memory", totalProcessMemorySize, overheadRangeFraction);
+               JvmMetaspaceAndOverhead result = new JvmMetaspaceAndOverhead(metaspace, overhead);
+
+               long jvmPartsBytes = result.getTotalJvmMetaspaceAndOverheadSize().getBytes();
+               if (jvmPartsBytes <= totalProcessMemorySize.getBytes()) {
+                       return result;
+               }
+
+               // the JVM parts alone do not fit into the configured process memory
+               throw new IllegalConfigurationException(
+                       "Sum of configured JVM Metaspace (" + result.getMetaspace().toHumanReadableString()
+                               + ") and JVM Overhead (" + result.getOverhead().toHumanReadableString()
+                               + ") exceed configured Total Process Memory (" + totalProcessMemorySize.toHumanReadableString() + ").");
+       }
+
+       /**
+        * Derives JVM metaspace and overhead for a given total Flink memory.
+        *
+        * <p>If the total process memory is configured as well, the overhead is whatever remains after
+        * subtracting the total Flink memory and the metaspace, and it must lie within the configured
+        * overhead range. Otherwise the overhead is derived from the configured fraction (applied
+        * inversely to the sum of total Flink memory and metaspace) and capped to the min/max.
+        *
+        * @param config the configuration to read the JVM options from
+        * @param totalFlinkMemorySize the configured or derived total Flink memory
+        * @return the JVM metaspace and overhead sizes
+        * @throws IllegalConfigurationException if the configured total process memory is smaller than
+        *     total Flink memory plus metaspace, or if the resulting overhead is outside its range
+        */
+       private JvmMetaspaceAndOverhead deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
+                       Configuration config,
+                       MemorySize totalFlinkMemorySize) {
+               MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
+               MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
+               JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
+               if (config.contains(options.getTotalProcessMemoryOption())) {
+                       MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
+                       // Guard the subtraction below, as the total-process-memory path does, so that this
+                       // misconfiguration is reported with the sizes involved.
+                       // NOTE(review): presumably MemorySize#subtract cannot yield a negative size - confirm.
+                       if (totalProcessMemorySize.getBytes() < totalFlinkAndJvmMetaspaceSize.getBytes()) {
+                               throw new IllegalConfigurationException(
+                                       "Sum of Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString()
+                                               + ") and JVM Metaspace (" + jvmMetaspaceSize.toHumanReadableString()
+                                               + ") exceed configured Total Process Memory (" + totalProcessMemorySize.toHumanReadableString() + ").");
+                       }
+                       MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
+                       sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
+                       jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
+               } else {
+                       MemorySize jvmOverheadSize = deriveWithInverseFraction(
+                               "jvm overhead memory",
+                               totalFlinkAndJvmMetaspaceSize,
+                               getJvmOverheadRangeFraction(config));
+                       jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
+                       sanityCheckTotalProcessMemory(config, totalFlinkMemorySize, jvmMetaspaceAndOverhead);
+               }
+               return jvmMetaspaceAndOverhead;
+       }
+
+       /**
+        * Verifies that a derived JVM overhead lies within the configured [min, max] range. If the
+        * overhead fraction is explicitly configured and the derived size differs from that fraction
+        * of the total process memory, this is only logged.
+        *
+        * @throws IllegalConfigurationException if the derived overhead is outside the configured range
+        */
+       private void sanityCheckJvmOverhead(
+                       Configuration config,
+                       MemorySize derivedJvmOverheadSize,
+                       MemorySize totalProcessMemorySize) {
+               RangeFraction overheadRange = getJvmOverheadRangeFraction(config);
+               long derivedBytes = derivedJvmOverheadSize.getBytes();
+               boolean withinRange = derivedBytes <= overheadRange.getMaxSize().getBytes()
+                       && derivedBytes >= overheadRange.getMinSize().getBytes();
+               if (!withinRange) {
+                       throw new IllegalConfigurationException("Derived JVM Overhead size ("
+                               + derivedJvmOverheadSize.toHumanReadableString() + ") is not in configured JVM Overhead range ["
+                               + overheadRange.getMinSize().toHumanReadableString() + ", "
+                               + overheadRange.getMaxSize().toHumanReadableString() + "].");
+               }
+
+               // the fraction comparison only matters when the user set the fraction explicitly
+               if (!config.contains(options.getJvmOptions().getJvmOverheadFraction())) {
+                       return;
+               }
+               MemorySize overheadByFraction = totalProcessMemorySize.multiply(overheadRange.getFraction());
+               if (!derivedJvmOverheadSize.equals(overheadByFraction)) {
+                       LOG.info(
+                               "The derived JVM Overhead size ({}) does not match " +
+                                       "the configured JVM Overhead fraction ({}) from the configured Total Process Memory size ({}). " +
+                                       "The derived JVM OVerhead size will be used.",
+                               derivedJvmOverheadSize.toHumanReadableString(),
+                               overheadRange.getFraction(),
+                               totalProcessMemorySize.toHumanReadableString());
+               }
+       }
+
+       /**
+        * If the total process memory is configured, verifies that it equals the sum of the total
+        * Flink memory, the JVM metaspace and the JVM overhead. Does nothing otherwise.
+        *
+        * @throws IllegalConfigurationException if the configured size differs from the derived sum
+        */
+       private void sanityCheckTotalProcessMemory(
+                       Configuration config,
+                       MemorySize totalFlinkMemory,
+                       JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
+               MemorySize derivedTotalProcessMemorySize = totalFlinkMemory
+                       .add(jvmMetaspaceAndOverhead.getMetaspace())
+                       .add(jvmMetaspaceAndOverhead.getOverhead());
+               if (!config.contains(options.getTotalProcessMemoryOption())) {
+                       // nothing configured to compare against
+                       return;
+               }
+
+               MemorySize configuredTotalProcessMemorySize =
+                       getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
+               if (configuredTotalProcessMemorySize.equals(derivedTotalProcessMemorySize)) {
+                       return;
+               }
+
+               throw new IllegalConfigurationException(String.format(
+                       "Configured and Derived memory sizes (total %s) do not add up to the configured Total Process " +
+                               "Memory size (%s). Configured and Derived memory sizes are: Total Flink Memory (%s), " +
+                               "JVM Metaspace (%s), JVM Overhead (%s).",
+                       derivedTotalProcessMemorySize.toHumanReadableString(),
+                       configuredTotalProcessMemorySize.toHumanReadableString(),
+                       totalFlinkMemory.toHumanReadableString(),
+                       jvmMetaspaceAndOverhead.getMetaspace().toHumanReadableString(),
+                       jvmMetaspaceAndOverhead.getOverhead().toHumanReadableString()));
+       }
+
+       /** Reads the JVM overhead min, max and fraction options and combines them into a {@link RangeFraction}. */
+       private RangeFraction getJvmOverheadRangeFraction(Configuration config) {
+               return getRangeFraction(
+                       getMemorySizeFromConfig(config, options.getJvmOptions().getJvmOverheadMin()),
+                       getMemorySizeFromConfig(config, options.getJvmOptions().getJvmOverheadMax()),
+                       options.getJvmOptions().getJvmOverheadFraction(),
+                       config);
+       }
+
+       /**
+        * Reads a memory size option from the configuration.
+        *
+        * @param config the configuration to read from
+        * @param option the memory size option to read
+        * @return the configured (or default) memory size, never null
+        * @throws IllegalConfigurationException if the value cannot be read or the option is neither
+        *     set nor has a default; the original failure is attached as the cause
+        */
+       public static MemorySize getMemorySizeFromConfig(Configuration config, ConfigOption<MemorySize> option) {
+               try {
+                       return Preconditions.checkNotNull(config.get(option), "The memory option is not set and has no default value.");
+               } catch (RuntimeException e) {
+                       // Only runtime failures are translated; JVM Errors must propagate unchanged
+                       // instead of being reported as a configuration problem.
+                       throw new IllegalConfigurationException("Cannot read memory size from config option '" + option.key() + "'.", e);
+               }
+       }
+
+       /**
+        * Builds a {@link RangeFraction} from the given bounds and the fraction read from the config.
+        *
+        * @param minSize lower bound of the range
+        * @param maxSize upper bound of the range
+        * @param fractionOption the option holding the fraction
+        * @param config the configuration to read the fraction from
+        * @return the range fraction
+        * @throws IllegalConfigurationException if {@link RangeFraction} rejects the combination of
+        *     bounds and fraction with an {@link IllegalArgumentException}
+        */
+       public static RangeFraction getRangeFraction(
+                       MemorySize minSize,
+                       MemorySize maxSize,
+                       ConfigOption<Float> fractionOption,
+                       Configuration config) {
+               double fraction = config.getFloat(fractionOption);
+               try {
+                       return new RangeFraction(minSize, maxSize, fraction);
+               } catch (IllegalArgumentException e) {
+                       // Name the option by its key; formatting the ConfigOption itself prints its toString().
+                       throw new IllegalConfigurationException(
+                               String.format(
+                                       "Inconsistently configured %s (%s) and its min (%s), max (%s) value",
+                                       fractionOption.key(),
+                                       fraction,
+                                       minSize.toHumanReadableString(),
+                                       maxSize.toHumanReadableString()),
+                               e);
+               }
+       }
+
+       /**
+        * Derives a memory size as {@code base * fraction}, capped to the min/max of the range.
+        *
+        * @param memoryDescription name of the memory component, used in log messages
+        * @param base the size the fraction is applied to
+        * @param rangeFraction the fraction and its min/max bounds
+        * @return the derived size within the range bounds
+        */
+       public static MemorySize deriveWithFraction(String memoryDescription, MemorySize base, RangeFraction rangeFraction) {
+               return capToMinMax(memoryDescription, base.multiply(rangeFraction.getFraction()), rangeFraction);
+       }
+
+       /**
+        * Derives a memory size as {@code base * fraction / (1 - fraction)}, capped to the min/max of
+        * the range. With this factor the result is the given fraction of {@code base + result}.
+        *
+        * @param memoryDescription name of the memory component, used in log messages
+        * @param base the size of everything except the derived component
+        * @param rangeFraction the fraction (must be less than 1) and its min/max bounds
+        * @return the derived size within the range bounds
+        */
+       public static MemorySize deriveWithInverseFraction(String memoryDescription, MemorySize base, RangeFraction rangeFraction) {
+               // a fraction of 1 or more would make the divisor zero or negative
+               Preconditions.checkArgument(rangeFraction.getFraction() < 1);
+               MemorySize uncapped =
+                       base.multiply(rangeFraction.getFraction() / (1 - rangeFraction.getFraction()));
+               return capToMinMax(memoryDescription, uncapped, rangeFraction);
+       }
+
+       /**
+        * Clamps a size derived from a fraction to the [min, max] bounds of the range and logs
+        * whenever a bound replaces the derived value.
+        */
+       private static MemorySize capToMinMax(
+                       String memoryDescription,
+                       MemorySize relative,
+                       RangeFraction rangeFraction) {
+               long derivedBytes = relative.getBytes();
+               MemorySize upperBound = rangeFraction.getMaxSize();
+               if (derivedBytes > upperBound.getBytes()) {
+                       LOG.info(
+                               "The derived from fraction {} ({}) is greater than its max value {}, max value will be used instead",
+                               memoryDescription,
+                               relative.toHumanReadableString(),
+                               upperBound.toHumanReadableString());
+                       return new MemorySize(upperBound.getBytes());
+               }
+
+               MemorySize lowerBound = rangeFraction.getMinSize();
+               if (derivedBytes < lowerBound.getBytes()) {
+                       LOG.info(
+                               "The derived from fraction {} ({}) is less than its min value {}, min value will be used instead",
+                               memoryDescription,
+                               relative.toHumanReadableString(),
+                               lowerBound.toHumanReadableString());
+                       return new MemorySize(lowerBound.getBytes());
+               }
+
+               // already within bounds
+               return new MemorySize(derivedBytes);
+       }
+
+       public static String generateJvmParametersStr(ProcessMemorySpec 
processSpec) {
+               return "-Xmx" + processSpec.getJvmHeapMemorySize().getBytes()

Review comment:
       FYI, I've created FLINK-18175 to track this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to