This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 752d3a79a918b9300dd2b89e96f3915ba6a2dfa6 Author: sxnan <suxuanna...@gmail.com> AuthorDate: Tue Jan 9 17:34:23 2024 +0800 [FLINK-34083][yarn][refactor] Move BootstrapTools#getTaskManagerShellCommand to flink-yarn Utils --- .../runtime/clusterframework/BootstrapTools.java | 120 ------ .../clusterframework/BootstrapToolsTest.java | 428 -------------------- .../src/main/java/org/apache/flink/yarn/Utils.java | 127 +++++- .../apache/flink/yarn/YarnClusterDescriptor.java | 4 +- .../test/java/org/apache/flink/yarn/UtilsTest.java | 433 +++++++++++++++++++++ 5 files changed, 560 insertions(+), 552 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 7d944a1af8c..100686e23bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -19,13 +19,11 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; -import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.shaded.guava32.com.google.common.escape.Escaper; @@ -42,7 +40,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; @@ -160,128 +157,11 @@ public class BootstrapTools { return config; } - /** - * Generates the shell 
command to start a task manager. - * - * @param flinkConfig The Flink configuration. - * @param tmParams Parameters for the task manager. - * @param configDirectory The configuration directory for the flink-conf.yaml - * @param logDirectory The log directory. - * @param hasLogback Uses logback? - * @param hasLog4j Uses log4j? - * @param mainClass The main class to start with. - * @return A String containing the task manager startup command. - */ - public static String getTaskManagerShellCommand( - Configuration flinkConfig, - ContaineredTaskManagerParameters tmParams, - String configDirectory, - String logDirectory, - boolean hasLogback, - boolean hasLog4j, - boolean hasKrb5, - Class<?> mainClass, - String mainArgs) { - - final Map<String, String> startCommandValues = new HashMap<>(); - startCommandValues.put("java", "$JAVA_HOME/bin/java"); - - final TaskExecutorProcessSpec taskExecutorProcessSpec = - tmParams.getTaskExecutorProcessSpec(); - startCommandValues.put( - "jvmmem", ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec)); - - String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS); - if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) { - javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS); - } - javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS; - - // krb5.conf file will be available as local resource in JM/TM container - if (hasKrb5) { - javaOpts += " -Djava.security.krb5.conf=krb5.conf"; - } - startCommandValues.put("jvmopts", javaOpts); - - String logging = ""; - if (hasLogback || hasLog4j) { - logging = "-Dlog.file=" + logDirectory + "/taskmanager.log"; - if (hasLogback) { - logging += " -Dlogback.configurationFile=file:" + configDirectory + "/logback.xml"; - } - if (hasLog4j) { - logging += " -Dlog4j.configuration=file:" + configDirectory + "/log4j.properties"; - logging += - " -Dlog4j.configurationFile=file:" + configDirectory + "/log4j.properties"; - } - } - - 
startCommandValues.put("logging", logging); - startCommandValues.put("class", mainClass.getName()); - startCommandValues.put( - "redirects", - "1> " - + logDirectory - + "/taskmanager.out " - + "2> " - + logDirectory - + "/taskmanager.err"); - - String argsStr = - TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec) - + " --configDir " - + configDirectory; - if (!mainArgs.isEmpty()) { - argsStr += " " + mainArgs; - } - startCommandValues.put("args", argsStr); - - final String commandTemplate = - flinkConfig.getString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, - ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE); - String startCommand = getStartCommand(commandTemplate, startCommandValues); - LOG.debug("TaskManager start command: " + startCommand); - - return startCommand; - } - // ------------------------------------------------------------------------ /** Private constructor to prevent instantiation. */ private BootstrapTools() {} - /** - * Replaces placeholders in the template start command with values from startCommandValues. 
- * - * <p>If the default template {@link - * ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE} is used, the following keys - * must be present in the map or the resulting command will still contain placeholders: - * - * <ul> - * <li><tt>java</tt> = path to the Java executable - * <li><tt>jvmmem</tt> = JVM memory limits and tweaks - * <li><tt>jvmopts</tt> = misc options for the Java VM - * <li><tt>logging</tt> = logging-related configuration settings - * <li><tt>class</tt> = main class to execute - * <li><tt>args</tt> = arguments for the main class - * <li><tt>redirects</tt> = output redirects - * </ul> - * - * @param template a template start command with placeholders - * @param startCommandValues a replacement map <tt>placeholder -> value</tt> - * @return the start command with placeholders filled in - */ - public static String getStartCommand(String template, Map<String, String> startCommandValues) { - for (Map.Entry<String, String> variable : startCommandValues.entrySet()) { - template = - template.replace("%" + variable.getKey() + "%", variable.getValue()) - .replace("  ", " ") - .trim(); - } - return template; - } - /** * Set temporary configuration directories if necessary. 
* diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java index faff5f4d6d6..687a9dbcc97 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.clusterframework; -import org.apache.flink.api.common.resources.CPUResource; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -129,432 +127,6 @@ class BootstrapToolsTest { } } - @Test - void testGetTaskManagerShellCommand() { - final Configuration cfg = new Configuration(); - final TaskExecutorProcessSpec taskExecutorProcessSpec = - new TaskExecutorProcessSpec( - new CPUResource(1.0), - new MemorySize(0), // frameworkHeapSize - new MemorySize(0), // frameworkOffHeapSize - new MemorySize(111), // taskHeapSize - new MemorySize(0), // taskOffHeapSize - new MemorySize(222), // networkMemSize - new MemorySize(0), // managedMemorySize - new MemorySize(333), // jvmMetaspaceSize - new MemorySize(0), // jvmOverheadSize - Collections.emptyList()); - final ContaineredTaskManagerParameters containeredParams = - new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>()); - - // no logging, with/out krb5 - final String java = "$JAVA_HOME/bin/java"; - final String jvmmem = - "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333"; - final String jvmOpts = "-Djvm"; // if set - final String tmJvmOpts = "-DtmJvm"; // if set - final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set - final String logback = "-Dlogback.configurationFile=file:./conf/logback.xml"; // if set - final String 
log4j = - "-Dlog4j.configuration=file:./conf/log4j.properties" - + " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set - final String mainClass = "org.apache.flink.runtime.clusterframework.BootstrapToolsTest"; - final String dynamicConfigs = - TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim(); - final String basicArgs = "--configDir ./conf"; - final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1"; - final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs; - final String redirects = "1> ./logs/taskmanager.out 2> ./logs/taskmanager.err"; - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - false, - false, - false, - this.getClass(), - "")) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - mainClass, - dynamicConfigs, - basicArgs, - redirects)); - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - false, - false, - false, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - mainClass, - args, - redirects)); - - final String krb5 = "-Djava.security.krb5.conf=krb5.conf"; - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - false, - false, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - mainClass, - args, - redirects)); - - // logback only, with/out krb5 - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - false, - false, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - logfile, - logback, - mainClass, - args, - 
redirects)); - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - false, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - logfile, - logback, - mainClass, - args, - redirects)); - - // log4j, with/out krb5 - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - false, - true, - false, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - logfile, - log4j, - mainClass, - args, - redirects)); - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - false, - true, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - logfile, - log4j, - mainClass, - args, - redirects)); - - // logback + log4j, with/out krb5 - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - false, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - logfile, - logback, - log4j, - mainClass, - args, - redirects)); - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - logfile, - logback, - log4j, - mainClass, - args, - redirects)); - - // logback + log4j, with/out krb5, different JVM opts - cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - 
true, - true, - false, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - jvmOpts, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - logfile, - logback, - log4j, - mainClass, - args, - redirects)); - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - jvmOpts, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - logfile, - logback, - log4j, - mainClass, - args, - redirects)); - - // logback + log4j, with/out krb5, different JVM opts - cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts); - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - false, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - jvmOpts, - tmJvmOpts, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - logfile, - logback, - log4j, - mainClass, - args, - redirects)); - - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - jvmmem, - jvmOpts, - tmJvmOpts, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - logfile, - logback, - log4j, - mainClass, - args, - redirects)); - - // now try some configurations with different yarn.container-start-command-template - - cfg.setString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, - "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%"); - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - "1", - jvmmem, - "2", - jvmOpts, - tmJvmOpts, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, 
- krb5, - "3", - logfile, - logback, - log4j, - "4", - mainClass, - "5", - args, - "6", - redirects)); - - cfg.setString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, - "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%"); - assertThat( - BootstrapTools.getTaskManagerShellCommand( - cfg, - containeredParams, - "./conf", - "./logs", - true, - true, - true, - this.getClass(), - mainArgs)) - .isEqualTo( - String.join( - " ", - java, - logfile, - logback, - log4j, - jvmOpts, - tmJvmOpts, - BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS, - krb5, - jvmmem, - mainClass, - args, - redirects)); - } - @Test void testUpdateTmpDirectoriesInConfiguration() { Configuration config = new Configuration(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 6be3717998f..7a707cf65d9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -21,9 +21,12 @@ package org.apache.flink.yarn; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigUtils; -import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import org.apache.flink.util.StringUtils; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -114,6 +117,9 @@ public final class Utils { static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY = 
"yarn.scheduler.increment-allocation-vcores"; + @VisibleForTesting + static final String IGNORE_UNRECOGNIZED_VM_OPTIONS = "-XX:+IgnoreUnrecognizedVMOptions"; + private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1; public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) { @@ -386,7 +392,7 @@ public final class Utils { boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); String launchCommand = - BootstrapTools.getTaskManagerShellCommand( + getTaskManagerShellCommand( flinkConfig, tmParams, ".", @@ -471,6 +477,123 @@ public final class Utils { return ctx; } + /** + * Generates the shell command to start a task manager. + * + * @param flinkConfig The Flink configuration. + * @param tmParams Parameters for the task manager. + * @param configDirectory The configuration directory for the flink-conf.yaml + * @param logDirectory The log directory. + * @param hasLogback Uses logback? + * @param hasLog4j Uses log4j? + * @param mainClass The main class to start with. + * @return A String containing the task manager startup command. 
+ */ + public static String getTaskManagerShellCommand( + org.apache.flink.configuration.Configuration flinkConfig, + ContaineredTaskManagerParameters tmParams, + String configDirectory, + String logDirectory, + boolean hasLogback, + boolean hasLog4j, + boolean hasKrb5, + Class<?> mainClass, + String mainArgs) { + + final Map<String, String> startCommandValues = new HashMap<>(); + startCommandValues.put("java", "$JAVA_HOME/bin/java"); + + final TaskExecutorProcessSpec taskExecutorProcessSpec = + tmParams.getTaskExecutorProcessSpec(); + startCommandValues.put( + "jvmmem", ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec)); + + String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS); + if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) { + javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS); + } + javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS; + + // krb5.conf file will be available as local resource in JM/TM container + if (hasKrb5) { + javaOpts += " -Djava.security.krb5.conf=krb5.conf"; + } + startCommandValues.put("jvmopts", javaOpts); + + String logging = ""; + if (hasLogback || hasLog4j) { + logging = "-Dlog.file=" + logDirectory + "/taskmanager.log"; + if (hasLogback) { + logging += " -Dlogback.configurationFile=file:" + configDirectory + "/logback.xml"; + } + if (hasLog4j) { + logging += " -Dlog4j.configuration=file:" + configDirectory + "/log4j.properties"; + logging += + " -Dlog4j.configurationFile=file:" + configDirectory + "/log4j.properties"; + } + } + + startCommandValues.put("logging", logging); + startCommandValues.put("class", mainClass.getName()); + startCommandValues.put( + "redirects", + "1> " + + logDirectory + + "/taskmanager.out " + + "2> " + + logDirectory + + "/taskmanager.err"); + + String argsStr = + TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec) + + " --configDir " + + configDirectory; + if (!mainArgs.isEmpty()) { + argsStr += " " 
+ mainArgs; + } + startCommandValues.put("args", argsStr); + + final String commandTemplate = + flinkConfig.getString( + ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE); + String startCommand = getStartCommand(commandTemplate, startCommandValues); + LOG.debug("TaskManager start command: " + startCommand); + + return startCommand; + } + + /** + * Replaces placeholders in the template start command with values from startCommandValues. + * + * <p>If the default template {@link + * ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE} is used, the following keys + * must be present in the map or the resulting command will still contain placeholders: + * + * <ul> + * <li><tt>java</tt> = path to the Java executable + * <li><tt>jvmmem</tt> = JVM memory limits and tweaks + * <li><tt>jvmopts</tt> = misc options for the Java VM + * <li><tt>logging</tt> = logging-related configuration settings + * <li><tt>class</tt> = main class to execute + * <li><tt>args</tt> = arguments for the main class + * <li><tt>redirects</tt> = output redirects + * </ul> + * + * @param template a template start command with placeholders + * @param startCommandValues a replacement map <tt>placeholder -> value</tt> + * @return the start command with placeholders filled in + */ + public static String getStartCommand(String template, Map<String, String> startCommandValues) { + for (Map.Entry<String, String> variable : startCommandValues.entrySet()) { + template = + template.replace("%" + variable.getKey() + "%", variable.getValue()) + .replace("  ", " ") + .trim(); + } + return template; + } + static boolean isRemotePath(String path) throws IOException { org.apache.flink.core.fs.Path flinkPath = new org.apache.flink.core.fs.Path(path); return flinkPath.getFileSystem().isDistributedFS(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 795567bc9f8..ca9cc90a599 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -143,6 +143,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.yarn.Utils.getPathFromLocalFile; import static org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr; +import static org.apache.flink.yarn.Utils.getStartCommand; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR; @@ -1897,8 +1898,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { flinkConfiguration.getString( ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE); - final String amCommand = - BootstrapTools.getStartCommand(commandTemplate, startCommandValues); + final String amCommand = getStartCommand(commandTemplate, startCommandValues); amContainer.setCommands(Collections.singletonList(amCommand)); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index fd8aadd7b98..654b1b3bdde 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -18,8 +18,14 @@ package org.apache.flink.yarn; +import org.apache.flink.api.common.resources.CPUResource; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -34,6 +40,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.stream.Stream; @@ -195,6 +202,432 @@ class UtilsTest { assertThat(yarnConfig.get(yarnPrefix + k3)).isNull(); } + @Test + void testGetTaskManagerShellCommand() { + final Configuration cfg = new Configuration(); + final TaskExecutorProcessSpec taskExecutorProcessSpec = + new TaskExecutorProcessSpec( + new CPUResource(1.0), + new MemorySize(0), // frameworkHeapSize + new MemorySize(0), // frameworkOffHeapSize + new MemorySize(111), // taskHeapSize + new MemorySize(0), // taskOffHeapSize + new MemorySize(222), // networkMemSize + new MemorySize(0), // managedMemorySize + new MemorySize(333), // jvmMetaspaceSize + new MemorySize(0), // jvmOverheadSize + Collections.emptyList()); + final ContaineredTaskManagerParameters containeredParams = + new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>()); + + // no logging, with/out krb5 + final String java = "$JAVA_HOME/bin/java"; + final String jvmmem = + "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333"; + final String jvmOpts = "-Djvm"; // if set + final String tmJvmOpts = "-DtmJvm"; // if set + final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set + final String logback = "-Dlogback.configurationFile=file:./conf/logback.xml"; // if set + final String log4j = + "-Dlog4j.configuration=file:./conf/log4j.properties" + + " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set + final String mainClass = "org.apache.flink.yarn.UtilsTest"; + final String dynamicConfigs = + 
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim(); + final String basicArgs = "--configDir ./conf"; + final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1"; + final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs; + final String redirects = "1> ./logs/taskmanager.out 2> ./logs/taskmanager.err"; + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + false, + false, + false, + this.getClass(), + "")) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + mainClass, + dynamicConfigs, + basicArgs, + redirects)); + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + false, + false, + false, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + mainClass, + args, + redirects)); + + final String krb5 = "-Djava.security.krb5.conf=krb5.conf"; + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + false, + false, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + mainClass, + args, + redirects)); + + // logback only, with/out krb5 + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + false, + false, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + logfile, + logback, + mainClass, + args, + redirects)); + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + false, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + logfile, + logback, + mainClass, + args, + redirects)); + + // 
log4j, with/out krb5 + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + false, + true, + false, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + logfile, + log4j, + mainClass, + args, + redirects)); + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + false, + true, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + logfile, + log4j, + mainClass, + args, + redirects)); + + // logback + log4j, with/out krb5 + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + false, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + logfile, + logback, + log4j, + mainClass, + args, + redirects)); + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + logfile, + logback, + log4j, + mainClass, + args, + redirects)); + + // logback + log4j, with/out krb5, different JVM opts + cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + false, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + jvmOpts, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + logfile, + logback, + log4j, + mainClass, + args, + redirects)); + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + 
jvmOpts, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + logfile, + logback, + log4j, + mainClass, + args, + redirects)); + + // logback + log4j, with/out krb5, different JVM opts + cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts); + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + false, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + jvmOpts, + tmJvmOpts, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + logfile, + logback, + log4j, + mainClass, + args, + redirects)); + + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + jvmmem, + jvmOpts, + tmJvmOpts, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + logfile, + logback, + log4j, + mainClass, + args, + redirects)); + + // now try some configurations with different yarn.container-start-command-template + + cfg.setString( + ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%"); + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + "1", + jvmmem, + "2", + jvmOpts, + tmJvmOpts, + Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + "3", + logfile, + logback, + log4j, + "4", + mainClass, + "5", + args, + "6", + redirects)); + + cfg.setString( + ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%"); + assertThat( + Utils.getTaskManagerShellCommand( + cfg, + containeredParams, + "./conf", + "./logs", + true, + true, + true, + this.getClass(), + mainArgs)) + .isEqualTo( + String.join( + " ", + java, + logfile, + logback, + log4j, + jvmOpts, + tmJvmOpts, + 
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS, + krb5, + jvmmem, + mainClass, + args, + redirects)); + } + private static void verifyUnitResourceVariousSchedulers( YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) { yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);