This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 752d3a79a918b9300dd2b89e96f3915ba6a2dfa6
Author: sxnan <suxuanna...@gmail.com>
AuthorDate: Tue Jan 9 17:34:23 2024 +0800

    [FLINK-34083][yarn][refactor] Move BootstrapTools#getTaskManagerShellCommand to flink-yarn Utils
---
 .../runtime/clusterframework/BootstrapTools.java   | 120 ------
 .../clusterframework/BootstrapToolsTest.java       | 428 --------------------
 .../src/main/java/org/apache/flink/yarn/Utils.java | 127 +++++-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   4 +-
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 433 +++++++++++++++++++++
 5 files changed, 560 insertions(+), 552 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 7d944a1af8c..100686e23bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -19,13 +19,11 @@
 package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.flink.shaded.guava32.com.google.common.escape.Escaper;
@@ -42,7 +40,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
 
@@ -160,128 +157,11 @@ public class BootstrapTools {
         return config;
     }
 
-    /**
-     * Generates the shell command to start a task manager.
-     *
-     * @param flinkConfig The Flink configuration.
-     * @param tmParams Parameters for the task manager.
-     * @param configDirectory The configuration directory for the 
flink-conf.yaml
-     * @param logDirectory The log directory.
-     * @param hasLogback Uses logback?
-     * @param hasLog4j Uses log4j?
-     * @param mainClass The main class to start with.
-     * @return A String containing the task manager startup command.
-     */
-    public static String getTaskManagerShellCommand(
-            Configuration flinkConfig,
-            ContaineredTaskManagerParameters tmParams,
-            String configDirectory,
-            String logDirectory,
-            boolean hasLogback,
-            boolean hasLog4j,
-            boolean hasKrb5,
-            Class<?> mainClass,
-            String mainArgs) {
-
-        final Map<String, String> startCommandValues = new HashMap<>();
-        startCommandValues.put("java", "$JAVA_HOME/bin/java");
-
-        final TaskExecutorProcessSpec taskExecutorProcessSpec =
-                tmParams.getTaskExecutorProcessSpec();
-        startCommandValues.put(
-                "jvmmem", 
ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec));
-
-        String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
-        if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 
0) {
-            javaOpts += " " + 
flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
-        }
-        javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;
-
-        // krb5.conf file will be available as local resource in JM/TM 
container
-        if (hasKrb5) {
-            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
-        }
-        startCommandValues.put("jvmopts", javaOpts);
-
-        String logging = "";
-        if (hasLogback || hasLog4j) {
-            logging = "-Dlog.file=" + logDirectory + "/taskmanager.log";
-            if (hasLogback) {
-                logging += " -Dlogback.configurationFile=file:" + 
configDirectory + "/logback.xml";
-            }
-            if (hasLog4j) {
-                logging += " -Dlog4j.configuration=file:" + configDirectory + 
"/log4j.properties";
-                logging +=
-                        " -Dlog4j.configurationFile=file:" + configDirectory + 
"/log4j.properties";
-            }
-        }
-
-        startCommandValues.put("logging", logging);
-        startCommandValues.put("class", mainClass.getName());
-        startCommandValues.put(
-                "redirects",
-                "1> "
-                        + logDirectory
-                        + "/taskmanager.out "
-                        + "2> "
-                        + logDirectory
-                        + "/taskmanager.err");
-
-        String argsStr =
-                
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec)
-                        + " --configDir "
-                        + configDirectory;
-        if (!mainArgs.isEmpty()) {
-            argsStr += " " + mainArgs;
-        }
-        startCommandValues.put("args", argsStr);
-
-        final String commandTemplate =
-                flinkConfig.getString(
-                        ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-                        
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
-        String startCommand = getStartCommand(commandTemplate, 
startCommandValues);
-        LOG.debug("TaskManager start command: " + startCommand);
-
-        return startCommand;
-    }
-
     // ------------------------------------------------------------------------
 
     /** Private constructor to prevent instantiation. */
     private BootstrapTools() {}
 
-    /**
-     * Replaces placeholders in the template start command with values from 
startCommandValues.
-     *
-     * <p>If the default template {@link
-     * ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE} is used, 
the following keys
-     * must be present in the map or the resulting command will still contain 
placeholders:
-     *
-     * <ul>
-     *   <li><tt>java</tt> = path to the Java executable
-     *   <li><tt>jvmmem</tt> = JVM memory limits and tweaks
-     *   <li><tt>jvmopts</tt> = misc options for the Java VM
-     *   <li><tt>logging</tt> = logging-related configuration settings
-     *   <li><tt>class</tt> = main class to execute
-     *   <li><tt>args</tt> = arguments for the main class
-     *   <li><tt>redirects</tt> = output redirects
-     * </ul>
-     *
-     * @param template a template start command with placeholders
-     * @param startCommandValues a replacement map <tt>placeholder -&gt; 
value</tt>
-     * @return the start command with placeholders filled in
-     */
-    public static String getStartCommand(String template, Map<String, String> 
startCommandValues) {
-        for (Map.Entry<String, String> variable : 
startCommandValues.entrySet()) {
-            template =
-                    template.replace("%" + variable.getKey() + "%", 
variable.getValue())
-                            .replace("  ", " ")
-                            .trim();
-        }
-        return template;
-    }
-
     /**
      * Set temporary configuration directories if necessary.
      *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index faff5f4d6d6..687a9dbcc97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import org.apache.flink.api.common.resources.CPUResource;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -129,432 +127,6 @@ class BootstrapToolsTest {
         }
     }
 
-    @Test
-    void testGetTaskManagerShellCommand() {
-        final Configuration cfg = new Configuration();
-        final TaskExecutorProcessSpec taskExecutorProcessSpec =
-                new TaskExecutorProcessSpec(
-                        new CPUResource(1.0),
-                        new MemorySize(0), // frameworkHeapSize
-                        new MemorySize(0), // frameworkOffHeapSize
-                        new MemorySize(111), // taskHeapSize
-                        new MemorySize(0), // taskOffHeapSize
-                        new MemorySize(222), // networkMemSize
-                        new MemorySize(0), // managedMemorySize
-                        new MemorySize(333), // jvmMetaspaceSize
-                        new MemorySize(0), // jvmOverheadSize
-                        Collections.emptyList());
-        final ContaineredTaskManagerParameters containeredParams =
-                new ContaineredTaskManagerParameters(taskExecutorProcessSpec, 
new HashMap<>());
-
-        // no logging, with/out krb5
-        final String java = "$JAVA_HOME/bin/java";
-        final String jvmmem =
-                "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 
-XX:MaxMetaspaceSize=333";
-        final String jvmOpts = "-Djvm"; // if set
-        final String tmJvmOpts = "-DtmJvm"; // if set
-        final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set
-        final String logback = 
"-Dlogback.configurationFile=file:./conf/logback.xml"; // if set
-        final String log4j =
-                "-Dlog4j.configuration=file:./conf/log4j.properties"
-                        + " 
-Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set
-        final String mainClass = 
"org.apache.flink.runtime.clusterframework.BootstrapToolsTest";
-        final String dynamicConfigs =
-                
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
-        final String basicArgs = "--configDir ./conf";
-        final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
-        final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
-        final String redirects = "1> ./logs/taskmanager.out 2> 
./logs/taskmanager.err";
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                false,
-                                false,
-                                false,
-                                this.getClass(),
-                                ""))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                mainClass,
-                                dynamicConfigs,
-                                basicArgs,
-                                redirects));
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                false,
-                                false,
-                                false,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                mainClass,
-                                args,
-                                redirects));
-
-        final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                false,
-                                false,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                mainClass,
-                                args,
-                                redirects));
-
-        // logback only, with/out krb5
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                false,
-                                false,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                logfile,
-                                logback,
-                                mainClass,
-                                args,
-                                redirects));
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                false,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                logfile,
-                                logback,
-                                mainClass,
-                                args,
-                                redirects));
-
-        // log4j, with/out krb5
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                false,
-                                true,
-                                false,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                logfile,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                false,
-                                true,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                logfile,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        // logback + log4j, with/out krb5
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                false,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                logfile,
-                                logback,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                logfile,
-                                logback,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        // logback + log4j, with/out krb5, different JVM opts
-        cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                false,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                jvmOpts,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                logfile,
-                                logback,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                jvmOpts,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                logfile,
-                                logback,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        // logback + log4j, with/out krb5, different JVM opts
-        cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                false,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                jvmOpts,
-                                tmJvmOpts,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                logfile,
-                                logback,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                jvmmem,
-                                jvmOpts,
-                                tmJvmOpts,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                logfile,
-                                logback,
-                                log4j,
-                                mainClass,
-                                args,
-                                redirects));
-
-        // now try some configurations with different 
yarn.container-start-command-template
-
-        cfg.setString(
-                ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-                "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 
6 %redirects%");
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                "1",
-                                jvmmem,
-                                "2",
-                                jvmOpts,
-                                tmJvmOpts,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                "3",
-                                logfile,
-                                logback,
-                                log4j,
-                                "4",
-                                mainClass,
-                                "5",
-                                args,
-                                "6",
-                                redirects));
-
-        cfg.setString(
-                ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-                "%java% %logging% %jvmopts% %jvmmem% %class% %args% 
%redirects%");
-        assertThat(
-                        BootstrapTools.getTaskManagerShellCommand(
-                                cfg,
-                                containeredParams,
-                                "./conf",
-                                "./logs",
-                                true,
-                                true,
-                                true,
-                                this.getClass(),
-                                mainArgs))
-                .isEqualTo(
-                        String.join(
-                                " ",
-                                java,
-                                logfile,
-                                logback,
-                                log4j,
-                                jvmOpts,
-                                tmJvmOpts,
-                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                                krb5,
-                                jvmmem,
-                                mainClass,
-                                args,
-                                redirects));
-    }
-
     @Test
     void testUpdateTmpDirectoriesInConfiguration() {
         Configuration config = new Configuration();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 6be3717998f..7a707cf65d9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -21,9 +21,12 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigUtils;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -114,6 +117,9 @@ public final class Utils {
     static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY =
             "yarn.scheduler.increment-allocation-vcores";
 
+    @VisibleForTesting
+    static final String IGNORE_UNRECOGNIZED_VM_OPTIONS = "-XX:+IgnoreUnrecognizedVMOptions";
+
     private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1;
 
     public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
@@ -386,7 +392,7 @@ public final class Utils {
         boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
 
         String launchCommand =
-                BootstrapTools.getTaskManagerShellCommand(
+                getTaskManagerShellCommand(
                         flinkConfig,
                         tmParams,
                         ".",
@@ -471,6 +477,123 @@ public final class Utils {
         return ctx;
     }
 
+    /**
+     * Generates the shell command to start a task manager.
+     *
+     * @param flinkConfig The Flink configuration.
+     * @param tmParams Parameters for the task manager.
+     * @param configDirectory The configuration directory for the flink-conf.yaml
+     * @param logDirectory The log directory.
+     * @param hasLogback Uses logback?
+     * @param hasLog4j Uses log4j?
+     * @param mainClass The main class to start with.
+     * @return A String containing the task manager startup command.
+     */
+    public static String getTaskManagerShellCommand(
+            org.apache.flink.configuration.Configuration flinkConfig,
+            ContaineredTaskManagerParameters tmParams,
+            String configDirectory,
+            String logDirectory,
+            boolean hasLogback,
+            boolean hasLog4j,
+            boolean hasKrb5,
+            Class<?> mainClass,
+            String mainArgs) {
+
+        final Map<String, String> startCommandValues = new HashMap<>();
+        startCommandValues.put("java", "$JAVA_HOME/bin/java");
+
+        final TaskExecutorProcessSpec taskExecutorProcessSpec =
+                tmParams.getTaskExecutorProcessSpec();
+        startCommandValues.put(
+                "jvmmem", ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec));
+
+        String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
+        if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
+            javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
+        }
+        javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;
+
+        // krb5.conf file will be available as local resource in JM/TM container
+        if (hasKrb5) {
+            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
+        }
+        startCommandValues.put("jvmopts", javaOpts);
+
+        String logging = "";
+        if (hasLogback || hasLog4j) {
+            logging = "-Dlog.file=" + logDirectory + "/taskmanager.log";
+            if (hasLogback) {
+                logging += " -Dlogback.configurationFile=file:" + configDirectory + "/logback.xml";
+            }
+            if (hasLog4j) {
+                logging += " -Dlog4j.configuration=file:" + configDirectory + "/log4j.properties";
+                logging +=
+                        " -Dlog4j.configurationFile=file:" + configDirectory + "/log4j.properties";
+            }
+        }
+
+        startCommandValues.put("logging", logging);
+        startCommandValues.put("class", mainClass.getName());
+        startCommandValues.put(
+                "redirects",
+                "1> "
+                        + logDirectory
+                        + "/taskmanager.out "
+                        + "2> "
+                        + logDirectory
+                        + "/taskmanager.err");
+
+        String argsStr =
+                TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec)
+                        + " --configDir "
+                        + configDirectory;
+        if (!mainArgs.isEmpty()) {
+            argsStr += " " + mainArgs;
+        }
+        startCommandValues.put("args", argsStr);
+
+        final String commandTemplate =
+                flinkConfig.getString(
+                        ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+                        ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
+        String startCommand = getStartCommand(commandTemplate, startCommandValues);
+        LOG.debug("TaskManager start command: " + startCommand);
+
+        return startCommand;
+    }
+
+    /**
+     * Replaces placeholders in the template start command with values from startCommandValues.
+     *
+     * <p>If the default template {@link
+     * ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE} is used, the following keys
+     * must be present in the map or the resulting command will still contain placeholders:
+     *
+     * <ul>
+     *   <li><tt>java</tt> = path to the Java executable
+     *   <li><tt>jvmmem</tt> = JVM memory limits and tweaks
+     *   <li><tt>jvmopts</tt> = misc options for the Java VM
+     *   <li><tt>logging</tt> = logging-related configuration settings
+     *   <li><tt>class</tt> = main class to execute
+     *   <li><tt>args</tt> = arguments for the main class
+     *   <li><tt>redirects</tt> = output redirects
+     * </ul>
+     *
+     * @param template a template start command with placeholders
+     * @param startCommandValues a replacement map <tt>placeholder -&gt; value</tt>
+     * @return the start command with placeholders filled in
+     */
+    public static String getStartCommand(String template, Map<String, String> startCommandValues) {
+        for (Map.Entry<String, String> variable : startCommandValues.entrySet()) {
+            template =
+                    template.replace("%" + variable.getKey() + "%", variable.getValue())
+                            .replace("  ", " ")
+                            .trim();
+        }
+        return template;
+    }
+
     static boolean isRemotePath(String path) throws IOException {
         org.apache.flink.core.fs.Path flinkPath = new org.apache.flink.core.fs.Path(path);
         return flinkPath.getFileSystem().isDistributedFS();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 795567bc9f8..ca9cc90a599 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -143,6 +143,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
 import static org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr;
+import static org.apache.flink.yarn.Utils.getStartCommand;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static 
org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
 
@@ -1897,8 +1898,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                 flinkConfiguration.getString(
                         ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                         
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
-        final String amCommand =
-                BootstrapTools.getStartCommand(commandTemplate, 
startCommandValues);
+        final String amCommand = getStartCommand(commandTemplate, 
startCommandValues);
 
         amContainer.setCommands(Collections.singletonList(amCommand));
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index fd8aadd7b98..654b1b3bdde 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.resources.CPUResource;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -34,6 +40,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -195,6 +202,432 @@ class UtilsTest {
         assertThat(yarnConfig.get(yarnPrefix + k3)).isNull();
     }
 
+    @Test
+    void testGetTaskManagerShellCommand() { // checks the exact command string for each logging / krb5 / JVM-opts / template combination
+        final Configuration cfg = new Configuration();
+        final TaskExecutorProcessSpec taskExecutorProcessSpec =
+                new TaskExecutorProcessSpec(
+                        new CPUResource(1.0),
+                        new MemorySize(0), // frameworkHeapSize
+                        new MemorySize(0), // frameworkOffHeapSize
+                        new MemorySize(111), // taskHeapSize
+                        new MemorySize(0), // taskOffHeapSize
+                        new MemorySize(222), // networkMemSize
+                        new MemorySize(0), // managedMemorySize
+                        new MemorySize(333), // jvmMetaspaceSize
+                        new MemorySize(0), // jvmOverheadSize
+                        Collections.emptyList());
+        final ContaineredTaskManagerParameters containeredParams =
+                new ContaineredTaskManagerParameters(taskExecutorProcessSpec, 
new HashMap<>());
+
+        // no logging: empty main args, then non-empty main args, then with krb5
+        final String java = "$JAVA_HOME/bin/java";
+        final String jvmmem =
+                "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 
-XX:MaxMetaspaceSize=333";
+        final String jvmOpts = "-Djvm"; // expected once CoreOptions.FLINK_JVM_OPTIONS is set further down
+        final String tmJvmOpts = "-DtmJvm"; // expected once CoreOptions.FLINK_TM_JVM_OPTIONS is set further down
+        final String logfile = "-Dlog.file=./logs/taskmanager.log"; // expected whenever logback or log4j is enabled
+        final String logback = 
"-Dlogback.configurationFile=file:./conf/logback.xml"; // expected when the first boolean flag (logback) is true
+        final String log4j =
+                "-Dlog4j.configuration=file:./conf/log4j.properties"
+                        + " 
-Dlog4j.configurationFile=file:./conf/log4j.properties"; // expected when the second boolean flag (log4j) is true
+        final String mainClass = "org.apache.flink.yarn.UtilsTest";
+        final String dynamicConfigs =
+                
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
+        final String basicArgs = "--configDir ./conf";
+        final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
+        final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
+        final String redirects = "1> ./logs/taskmanager.out 2> 
./logs/taskmanager.err";
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                false,
+                                false,
+                                this.getClass(),
+                                ""))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                mainClass,
+                                dynamicConfigs,
+                                basicArgs,
+                                redirects));
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                false,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                mainClass,
+                                args,
+                                redirects));
+
+        final String krb5 = "-Djava.security.krb5.conf=krb5.conf"; // expected when the third boolean flag (krb5) is true
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                false,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                mainClass,
+                                args,
+                                redirects));
+
+        // logback only, with and without krb5
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                false,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                false,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                mainClass,
+                                args,
+                                redirects));
+
+        // log4j only, with and without krb5
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        // logback + log4j, with and without krb5
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        // logback + log4j, with and without krb5, with CoreOptions.FLINK_JVM_OPTIONS set
+        cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        // same again with CoreOptions.FLINK_TM_JVM_OPTIONS also set; the TM opts are expected right after the global JVM opts
+        cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                tmJvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                tmJvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        // now try some configurations with different 
yarn.container-start-command-template
+
+        cfg.setString(
+                ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+                "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 
6 %redirects%"); // literal markers 1..6 make the position of each substituted group visible
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                "1",
+                                jvmmem,
+                                "2",
+                                jvmOpts,
+                                tmJvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                "3",
+                                logfile,
+                                logback,
+                                log4j,
+                                "4",
+                                mainClass,
+                                "5",
+                                args,
+                                "6",
+                                redirects));
+
+        cfg.setString(
+                ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+                "%java% %logging% %jvmopts% %jvmmem% %class% %args% 
%redirects%"); // reordered placeholders: logging first, then JVM opts, then memory settings
+        assertThat(
+                        Utils.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                logfile,
+                                logback,
+                                log4j,
+                                jvmOpts,
+                                tmJvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                jvmmem,
+                                mainClass,
+                                args,
+                                redirects));
+    }
+
     private static void verifyUnitResourceVariousSchedulers(
             YarnConfiguration yarnConfig, int minMem, int minVcore, int 
incMem, int incVcore) {
         yarnConfig.set(YarnConfiguration.RM_SCHEDULER, 
Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);

Reply via email to