tillrohrmann commented on a change in pull request #14477:
URL: https://github.com/apache/flink/pull/14477#discussion_r553198430



##########
File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
##########
@@ -114,6 +117,24 @@ public static Configuration getHadoopConfiguration(
                     addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
         }
 
+        // Approach 4: Flink configuration
+        // add all configuration key with prefix `flink.hadoop.` in flink conf to hadoop conf
+        for (String key : flinkConfiguration.keySet()) {
+            for (String prefix : FLINK_CONFIG_PREFIXES) {
+                if (key.startsWith(prefix)) {
+                    String newKey = key.substring(prefix.length());
+                    String value = flinkConfiguration.getString(key, null);
+                    result.set(newKey, value);
+                    LOG.debug(
+                            "Adding Flink config entry for {} as {}={} to Hadoop config",
+                            key,
+                            newKey,
+                            value);
+                    foundHadoopConfiguration = true;
+                }
+            }
+        }

Review comment:
       This code block and the one for the Yarn configuration look pretty much 
the same. Maybe we can deduplicate them.
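
       A minimal sketch of what the deduplication could look like. It uses plain
       `Map`s as stand-ins for the Flink and Hadoop/Yarn configuration objects so
       that it runs on its own; `PrefixedConfigCopier` and `copyPrefixedEntries`
       are hypothetical names, not existing Flink APIs. The helper takes one
       prefix to match and one prefix to strip, because the Yarn variant matches
       `flink.yarn.` but only removes `flink.`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: shared logic for both prefixes.
final class PrefixedConfigCopier {

    /**
     * Copies every entry whose key starts with matchPrefix into target,
     * dropping stripPrefix from the front of the key.
     *
     * @return true if at least one entry was copied
     */
    static boolean copyPrefixedEntries(
            Map<String, String> source,
            Map<String, String> target,
            String matchPrefix,
            String stripPrefix) {
        if (!matchPrefix.startsWith(stripPrefix)) {
            throw new IllegalArgumentException(
                    "stripPrefix must be a prefix of matchPrefix");
        }
        boolean found = false;
        for (Map.Entry<String, String> entry : source.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(matchPrefix)) {
                target.put(key.substring(stripPrefix.length()), entry.getValue());
                found = true;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Stand-in for the Flink configuration.
        Map<String, String> flinkConf = new HashMap<>();
        flinkConf.put("flink.hadoop.dfs.replication", "5");
        flinkConf.put(
                "flink.yarn.resourcemanager.container.liveness-monitor.interval-ms",
                "300000");

        // Hadoop case: match and strip the full `flink.hadoop.` prefix.
        Map<String, String> hadoopConf = new HashMap<>();
        copyPrefixedEntries(flinkConf, hadoopConf, "flink.hadoop.", "flink.hadoop.");

        // Yarn case: match `flink.yarn.` but strip only `flink.`.
        Map<String, String> yarnConf = new HashMap<>();
        copyPrefixedEntries(flinkConf, yarnConf, "flink.yarn.", "flink.");

        System.out.println(hadoopConf);
        System.out.println(yarnConf);
    }
}
```

       In the PR itself the two maps would be the Flink `Configuration` and the
       Hadoop or Yarn configuration, and the return value would feed
       `foundHadoopConfiguration`.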

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
##########
@@ -77,6 +78,8 @@ public ApplicationId getClusterId(Configuration configuration) {
     private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
         final YarnClient yarnClient = YarnClient.createYarnClient();
         final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+        yarnConfiguration.addResource(HadoopUtils.getHadoopConfiguration(configuration));

Review comment:
       Why is it necessary to add the `HadoopConfiguration` here?

##########
File path: docs/_includes/generated/yarn_config_configuration.html
##########
@@ -14,6 +14,20 @@
             <td>String</td>
             <td>If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be set to the value of external-resource.&lt;resource_name&gt;.amount.</td>
         </tr>
+        <tr>
+            <td><h5>flink.hadoop.&lt;hadoop_key&gt;</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A general option to probe Hadoop configuration through prefix `flink.hadoop.`. Flink will remove the prefix to get hadoop_key (from <a href="https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/core-default.xml">core-default.xml</a> and <a href="https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml">hdfs-default.xml</a>) then set the hadoop_key and value to Hadoop configuration. For example, flink.hadoop.dfs.replication=5 in Flink configuration and convert to dfs.replication=5 in Hadoop configuration.
+            </td>
+        </tr>
+        <tr>
+            <td><h5>flink.yarn.&lt;key&gt;</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A general option to probe Yarn configuration through prefix `flink.yarn.`. Flink will remove the prefix `flink.` to get yarn_key (from <a href="https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-common/yarn-default.xml">yarn-default.xml</a>) then set the yarn_key and value to Yarn configuration. For example, flink.yarn.resourcemanager.container.liveness-monitor.interval-ms=300000 in Flink configuration and convert to yarn.resourcemanager.container.liveness-monitor.interval-ms=300000 in Yarn configuration.
+            </td>
+        </tr>

Review comment:
       This file is generated, so it should not be edited by hand. I think we
can instead add `ConfigOption`s with the keys `flink.yarn.<key>` and
`flink.hadoop.<key>` and the corresponding descriptions to `YarnConfigOptions`.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
##########
@@ -77,6 +78,8 @@ public ApplicationId getClusterId(Configuration configuration) {
     private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
         final YarnClient yarnClient = YarnClient.createYarnClient();
         final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+        yarnConfiguration.addResource(HadoopUtils.getHadoopConfiguration(configuration));
+        yarnConfiguration.addResource(Utils.getYarnConfiguration(configuration));

Review comment:
       Would it make sense to load the `YarnConfiguration` through a utility,
maybe even `HadoopUtils`? That way it would always be consistent.



