wangyang0918 commented on a change in pull request #15131:
URL: https://github.com/apache/flink/pull/15131#discussion_r595966404



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -197,21 +197,28 @@ private static LocalResource registerLocalResource(
     }
 
     public static void setTokensFor(
-            ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
+            ContainerLaunchContext amContainer,
+            List<Path> paths,
+            Configuration conf,
+            boolean yarnFetchDelegationEnabled)

Review comment:
       ```suggestion
               boolean obtainingDelegationTokens)
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -197,21 +197,28 @@ private static LocalResource registerLocalResource(
     }
 
     public static void setTokensFor(
-            ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
+            ContainerLaunchContext amContainer,
+            List<Path> paths,
+            Configuration conf,
+            boolean yarnFetchDelegationEnabled)
             throws IOException {
         Credentials credentials = new Credentials();
-        // for HDFS
-        TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
-        // for HBase
-        obtainTokenForHBase(credentials, conf);
+
+        if (yarnFetchDelegationEnabled) {
+            // for HDFS
+            TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
+            // for HBase
+            obtainTokenForHBase(credentials, conf);
+        }
+
         // for user
         UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
 
         Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
         for (Token<? extends TokenIdentifier> token : usrTok) {
-            final Text id = new Text(token.getIdentifier());
-            LOG.info("Adding user token " + id + " with " + token);
-            credentials.addToken(id, token);
+            final Text alias = new Text(token.getService());

Review comment:
       I think using `token.getService()` makes sense here. However, I 
would suggest factoring these changes out into a separate commit.
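
   One possible reason the service name works better as the alias can be sketched as follows. This is only an illustration under assumptions: `FakeToken` and `collect` are hypothetical stand-ins, the credentials store is modeled as a plain alias-to-token map, and the file system token is assumed to be stored under its service name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TokenAliasSketch {

    // Hypothetical stand-in for a Hadoop token: only the two fields that matter here.
    static final class FakeToken {
        final String identifier;
        final String service;

        FakeToken(String identifier, String service) {
            this.identifier = identifier;
            this.service = service;
        }
    }

    // Models a credentials store as alias -> token; adding under an existing alias replaces the entry.
    public static Map<String, FakeToken> collect(boolean aliasByService) {
        Map<String, FakeToken> credentials = new LinkedHashMap<>();
        FakeToken hdfsToken = new FakeToken("opaque-identifier-bytes", "ha-hdfs:ns1");

        // Assumption: the token fetched for the file system is stored under its service name.
        credentials.put(hdfsToken.service, hdfsToken);

        // The same token shows up again among the current user's tokens.
        String alias = aliasByService ? hdfsToken.service : hdfsToken.identifier;
        credentials.put(alias, hdfsToken);
        return credentials;
    }

    public static void main(String[] args) {
        System.out.println("alias = identifier: " + collect(false).size() + " entries");
        System.out.println("alias = service:    " + collect(true).size() + " entries");
    }
}
```

   In this model, keying by identifier stores the same token twice, while keying by service keeps a single entry.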

##########
File path: docs/layouts/shortcodes/generated/yarn_config_configuration.html
##########
@@ -140,6 +140,12 @@
             <td>List&lt;String&gt;</td>
             <td>A comma-separated list of additional Kerberos-secured Hadoop filesystems Flink is going to access. For example, yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The client submitting to YARN needs to have access to these file systems to retrieve the security tokens.</td>
         </tr>
+        <tr>
+            <td><h5>yarn.security.kerberos.fetch.delegationToken.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When this is true Flink will fetch HDFS/HBase delegation token injected into AM container.</td>

Review comment:
       Why does the config key contain "kerberos"? Does it mean the 
delegation token can only be obtained when Kerberos is enabled?

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1079,15 +1080,24 @@ private ApplicationReport startAppMaster(
 
         // setup security tokens
         if (UserGroupInformation.isSecurityEnabled()) {
-            // set HDFS delegation tokens when security is enabled
-            LOG.info("Adding delegation token to the AM container.");
-            List<Path> yarnAccessList =
-                    ConfigUtils.decodeListFromConfig(
-                            configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
+            List<Path> yarnAccessList = new ArrayList<>();

Review comment:
       I would suggest documenting that 
`yarn.security.kerberos.additionalFileSystems` should be unset when obtaining 
delegation tokens is disabled.
   
   Maybe we could also add a precondition check instead of processing it silently.
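   
   A minimal sketch of what such a precondition check could look like. The class and method names are hypothetical and the config values are passed in as plain Java types. In the PR itself this would presumably read the values from the Flink `Configuration` and could use `Preconditions.checkState` instead.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DelegationTokenPrecondition {

    // Hypothetical helper, not part of Flink: fails fast when additional file systems
    // are configured although obtaining delegation tokens is disabled.
    public static void checkAdditionalFileSystems(
            boolean obtainingDelegationTokens, List<String> additionalFileSystems) {
        if (!obtainingDelegationTokens && !additionalFileSystems.isEmpty()) {
            throw new IllegalStateException(
                    "yarn.security.kerberos.additionalFileSystems must be unset when "
                            + "obtaining delegation tokens is disabled, but was: "
                            + additionalFileSystems);
        }
    }

    public static void main(String[] args) {
        List<String> extra = Arrays.asList("hdfs://namenode2:9002", "hdfs://namenode3:9003");

        // Accepted: tokens are fetched, so additional file systems are meaningful.
        checkAdditionalFileSystems(true, extra);
        // Accepted: fetching is disabled and nothing extra is configured.
        checkAdditionalFileSystems(false, Collections.<String>emptyList());

        try {
            // Rejected: fetching is disabled but additional file systems are still set.
            checkAdditionalFileSystems(false, extra);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

   Failing fast here makes the conflicting configuration visible to the user at submission time rather than silently ignoring the configured file systems.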




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

