This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f8f54bc1831 [FLINK-28044][runtime][security] Make hadoop filesystems 
configuration available to all deployment targets
f8f54bc1831 is described below

commit f8f54bc18319d9cb7ff3caf8d62513aee9742058
Author: gabor.g.somogyi <[email protected]>
AuthorDate: Tue Jun 14 12:11:01 2022 +0200

    [FLINK-28044][runtime][security] Make hadoop filesystems configuration 
available to all deployment targets
    
    This closes #19953.
---
 .../shortcodes/generated/security_auth_kerberos_section.html |  6 ++++++
 .../layouts/shortcodes/generated/security_configuration.html |  6 ++++++
 .../shortcodes/generated/yarn_config_configuration.html      |  6 ------
 .../java/org/apache/flink/configuration/SecurityOptions.java | 12 ++++++++++++
 .../java/org/apache/flink/runtime/util/HadoopUtilsTest.java  |  2 +-
 .../org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java |  6 ++++++
 .../java/org/apache/flink/yarn/YarnClusterDescriptor.java    |  9 ++++++---
 .../apache/flink/yarn/configuration/YarnConfigOptions.java   |  8 --------
 8 files changed, 37 insertions(+), 18 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html 
b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index b0cbc3179bd..0d97daba7f2 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>security.kerberos.access.hadoopFileSystems</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>List&lt;String&gt;</td>
+            <td>A comma-separated list of Kerberos-secured Hadoop filesystems 
Flink is going to access. For example, 
security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 The JobManager needs to have access to these filesystems to retrieve the 
security tokens.</td>
+        </tr>
         <tr>
             <td><h5>security.kerberos.fetch.delegation-token</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html 
b/docs/layouts/shortcodes/generated/security_configuration.html
index e0b9e6b0fc5..9b34482d61b 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -14,6 +14,12 @@
             <td>List&lt;String&gt;</td>
             <td>List of factories that should be used to instantiate a 
security context. If multiple are configured, Flink will use the first 
compatible factory. You should have a NoOpSecurityContextFactory in this list 
as a fallback.</td>
         </tr>
+        <tr>
+            <td><h5>security.kerberos.access.hadoopFileSystems</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>List&lt;String&gt;</td>
+            <td>A comma-separated list of Kerberos-secured Hadoop filesystems 
Flink is going to access. For example, 
security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 The JobManager needs to have access to these filesystems to retrieve the 
security tokens.</td>
+        </tr>
         <tr>
             <td><h5>security.kerberos.fetch.delegation-token</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html 
b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index 086d7045dee..28757829dbe 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -134,12 +134,6 @@
             <td>List&lt;String&gt;</td>
             <td>A semicolon-separated list of provided lib directories. They 
should be pre-uploaded and world-readable. Flink will use them to exclude the 
local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the 
job submission process. Also YARN will cache them on the nodes so that they 
doesn't need to be downloaded every time for each application. An example could 
be hdfs://$namenode_address/path/of/flink/lib</td>
         </tr>
-        <tr>
-            <td><h5>yarn.security.kerberos.additionalFileSystems</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>List&lt;String&gt;</td>
-            <td>A comma-separated list of additional Kerberos-secured Hadoop 
filesystems Flink is going to access. For example, 
yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 The client submitting to YARN needs to have access to these file systems to 
retrieve the security tokens.</td>
-        </tr>
         <tr>
             <td><h5>yarn.security.kerberos.localized-keytab-path</h5></td>
             <td style="word-wrap: break-word;">"krb5.keytab"</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index e67d3edc5b2..9205bd32e40 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -148,6 +148,18 @@ public class SecurityOptions {
                     .withDescription(
                             "Ratio of the tokens's expiration time when new 
credentials should be re-obtained.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<List<String>> 
KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS =
+            key("security.kerberos.access.hadoopFileSystems")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    
.withDeprecatedKeys("yarn.security.kerberos.additionalFileSystems")
+                    .withDescription(
+                            "A comma-separated list of Kerberos-secured Hadoop 
filesystems Flink is going to access. For example, "
+                                    + 
"security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 "
+                                    + "The JobManager needs to have access to 
these filesystems to retrieve the security tokens.");
+
     // ------------------------------------------------------------------------
     //  ZooKeeper Security Options
     // ------------------------------------------------------------------------
diff --git 
a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
 
b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
index 39c8ed722a7..1fcd0728b50 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -49,7 +49,7 @@ public class HadoopUtilsTest extends TestLogger {
     }
 
     @AfterClass
-    public static void cleanupHadoopConfigs() throws KrbException {
+    public static void cleanupHadoopConfigs() {
         UserGroupInformation.setConfiguration(new Configuration());
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
index 2fa454a084c..0c09db21e7a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.hadoop;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import sun.security.krb5.KrbException;
@@ -39,6 +40,11 @@ class HadoopUserUtilsTest {
         sun.security.krb5.Config.refresh();
     }
 
+    @AfterAll
+    public static void cleanupHadoopConfigs() {
+        UserGroupInformation.setConfiguration(new Configuration());
+    }
+
     @Test
     public void testIsProxyUserShouldReturnFalseWhenNormalUser() {
         UserGroupInformation.setConfiguration(
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 0ae2e699fde..5cba3a233bd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -550,13 +550,14 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                     
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
             final boolean yarnAccessFSEnabled =
                     !CollectionUtil.isNullOrEmpty(
-                            
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
+                            flinkConfiguration.get(
+                                    
SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS));
             if (!fetchToken && yarnAccessFSEnabled) {
                 throw new IllegalConfigurationException(
                         String.format(
                                 "When %s is disabled, %s must be disabled as 
well.",
                                 
SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
-                                YarnConfigOptions.YARN_ACCESS.key()));
+                                
SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS.key()));
             }
         }
 
@@ -1150,7 +1151,9 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
             if (fetchToken) {
                 List<Path> yarnAccessList =
                         ConfigUtils.decodeListFromConfig(
-                                configuration, YarnConfigOptions.YARN_ACCESS, 
Path::new);
+                                configuration,
+                                
SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS,
+                                Path::new);
                 pathsToObtainToken.addAll(yarnAccessList);
                 pathsToObtainToken.addAll(fileUploader.getRemotePaths());
             }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index f7b2d35ec8b..93a7ea0fb6b 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -340,14 +340,6 @@ public class YarnConfigOptions {
                                     + "they doesn't need to be downloaded 
every time for each application. An example could be "
                                     + 
"hdfs://$namenode_address/path/of/flink/lib");
 
-    public static final ConfigOption<List<String>> YARN_ACCESS =
-            key("yarn.security.kerberos.additionalFileSystems")
-                    .stringType()
-                    .asList()
-                    .noDefaultValue()
-                    .withDescription(
-                            "A comma-separated list of additional 
Kerberos-secured Hadoop filesystems Flink is going to access. For example, 
yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 The client submitting to YARN needs to have access to these file systems to 
retrieve the security tokens.");
-
     @SuppressWarnings("unused")
     public static final ConfigOption<String> HADOOP_CONFIG_KEY =
             key("flink.hadoop.<key>")

Reply via email to