This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eb6d6ff56747e3b34e941a4cd301cd851c0b0b25
Author: Roc Marshal <flin...@126.com>
AuthorDate: Mon Jan 8 13:26:34 2024 +0800

    [FLINK-33988][configuration] Fix the invalid configuration when using 
initialized root logger level on yarn deployment mode
---
 .../org/apache/flink/yarn/YarnClusterDescriptor.java  | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index a0732992e8b..97e49b8046a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -136,6 +136,8 @@ import static 
org.apache.flink.client.deployment.application.ApplicationConfigur
 import static 
org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR;
+import static 
org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX;
+import static 
org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
 import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -204,6 +206,8 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         this.flinkConfiguration = 
Preconditions.checkNotNull(flinkConfiguration);
         this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
+        adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, 
"ROOT_LOG_LEVEL");
+
         
getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
         decodeFilesToShipToCluster(flinkConfiguration, 
YarnConfigOptions.SHIP_FILES)
                 .ifPresent(this::addShipFiles);
@@ -216,6 +220,21 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         this.nodeLabel = 
flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
+    /** Adapt flink env setting. */
+    private static <T> void adaptEnvSetting(
+            Configuration config, ConfigOption<T> configOption, String envKey) 
{
+        config.getOptional(configOption)
+                .ifPresent(
+                        value -> {
+                            config.setString(
+                                    CONTAINERIZED_MASTER_ENV_PREFIX + envKey,
+                                    String.valueOf(value));
+                            config.setString(
+                                    CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + 
envKey,
+                                    String.valueOf(value));
+                        });
+    }
+
     private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final 
ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);

Reply via email to