wangyang0918 commented on a change in pull request #15131:
URL: https://github.com/apache/flink/pull/15131#discussion_r595966404
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -197,21 +197,28 @@ private static LocalResource registerLocalResource(
}
public static void setTokensFor(
- ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
+ ContainerLaunchContext amContainer,
+ List<Path> paths,
+ Configuration conf,
+ boolean yarnFetchDelegationEnabled)
Review comment:
```suggestion
boolean obtainingDelegationTokens)
```
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -197,21 +197,28 @@ private static LocalResource registerLocalResource(
}
public static void setTokensFor(
- ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
+ ContainerLaunchContext amContainer,
+ List<Path> paths,
+ Configuration conf,
+ boolean yarnFetchDelegationEnabled)
throws IOException {
Credentials credentials = new Credentials();
- // for HDFS
- TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
- // for HBase
- obtainTokenForHBase(credentials, conf);
+
+ if (yarnFetchDelegationEnabled) {
+ // for HDFS
+ TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
+ // for HBase
+ obtainTokenForHBase(credentials, conf);
+ }
+
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
- final Text id = new Text(token.getIdentifier());
- LOG.info("Adding user token " + id + " with " + token);
- credentials.addToken(id, token);
+ final Text alias = new Text(token.getService());
Review comment:
I think using `token.getService()` makes sense here. However, I
would suggest factoring these changes out into a separate commit.
##########
File path: docs/layouts/shortcodes/generated/yarn_config_configuration.html
##########
@@ -140,6 +140,12 @@
<td>List<String></td>
<td>A comma-separated list of additional Kerberos-secured Hadoop
filesystems Flink is going to access. For example,
yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
The client submitting to YARN needs to have access to these file systems to
retrieve the security tokens.</td>
</tr>
+ <tr>
+ <td><h5>yarn.security.kerberos.fetch.delegationToken.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>When this is true Flink will fetch HDFS/HBase delegation token injected into AM container.</td>
Review comment:
Why do we have "kerberos" in the config key? Does it mean that
delegation tokens can only be obtained when Kerberos is enabled?
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1079,15 +1080,24 @@ private ApplicationReport startAppMaster(
// setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
- // set HDFS delegation tokens when security is enabled
- LOG.info("Adding delegation token to the AM container.");
- List<Path> yarnAccessList =
- ConfigUtils.decodeListFromConfig(
- configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
+ List<Path> yarnAccessList = new ArrayList<>();
Review comment:
I would suggest documenting that
`yarn.security.kerberos.additionalFileSystems` should be unset when obtaining
delegation tokens is disabled.
Maybe we could also add a precondition check instead of handling it silently.
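A minimal sketch of what such a precondition check might look like, using only the JDK so it stands alone. The class name `DelegationTokenPreconditions`, the method name, and the message wording are hypothetical and not part of Flink; the actual change would more likely use Flink's own `Preconditions` utility and read both values from the configuration.

```java
import java.util.List;

/** Hypothetical helper for illustration only; not an existing Flink class. */
final class DelegationTokenPreconditions {

    private DelegationTokenPreconditions() {}

    /**
     * Fails fast when additional file systems are configured although
     * obtaining delegation tokens is disabled, instead of silently
     * ignoring the configured list.
     *
     * @param obtainingDelegationTokens value of the new boolean option
     * @param additionalFileSystems entries of yarn.security.kerberos.additionalFileSystems
     */
    static void checkAdditionalFileSystemsUnset(
            boolean obtainingDelegationTokens, List<String> additionalFileSystems) {
        if (!obtainingDelegationTokens && !additionalFileSystems.isEmpty()) {
            throw new IllegalStateException(
                    "yarn.security.kerberos.additionalFileSystems must be unset when "
                            + "obtaining delegation tokens is disabled, but was: "
                            + additionalFileSystems);
        }
    }
}
```

Calling this before building `yarnAccessList` would surface the misconfiguration to the user at submission time rather than leaving the extra file systems without tokens.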