This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b176f28b5995b65c6bfc4bd0337b7959b0f4e60 Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Jun 12 14:12:10 2020 +0200 [FLINK-18175][conf] Log final memory configuration --- .../flink/runtime/util/bash/BashJavaUtils.java | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java index f460906..9d53db0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java @@ -21,14 +21,20 @@ package org.apache.flink.runtime.util.bash; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; +import org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory; +import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory; import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -39,6 +45,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * Utility class for using java utilities in bash scripts. */ public class BashJavaUtils { + private static final Logger LOG = LoggerFactory.getLogger(BashJavaUtils.class); @VisibleForTesting public static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:"; @@ -79,6 +86,9 @@ public class BashJavaUtils { configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY); TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configurationWithFallback); + + logTaskExecutorConfiguration(taskExecutorProcessSpec); + return Arrays.asList( ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec), TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec)); @@ -92,9 +102,42 @@ public class BashJavaUtils { JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( configuration, JobManagerOptions.JVM_HEAP_MEMORY); + + logMasterConfiguration(jobManagerProcessSpec); + return Collections.singletonList(ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec)); } + private static void logMasterConfiguration(JobManagerProcessSpec spec) { + JobManagerFlinkMemory flinkMemory = spec.getFlinkMemory(); + LOG.info("Final Master Memory configuration:"); + LOG.info(" Total Process Memory: {}", spec.getTotalProcessMemorySize().toHumanReadableString()); + LOG.info(" Total Flink Memory: {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString()); + LOG.info(" JVM Heap: {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString()); + LOG.info(" Off-heap: {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString()); + LOG.info(" JVM Metaspace: {}", spec.getJvmMetaspaceSize().toHumanReadableString()); + LOG.info(" JVM Overhead: {}", spec.getJvmOverheadSize().toHumanReadableString()); + } + + private static void logTaskExecutorConfiguration(TaskExecutorProcessSpec spec) { + TaskExecutorFlinkMemory flinkMemory = spec.getFlinkMemory(); + MemorySize totalOffHeapMemory = flinkMemory.getManaged().add(flinkMemory.getJvmDirectMemorySize()); + LOG.info("Final TaskExecutor Memory configuration:"); + LOG.info(" Total Process Memory: {}", spec.getTotalProcessMemorySize().toHumanReadableString()); + LOG.info(" Total Flink Memory: {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString()); + LOG.info(" Total JVM Heap Memory: {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString()); + LOG.info(" Framework: {}", flinkMemory.getFrameworkHeap().toHumanReadableString()); + LOG.info(" Task: {}", flinkMemory.getTaskHeap().toHumanReadableString()); + LOG.info(" Total Off-heap Memory: {}", totalOffHeapMemory.toHumanReadableString()); + LOG.info(" Managed: {}", flinkMemory.getManaged().toHumanReadableString()); + LOG.info(" Total JVM Direct Memory: {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString()); + LOG.info(" Framework: {}", flinkMemory.getFrameworkOffHeap().toHumanReadableString()); + LOG.info(" Task: {}", flinkMemory.getTaskOffHeap().toHumanReadableString()); + LOG.info(" Network: {}", flinkMemory.getNetwork().toHumanReadableString()); + LOG.info(" JVM Metaspace: {}", spec.getJvmMetaspaceSize().toHumanReadableString()); + LOG.info(" JVM Overhead: {}", spec.getJvmOverheadSize().toHumanReadableString()); + } + /** * Commands that BashJavaUtils supports. */
