wangyang0918 commented on a change in pull request #15131:
URL: https://github.com/apache/flink/pull/15131#discussion_r627903253



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -529,6 +529,18 @@ public void killCluster(ApplicationId applicationId) 
throws FlinkException {
                         "Hadoop security with Kerberos is enabled but the 
login user "
                                 + "does not have Kerberos credentials or 
delegation tokens!");
             }
+
+            boolean fetchToken =
+                    
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+            boolean yarnAccessFSEnabled =

Review comment:
       We could use `CollectionUtil.isNullOrEmpty` to check whether 
`YARN_ACCESS` is unset.
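
A minimal sketch of the suggested check, assuming the decoded `YARN_ACCESS` value is available as a list. The class name, the `yarnAccessFSEnabled` helper, and the use of plain strings instead of `Path` are hypothetical; the local `isNullOrEmpty` is a stand-in that mirrors the null-or-empty semantics of Flink's `CollectionUtil.isNullOrEmpty`, so the example runs without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class YarnAccessCheckSketch {

    // Local stand-in for the null-or-empty check (assumption: same semantics
    // as Flink's CollectionUtil.isNullOrEmpty).
    static boolean isNullOrEmpty(Collection<?> collection) {
        return collection == null || collection.isEmpty();
    }

    // Hypothetical helper: additional file systems count as configured
    // only when the list contains at least one entry.
    static boolean yarnAccessFSEnabled(List<String> yarnAccessList) {
        return !isNullOrEmpty(yarnAccessList);
    }

    public static void main(String[] args) {
        System.out.println(yarnAccessFSEnabled(null));
        System.out.println(yarnAccessFSEnabled(Collections.emptyList()));
        System.out.println(yarnAccessFSEnabled(Arrays.asList("hdfs://nn1")));
    }
}
```

Treating `null` and an empty list the same way keeps the call site to a single condition.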

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1081,13 +1093,16 @@ private ApplicationReport startAppMaster(
         if (UserGroupInformation.isSecurityEnabled()) {
             // set HDFS delegation tokens when security is enabled
             LOG.info("Adding delegation token to the AM container.");
-            List<Path> yarnAccessList =
-                    ConfigUtils.decodeListFromConfig(
-                            configuration, YarnConfigOptions.YARN_ACCESS, 
Path::new);
-            Utils.setTokensFor(
-                    amContainer,
-                    ListUtils.union(yarnAccessList, 
fileUploader.getRemotePaths()),
-                    yarnConfiguration);
+            List<Path> pathsToObtainToken = Collections.emptyList();

Review comment:
       nit: I would prefer to create a `final` list and add the paths explicitly.
   
   This prevents `pathsToObtainToken` from being reassigned unexpectedly. Also, the 
IDE complains about an `Unchecked assignment` when using `ListUtils.union`.
   
   
   ```
   final List<Path> pathsToObtainToken = new ArrayList<>();
   ...
   pathsToObtainToken.addAll(yarnAccessList);
   pathsToObtainToken.addAll(fileUploader.getRemotePaths());
   ```
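
A self-contained version of the snippet above, as a rough sketch. The class name and the `collectPaths` helper are hypothetical, and plain strings stand in for `org.apache.hadoop.fs.Path` so that it compiles without Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PathsToObtainTokenSketch {

    // Hypothetical helper: collects every path that needs a delegation token.
    static List<String> collectPaths(List<String> yarnAccessList, List<String> remotePaths) {
        // The reference is final, so it cannot be reassigned later in the method;
        // the list itself stays mutable and is filled explicitly.
        final List<String> pathsToObtainToken = new ArrayList<>();
        pathsToObtainToken.addAll(yarnAccessList);
        pathsToObtainToken.addAll(remotePaths);
        return pathsToObtainToken;
    }

    public static void main(String[] args) {
        List<String> paths =
                collectPaths(
                        Arrays.asList("hdfs://nn1/data"),
                        Arrays.asList("hdfs://nn2/flink/app.jar"));
        System.out.println(paths);
    }
}
```

Because the list is created with an explicit element type, no raw-type conversion is involved and the unchecked-assignment warning does not arise.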

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -529,6 +529,18 @@ public void killCluster(ApplicationId applicationId) 
throws FlinkException {
                         "Hadoop security with Kerberos is enabled but the 
login user "
                                 + "does not have Kerberos credentials or 
delegation tokens!");
             }
+
+            boolean fetchToken =

Review comment:
       nit: `fetchToken` and `yarnAccessFSEnabled` could be `final`.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -197,21 +196,28 @@ private static LocalResource registerLocalResource(
     }
 
     public static void setTokensFor(
-            ContainerLaunchContext amContainer, List<Path> paths, 
Configuration conf)
+            ContainerLaunchContext amContainer,
+            List<Path> paths,
+            Configuration conf,
+            boolean obtainingDelegationTokens)
             throws IOException {
         Credentials credentials = new Credentials();
-        // for HDFS
-        TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new 
Path[0]), conf);
-        // for HBase
-        obtainTokenForHBase(credentials, conf);
+
+        if (obtainingDelegationTokens) {
+            LOG.info("Obtaining delegation tokens for HDFS and HBase.");
+            // for HDFS
+            TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new 
Path[0]), conf);
+            // for HBase
+            obtainTokenForHBase(credentials, conf);
+        }
+
         // for user
         UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
 
         Collection<Token<? extends TokenIdentifier>> usrTok = 
currUsr.getTokens();
         for (Token<? extends TokenIdentifier> token : usrTok) {
-            final Text id = new Text(token.getIdentifier());
-            LOG.info("Adding user token " + id + " with " + token);
-            credentials.addToken(id, token);
+            LOG.info("Adding user token " + token.getService() + " with " + 
token);

Review comment:
       IIUC, we will move the second commit to a separate ticket. Right?





