This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b4eb8ac503f [FLINK-33221][core][config] Add config options for administrator JVM options
b4eb8ac503f is described below

commit b4eb8ac503f41fd793db1ac662fbedc46af92fd5
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Tue Jan 16 00:30:49 2024 +0800

    [FLINK-33221][core][config] Add config options for administrator JVM options
---
 .../generated/environment_configuration.html       | 18 ++++++++
 .../apache/flink/configuration/CoreOptions.java    | 37 ++++++++++++++++
 flink-dist/src/main/flink-bin/bin/config.sh        |  9 ++++
 flink-python/pyflink/pyflink_gateway_server.py     | 11 +++--
 .../src/main/java/org/apache/flink/yarn/Utils.java | 45 +++++++++++++++-----
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 19 ++++-----
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 49 +++++++++++++++++++++-
 .../flink/yarn/YarnClusterDescriptorTest.java      | 14 +++++++
 8 files changed, 174 insertions(+), 28 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html b/docs/layouts/shortcodes/generated/environment_configuration.html
index 3627811dc9a..6f520e5abff 100644
--- a/docs/layouts/shortcodes/generated/environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/environment_configuration.html
@@ -20,6 +20,24 @@
             <td>String</td>
             <td>Path to hbase configuration directory. It is required to read HBASE configuration. You can also set it via environment variable.</td>
         </tr>
+        <tr>
+            <td><h5>env.java.default-opts.all</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A string of default JVM options to prepend to <code class="highlighter-rouge">env.java.opts.all</code>. This is intended to be set by administrators.</td>
+        </tr>
+        <tr>
+            <td><h5>env.java.default-opts.jobmanager</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A string of default JVM options to prepend to <code class="highlighter-rouge">env.java.opts.jobmanager</code>. This is intended to be set by administrators.</td>
+        </tr>
+        <tr>
+            <td><h5>env.java.default-opts.taskmanager</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A string of default JVM options to prepend to <code class="highlighter-rouge">env.java.opts.taskmanager</code>. This is intended to be set by administrators.</td>
+        </tr>
         <tr>
             <td><h5>env.java.opts.all</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index a78cde9edfd..03b92b328c6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -31,6 +31,7 @@ import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
 import java.util.List;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.code;
 
 /** The set of configuration options for core parameters. */
 @PublicEvolving
@@ -282,6 +283,42 @@ public class CoreOptions {
                                             "Java options to start the JVM of 
the Flink SQL Gateway with.")
                                     .build());
 
+    public static final ConfigOption<String> FLINK_DEFAULT_JVM_OPTIONS =
+            ConfigOptions.key("env.java.default-opts.all")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "A string of default JVM options 
to prepend to %s."
+                                                    + " This is intended to be 
set by administrators.",
+                                            code(FLINK_JVM_OPTIONS.key()))
+                                    .build());
+
+    public static final ConfigOption<String> FLINK_DEFAULT_JM_JVM_OPTIONS =
+            ConfigOptions.key("env.java.default-opts.jobmanager")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "A string of default JVM options 
to prepend to %s."
+                                                    + " This is intended to be 
set by administrators.",
+                                            code(FLINK_JM_JVM_OPTIONS.key()))
+                                    .build());
+
+    public static final ConfigOption<String> FLINK_DEFAULT_TM_JVM_OPTIONS =
+            ConfigOptions.key("env.java.default-opts.taskmanager")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "A string of default JVM options 
to prepend to %s."
+                                                    + " This is intended to be 
set by administrators.",
+                                            code(FLINK_TM_JVM_OPTIONS.key()))
+                                    .build());
+
     /**
      * This option is here only for documentation generation, it is only evaluated in the shell
      * scripts.
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 363e17a375f..c798bdf4c01 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -186,6 +186,9 @@ KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
 KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
 KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
 KEY_ENV_JAVA_OPTS_SQL_GATEWAY="env.java.opts.sql-gateway"
+KEY_ENV_JAVA_DEFAULT_OPTS="env.java.default-opts.all"
+KEY_ENV_JAVA_DEFAULT_OPTS_JM="env.java.default-opts.jobmanager"
+KEY_ENV_JAVA_DEFAULT_OPTS_TM="env.java.default-opts.taskmanager"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
 KEY_HIGH_AVAILABILITY="high-availability.type"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
@@ -326,11 +329,13 @@ if [ -z "${FLINK_PID_DIR}" ]; then
 fi
 
 if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
+    FLINK_ENV_JAVA_DEFAULT_OPTS=$(readFromConfig ${KEY_ENV_JAVA_DEFAULT_OPTS} 
"" "${YAML_CONF}")
     FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "" 
"${YAML_CONF}")
     if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
       # try deprecated key
       FLINK_ENV_JAVA_OPTS=$(readFromConfig "env.java.opts" 
"${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
     fi
+    FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_DEFAULT_OPTS} ${FLINK_ENV_JAVA_OPTS}"
 
     # Remove leading and ending double quotes (if present) of value
     FLINK_ENV_JAVA_OPTS="-XX:+IgnoreUnrecognizedVMOptions $( echo 
"${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
@@ -343,13 +348,17 @@ if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
 fi
 
 if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
+    FLINK_ENV_JAVA_DEFAULT_OPTS_JM=$(readFromConfig 
${KEY_ENV_JAVA_DEFAULT_OPTS_JM} "" "${YAML_CONF}")
     FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} 
"${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
+    FLINK_ENV_JAVA_OPTS_JM="${FLINK_ENV_JAVA_DEFAULT_OPTS_JM} 
${FLINK_ENV_JAVA_OPTS_JM}"
     # Remove leading and ending double quotes (if present) of value
     FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 
's/^"//'  -e 's/"$//' )"
 fi
 
 if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+    FLINK_ENV_JAVA_DEFAULT_OPTS_TM=$(readFromConfig 
${KEY_ENV_JAVA_DEFAULT_OPTS_TM} "" "${YAML_CONF}")
     FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} 
"${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
+    FLINK_ENV_JAVA_OPTS_TM="${FLINK_ENV_JAVA_DEFAULT_OPTS_TM} 
${FLINK_ENV_JAVA_OPTS_TM}"
     # Remove leading and ending double quotes (if present) of value
     FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 
's/^"//'  -e 's/"$//' )"
 fi
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index 5dacc15f6d0..dcb3f34884e 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -36,6 +36,7 @@ KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
 KEY_ENV_JAVA_HOME = "env.java.home"
 KEY_ENV_JAVA_OPTS = "env.java.opts.all"
 KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts"
+KEY_ENV_JAVA_DEFAULT_OPTS = "env.java.default-opts.all"
 
 
 def on_windows():
@@ -156,12 +157,14 @@ def construct_log_settings(env):
 
 def get_jvm_opts(env):
     flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
-    jvm_opts = env.get(
-        'FLINK_ENV_JAVA_OPTS',
-        read_from_config(
+    jvm_opts = env.get("FLINK_ENV_JAVA_OPTS")
+    if jvm_opts is None:
+        default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", 
flink_conf_file)
+        extra_jvm_opts = read_from_config(
             KEY_ENV_JAVA_OPTS,
             read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", 
flink_conf_file),
-            flink_conf_file))
+            flink_conf_file)
+        jvm_opts = default_jvm_opts + " " + extra_jvm_opts
 
     # Remove leading and trailing double quotes (if present) of value
     jvm_opts = jvm_opts.strip('"')
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 621f1dcda41..985196e53fc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.CoreOptions;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -61,6 +62,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -509,17 +511,13 @@ public final class Utils {
         startCommandValues.put(
                 "jvmmem", 
ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec));
 
-        String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
-        if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 
0) {
-            javaOpts += " " + 
flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
-        }
-        javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;
-
-        // krb5.conf file will be available as local resource in JM/TM container
-        if (hasKrb5) {
-            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
-        }
-        startCommandValues.put("jvmopts", javaOpts);
+        List<ConfigOption<String>> jvmOptions =
+                Arrays.asList(
+                        CoreOptions.FLINK_DEFAULT_JVM_OPTIONS,
+                        CoreOptions.FLINK_JVM_OPTIONS,
+                        CoreOptions.FLINK_DEFAULT_TM_JVM_OPTIONS,
+                        CoreOptions.FLINK_TM_JVM_OPTIONS);
+        startCommandValues.put("jvmopts", generateJvmOptsString(flinkConfig, 
jvmOptions, hasKrb5));
 
         String logging = "";
         if (hasLogback || hasLog4j) {
@@ -592,6 +590,23 @@ public final class Utils {
         return template;
     }
 
+    public static String generateJvmOptsString(
+            org.apache.flink.configuration.Configuration conf,
+            List<ConfigOption<String>> jvmOptions,
+            boolean hasKrb5) {
+        StringBuilder javaOptsSb = new StringBuilder();
+        for (ConfigOption<String> option : jvmOptions) {
+            concatWithSpace(javaOptsSb, conf.get(option));
+        }
+        concatWithSpace(javaOptsSb, IGNORE_UNRECOGNIZED_VM_OPTIONS);
+
+        // krb5.conf file will be available as local resource in JM/TM container
+        if (hasKrb5) {
+            concatWithSpace(javaOptsSb, "-Djava.security.krb5.conf=krb5.conf");
+        }
+        return javaOptsSb.toString().trim();
+    }
+
     static boolean isRemotePath(String path) throws IOException {
         org.apache.flink.core.fs.Path flinkPath = new 
org.apache.flink.core.fs.Path(path);
         return flinkPath.getFileSystem().isDistributedFS();
@@ -796,4 +811,12 @@ public final class Utils {
     public static Path getPathFromLocalFilePathStr(String localPathStr) {
         return getPathFromLocalFile(new File(localPathStr));
     }
+
+    public static void concatWithSpace(StringBuilder sb, String value) {
+        if (value == null || value.isEmpty()) {
+            return;
+        }
+        sb.append(' ');
+        sb.append(value);
+    }
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 6e1f14392f7..176804eaab2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -121,6 +121,7 @@ import java.net.URI;
 import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1856,17 +1857,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         // ------------------ Prepare Application Master Container  ------------------------------
 
         // respect custom JVM options in the YAML file
-        String javaOpts = 
flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
-        if 
(flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
-            javaOpts += " " + 
flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
-        }
-
-        javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;
-
-        // krb5.conf file will be available as local resource in JM/TM container
-        if (hasKrb5) {
-            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
-        }
+        List<ConfigOption<String>> jvmOptions =
+                Arrays.asList(
+                        CoreOptions.FLINK_DEFAULT_JVM_OPTIONS,
+                        CoreOptions.FLINK_JVM_OPTIONS,
+                        CoreOptions.FLINK_DEFAULT_JM_JVM_OPTIONS,
+                        CoreOptions.FLINK_JM_JVM_OPTIONS);
+        String javaOpts = Utils.generateJvmOptsString(flinkConfiguration, 
jvmOptions, hasKrb5);
 
         // Set up the container launch context for the application master
         ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index d3f3a9e239e..353f48d7318 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.api.common.resources.CPUResource;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -39,6 +40,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -225,7 +227,9 @@ class UtilsTest {
         final String java = "$JAVA_HOME/bin/java";
         final String jvmmem =
                 "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 
-XX:MaxMetaspaceSize=333";
+        final String defaultJvmOpts = "-DdefaultJvm"; // if set
         final String jvmOpts = "-Djvm"; // if set
+        final String defaultTmJvmOpts = "-DdefaultTmJvm"; // if set
         final String tmJvmOpts = "-DtmJvm"; // if set
         final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set
         final String logback = 
"-Dlogback.configurationFile=file:./conf/logback.xml"; // if set
@@ -453,7 +457,8 @@ class UtilsTest {
                                 redirects));
 
         // logback + log4j, with/out krb5, different JVM opts
-        cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+        cfg.set(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, defaultJvmOpts);
+        cfg.set(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
         assertThat(
                         Utils.getTaskManagerShellCommand(
                                 cfg,
@@ -470,6 +475,7 @@ class UtilsTest {
                                 " ",
                                 java,
                                 jvmmem,
+                                defaultJvmOpts,
                                 jvmOpts,
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                 logfile,
@@ -495,6 +501,7 @@ class UtilsTest {
                                 " ",
                                 java,
                                 jvmmem,
+                                defaultJvmOpts,
                                 jvmOpts,
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                 krb5,
@@ -506,7 +513,8 @@ class UtilsTest {
                                 redirects));
 
         // logback + log4j, with/out krb5, different JVM opts
-        cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
+        cfg.set(CoreOptions.FLINK_DEFAULT_TM_JVM_OPTIONS, defaultTmJvmOpts);
+        cfg.set(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
         assertThat(
                         Utils.getTaskManagerShellCommand(
                                 cfg,
@@ -523,7 +531,9 @@ class UtilsTest {
                                 " ",
                                 java,
                                 jvmmem,
+                                defaultJvmOpts,
                                 jvmOpts,
+                                defaultTmJvmOpts,
                                 tmJvmOpts,
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                 logfile,
@@ -549,7 +559,9 @@ class UtilsTest {
                                 " ",
                                 java,
                                 jvmmem,
+                                defaultJvmOpts,
                                 jvmOpts,
+                                defaultTmJvmOpts,
                                 tmJvmOpts,
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                 krb5,
@@ -583,7 +595,9 @@ class UtilsTest {
                                 "1",
                                 jvmmem,
                                 "2",
+                                defaultJvmOpts,
                                 jvmOpts,
+                                defaultTmJvmOpts,
                                 tmJvmOpts,
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                 krb5,
@@ -619,7 +633,9 @@ class UtilsTest {
                                 logfile,
                                 logback,
                                 log4j,
+                                defaultJvmOpts,
                                 jvmOpts,
+                                defaultTmJvmOpts,
                                 tmJvmOpts,
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                 krb5,
@@ -629,6 +645,35 @@ class UtilsTest {
                                 redirects));
     }
 
+    @Test
+    void testGenerateJvmOptsString() {
+        final String defaultJvmOpts = "-DdefaultJvm";
+        final String jvmOpts = "-Djvm";
+        final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
+        final Configuration conf = new Configuration();
+        conf.set(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, defaultJvmOpts);
+        conf.set(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+        final List<ConfigOption<String>> jvmOptions =
+                Arrays.asList(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, 
CoreOptions.FLINK_JVM_OPTIONS);
+        // With Krb5
+        assertThat(Utils.generateJvmOptsString(conf, jvmOptions, true))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                defaultJvmOpts,
+                                jvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5));
+        // Without Krb5
+        assertThat(Utils.generateJvmOptsString(conf, jvmOptions, false))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                defaultJvmOpts,
+                                jvmOpts,
+                                Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
+    }
+
     private static void verifyUnitResourceVariousSchedulers(
             YarnConfiguration yarnConfig, int minMem, int minVcore, int 
incMem, int incVcore) {
         yarnConfig.set(YarnConfiguration.RM_SCHEDULER, 
Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 5eadae781e0..af0f283415f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -174,7 +174,9 @@ class YarnClusterDescriptorTest {
                 
JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, cfg);
         final String dynamicParameters =
                 
JobManagerProcessUtils.generateDynamicConfigsStr(jobManagerProcessSpec);
+        final String defaultJvmOpts = "-DdefaultJvm"; // if set
         final String jvmOpts = "-Djvm"; // if set
+        final String defaultJmJvmOpts = "-DdefaultJmJvm"; // if set
         final String jmJvmOpts = "-DjmJvm"; // if set
         final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
         final String logfile =
@@ -370,6 +372,7 @@ class YarnClusterDescriptorTest {
             // YarnClusterDescriptor,
             // because we have a reference to the ClusterDescriptor's configuration which we modify
             // continuously
+            cfg.setString(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, 
defaultJvmOpts);
             cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
             cfg.set(
                     YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
@@ -385,6 +388,7 @@ class YarnClusterDescriptorTest {
                                     " ",
                                     java,
                                     jvmmem,
+                                    defaultJvmOpts,
                                     jvmOpts,
                                     
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                     logfile,
@@ -407,6 +411,7 @@ class YarnClusterDescriptorTest {
                                     " ",
                                     java,
                                     jvmmem,
+                                    defaultJvmOpts,
                                     jvmOpts,
                                     
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                     krb5,
@@ -419,6 +424,7 @@ class YarnClusterDescriptorTest {
             // log4j, with/out krb5, different JVM opts
             // IMPORTANT: Be aware that we are using side effects here to modify the created
             // YarnClusterDescriptor
+            cfg.setString(CoreOptions.FLINK_DEFAULT_JM_JVM_OPTIONS, 
defaultJmJvmOpts);
             cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
             cfg.set(
                     YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
@@ -434,7 +440,9 @@ class YarnClusterDescriptorTest {
                                     " ",
                                     java,
                                     jvmmem,
+                                    defaultJvmOpts,
                                     jvmOpts,
+                                    defaultJmJvmOpts,
                                     jmJvmOpts,
                                     
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                     logfile,
@@ -457,7 +465,9 @@ class YarnClusterDescriptorTest {
                                     " ",
                                     java,
                                     jvmmem,
+                                    defaultJvmOpts,
                                     jvmOpts,
+                                    defaultJmJvmOpts,
                                     jmJvmOpts,
                                     
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                     krb5,
@@ -489,7 +499,9 @@ class YarnClusterDescriptorTest {
                                     "1",
                                     jvmmem,
                                     "2",
+                                    defaultJvmOpts,
                                     jvmOpts,
+                                    defaultJmJvmOpts,
                                     jmJvmOpts,
                                     
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                     krb5,
@@ -523,7 +535,9 @@ class YarnClusterDescriptorTest {
                                     java,
                                     logfile,
                                     logback,
+                                    defaultJvmOpts,
                                     jvmOpts,
+                                    defaultJmJvmOpts,
                                     jmJvmOpts,
                                     
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
                                     krb5,

Reply via email to