This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit eb6d6ff56747e3b34e941a4cd301cd851c0b0b25 Author: Roc Marshal <flin...@126.com> AuthorDate: Mon Jan 8 13:26:34 2024 +0800 [FLINK-33988][configuration] Fix the invalid configuration when using initialized root logger level on yarn deployment mode --- .../org/apache/flink/yarn/YarnClusterDescriptor.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index a0732992e8b..97e49b8046a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -136,6 +136,8 @@ import static org.apache.flink.client.deployment.application.ApplicationConfigur import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -204,6 +206,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration); this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration); + adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, "ROOT_LOG_LEVEL"); + getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath); decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES) .ifPresent(this::addShipFiles); @@ -216,6 +220,21 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL); } + /** Adapt flink env setting. */ + private static <T> void adaptEnvSetting( + Configuration config, ConfigOption<T> configOption, String envKey) { + config.getOptional(configOption) + .ifPresent( + value -> { + config.setString( + CONTAINERIZED_MASTER_ENV_PREFIX + envKey, + String.valueOf(value)); + config.setString( + CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + envKey, + String.valueOf(value)); + }); + } + private Optional<List<Path>> decodeFilesToShipToCluster( final Configuration configuration, final ConfigOption<List<String>> configOption) { checkNotNull(configuration);