This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f0c8f18ee09 [FLINK-22091][yarn] Make Flink on YARN honor env.java.home 
(#25877)
f0c8f18ee09 is described below

commit f0c8f18ee09297c4c92b36d8ce7d0e0259f80178
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Tue Jan 7 10:29:46 2025 +0800

    [FLINK-22091][yarn] Make Flink on YARN honor env.java.home (#25877)
---
 .../generated/environment_configuration.html       |  6 +++
 .../flink/configuration/ConfigConstants.java       |  2 +
 .../apache/flink/configuration/CoreOptions.java    | 11 +++++
 .../ContaineredTaskManagerParameters.java          |  7 +++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  5 ++
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 54 ++++++++++++++++------
 .../flink/yarn/YarnClusterDescriptorTest.java      | 19 +++++++-
 7 files changed, 89 insertions(+), 15 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html 
b/docs/layouts/shortcodes/generated/environment_configuration.html
index 6f520e5abff..84712c412cd 100644
--- a/docs/layouts/shortcodes/generated/environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/environment_configuration.html
@@ -38,6 +38,12 @@
             <td>String</td>
             <td>A string of default JVM options to prepend to <code 
class="highlighter-rouge">env.java.opts.taskmanager</code>. This is intended to 
be set by administrators.</td>
         </tr>
+        <tr>
+            <td><h5>env.java.home</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Location where Java is installed. If not specified, Flink will 
use your default Java installation.</td>
+        </tr>
         <tr>
             <td><h5>env.java.opts.all</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 653be48964f..220a1ddcf73 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -65,6 +65,8 @@ public final class ConfigConstants {
 
     // ----------------------------- Environment Variables 
----------------------------
 
+    public static final String ENV_JAVA_HOME = "JAVA_HOME";
+
     /** The environment variable name which contains the location of the 
configuration directory. */
     public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 03b92b328c6..a0570fb1a61 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -225,6 +225,17 @@ public class CoreOptions {
     //  process parameters
     // ------------------------------------------------------------------------
 
+    public static final ConfigOption<String> FLINK_JAVA_HOME =
+            ConfigOptions.key("env.java.home")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Location where Java is installed. 
If not specified,"
+                                                    + " Flink will use your 
default Java installation.")
+                                    .build());
+
     public static final ConfigOption<String> FLINK_JVM_OPTIONS =
             ConfigOptions.key("env.java.opts.all")
                     .stringType()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index ba84f97cdb3..c6a34df24fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
+
 /** This class describes the basic parameters for launching a TaskManager 
process. */
 public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
@@ -90,6 +93,10 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
             }
         }
 
+        // set JAVA_HOME
+        config.getOptional(CoreOptions.FLINK_JAVA_HOME)
+                .ifPresent(javaHome -> envVars.put(ENV_JAVA_HOME, javaHome));
+
         // done
         return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, 
envVars);
     }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index c1c77c197d0..8d2e88e253a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -137,6 +137,7 @@ import static 
org.apache.flink.client.deployment.application.ApplicationConfigur
 import static 
org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
 import static 
org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX;
 import static 
org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
 import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
@@ -1969,6 +1970,10 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                 ConfigurationUtils.getPrefixedKeyValuePairs(
                         ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,
                         this.flinkConfiguration));
+        // set JAVA_HOME
+        this.flinkConfiguration
+                .getOptional(CoreOptions.FLINK_JAVA_HOME)
+                .ifPresent(javaHome -> env.put(ENV_JAVA_HOME, javaHome));
         // set Flink app class path
         env.put(ENV_FLINK_CLASSPATH, classPathStr);
         // Set FLINK_LIB_DIR to `lib` folder under working dir in container
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 559071559d7..eb3a6c1c37f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -44,8 +44,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
+import static 
org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
 import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,6 +59,19 @@ class UtilsTest {
     private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
             
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
 
+    private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC =
+            new TaskExecutorProcessSpec(
+                    new CPUResource(1.0),
+                    new MemorySize(0), // frameworkHeapSize
+                    new MemorySize(0), // frameworkOffHeapSize
+                    new MemorySize(111), // taskHeapSize
+                    new MemorySize(0), // taskOffHeapSize
+                    new MemorySize(222), // networkMemSize
+                    new MemorySize(0), // managedMemorySize
+                    new MemorySize(333), // jvmMetaspaceSize
+                    new MemorySize(0), // jvmOverheadSize
+                    Collections.emptyList());
+
     @Test
     void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception {
         final Path applicationFilesDir = Files.createTempDirectory(tempDir, 
".flink");
@@ -208,20 +224,8 @@ class UtilsTest {
     @Test
     void testGetTaskManagerShellCommand() {
         final Configuration cfg = new Configuration();
-        final TaskExecutorProcessSpec taskExecutorProcessSpec =
-                new TaskExecutorProcessSpec(
-                        new CPUResource(1.0),
-                        new MemorySize(0), // frameworkHeapSize
-                        new MemorySize(0), // frameworkOffHeapSize
-                        new MemorySize(111), // taskHeapSize
-                        new MemorySize(0), // taskOffHeapSize
-                        new MemorySize(222), // networkMemSize
-                        new MemorySize(0), // managedMemorySize
-                        new MemorySize(333), // jvmMetaspaceSize
-                        new MemorySize(0), // jvmOverheadSize
-                        Collections.emptyList());
         final ContaineredTaskManagerParameters containeredParams =
-                new ContaineredTaskManagerParameters(taskExecutorProcessSpec, 
new HashMap<>());
+                new 
ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>());
 
         // no logging, with/out krb5
         final String java = "$JAVA_HOME/bin/java";
@@ -238,7 +242,8 @@ class UtilsTest {
                         + " 
-Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set
         final String mainClass = "org.apache.flink.yarn.UtilsTest";
         final String dynamicConfigs =
-                
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
+                
TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC)
+                        .trim();
         final String basicArgs = "--configDir ./conf";
         final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
         final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
@@ -674,6 +679,27 @@ class UtilsTest {
                                 Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
     }
 
+    @Test
+    void testGetTaskManagerEnvsWithJavaHomeSet() {
+        final Configuration cfg = new Configuration();
+        cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
+        cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+        final ContaineredTaskManagerParameters containeredParams =
+                ContaineredTaskManagerParameters.create(cfg, 
TASK_EXECUTOR_PROCESS_SPEC);
+        final Map<String, String> envVars = containeredParams.taskManagerEnv();
+        assertThat(envVars).containsEntry(ENV_JAVA_HOME, 
"/opt/jdk").containsEntry("key", "val");
+    }
+
+    @Test
+    void testGetTaskManagerEnvsWithoutJavaHomeSet() {
+        final Configuration cfg = new Configuration();
+        cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+        final ContaineredTaskManagerParameters containeredParams =
+                ContaineredTaskManagerParameters.create(cfg, 
TASK_EXECUTOR_PROCESS_SPEC);
+        final Map<String, String> envVars = containeredParams.taskManagerEnv();
+        assertThat(envVars).doesNotContainKey(ENV_JAVA_HOME);
+    }
+
     private static void verifyUnitResourceVariousSchedulers(
             YarnConfiguration yarnConfig, int minMem, int minVcore, int 
incMem, int incVcore) {
         yarnConfig.set(YarnConfiguration.RM_SCHEDULER, 
Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 1710b1da22a..7ff1b814d54 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -921,11 +921,14 @@ class YarnClusterDescriptorTest {
         final String fakeLocalFlinkJar = "./lib/flink_dist.jar";
         final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar";
         final ApplicationId appId = ApplicationId.newInstance(0, 0);
+        final Configuration flinkConfig = new Configuration();
+        flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
         final Map<String, String> masterEnv =
                 getTestMasterEnv(
-                        new Configuration(), flinkHomeDir, fakeClassPath, 
fakeLocalFlinkJar, appId);
+                        flinkConfig, flinkHomeDir, fakeClassPath, 
fakeLocalFlinkJar, appId);
 
         assertThat(masterEnv)
+                .containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk")
                 .containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib")
                 .containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString())
                 .containsEntry(
@@ -940,6 +943,20 @@ class YarnClusterDescriptorTest {
                 .containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
flinkHomeDir.getPath());
     }
 
+    @Test
+    public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File 
flinkHomeDir)
+            throws IOException {
+        final Configuration flinkConfig = new Configuration();
+        final Map<String, String> masterEnv =
+                getTestMasterEnv(
+                        flinkConfig,
+                        flinkHomeDir,
+                        "",
+                        "./lib/flink_dist.jar",
+                        ApplicationId.newInstance(0, 0));
+        assertThat(masterEnv).doesNotContainKey(ConfigConstants.ENV_JAVA_HOME);
+    }
+
     @Test
     public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File 
tmpDir)
             throws IOException {

Reply via email to