[
https://issues.apache.org/jira/browse/GOBBLIN-1308?focusedWorklogId=518248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-518248
]
ASF GitHub Bot logged work on GOBBLIN-1308:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Dec/20 02:39
Start Date: 01/Dec/20 02:39
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on a change in pull request #3157:
URL: https://github.com/apache/incubator-gobblin/pull/3157#discussion_r533030983
##########
File path:
gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -280,84 +314,122 @@ private static void getJhToken(Configuration conf,
Credentials cred) throws IOEx
}
if (jhToken == null) {
- LOG.error("getDelegationTokenFromHS() returned null");
+ log.error("getDelegationTokenFromHS() returned null");
throw new IOException("Unable to fetch JH token.");
}
- LOG.info("Created JH token: " + jhToken.toString());
- LOG.info("Token kind: " + jhToken.getKind());
- LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
- LOG.info("Token service: " + jhToken.getService());
+ log.info("Created JH token: " + jhToken.toString());
+ log.info("Token kind: " + jhToken.getKind());
+ log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+ log.info("Token service: " + jhToken.getService());
cred.addToken(jhToken.getService(), jhToken);
}
- private static void getFsAndJtTokens(final State state, final Configuration
conf, final Optional<String> userToProxy,
- final Credentials cred) throws IOException, InterruptedException {
+ private static void getJtTokens(final Configuration conf, final Credentials
cred, final Optional<String> userToProxy,
+ final State state) throws IOException, InterruptedException {
if (userToProxy.isPresent()) {
UserGroupInformation.createProxyUser(userToProxy.get(),
UserGroupInformation.getLoginUser())
.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- getFsAndJtTokensImpl(state, conf, cred);
+ getJtTokensImpl(state, conf, cred);
return null;
}
});
} else {
- getFsAndJtTokensImpl(state, conf, cred);
+ getJtTokensImpl(state, conf, cred);
}
}
- private static void getFsAndJtTokensImpl(final State state, final
Configuration conf, final Credentials cred)
+ private static void getJtTokensImpl(final State state, final Configuration
conf, final Credentials cred)
throws IOException {
- getHdfsToken(conf, cred);
- if (state.contains(OTHER_NAMENODES)) {
- getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
- }
getJtToken(cred);
}
- private static void getHdfsToken(Configuration conf, Credentials cred)
throws IOException {
- FileSystem fs = FileSystem.get(conf);
- LOG.info("Getting DFS token from " + fs.getUri());
- String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
- Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
- for(int i = 0; i < fsTokens.length; i++) {
- Token<?> token = fsTokens[i];
- String message =
- String.format("DFS token fetched from namenode, token kind: %s,
token service %s", token.getKind(),
- token.getService());
- LOG.info(message);
+ public static void getAllFSTokens(final Configuration conf, final
Credentials cred, final String renewer,
+ final Optional<String> userToProxy, final
List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+ if (userToProxy.isPresent()) {
+ UserGroupInformation.createProxyUser(userToProxy.get(),
UserGroupInformation.getLoginUser())
+ .doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+ return null;
+ }
+ });
+ } else {
+ getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+ }
+ }
+
+ public static void getAllFSTokensImpl(Configuration conf, Credentials cred,
String renewer, List<String> remoteFSURIList) {
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ if (StringUtils.isEmpty(renewer)) {
+ renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+ log.info("No renewer specified for FS: {}, taking default renewer:
{}", fs.getUri(), renewer);
+ }
+
+ log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " +
renewer);
+ Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+ if (fsTokens != null) {
+ for (Token<?> token : fsTokens) {
+ log.info("FS Uri: " + fs.getUri() + " token: " + token);
+ }
+ }
+
+ // Handle remote namenodes if any
+ if(remoteFSURIList !=null && remoteFSURIList.size() >0){
+ getRemoteFSTokenFromURI(conf, cred, remoteFSURIList, renewer);
+ }
+
+ log.debug("All credential tokens: " + cred.getAllTokens());
+ } catch (IOException e) {
+ log.error("Error getting or creating HDFS token with renewer: "+
renewer);
}
+
}
- private static void getOtherNamenodesToken(List<String> otherNamenodes,
Configuration conf, Credentials cred)
+ public static void getRemoteFSTokenFromURI(Configuration conf, Credentials
cred, List<String> otherNamenodes, String renewer)
throws IOException {
- LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
+ log.debug("Getting tokens for other namenodes: " + otherNamenodes);
Path[] ps = new Path[otherNamenodes.size()];
for (int i = 0; i < ps.length; i++) {
ps[i] = new Path(otherNamenodes.get(i).trim());
+ FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+ if (StringUtils.isEmpty(renewer)) {
+ TokenCache.obtainTokensForNamenodes(cred, ps, conf);
+ } else {
+ final Token<?>[] tokens = otherNameNodeFS.addDelegationTokens(renewer,
cred);
+ if (tokens != null) {
+ for (Token<?> token : tokens) {
+ log.info("Got dt token for " + otherNameNodeFS.getUri() + "; " +
token);
+ }
+ }
+ }
}
- TokenCache.obtainTokensForNamenodes(cred, ps, conf);
Review comment:
I took a look at the implementation of
`TokenCache.obtainTokensForNamenodes(cred, ps, conf);`. It seems the original
implementation already supports getting a remote token by fetching it from the remote FS. Why
do we add this separately? Is it to support using a non-default renewer?
##########
File path:
gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -132,6 +134,35 @@ public Void run() throws Exception {
return ugi;
}
+ public static void getHadoopFSTokens(final State state, Optional<File>
tokenFile, final Credentials cred, final String renewer)
Review comment:
Should we also get the renewer from the state here?
##########
File path:
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
##########
@@ -117,7 +118,11 @@ protected void startUp() throws Exception {
this.loginExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- loginAndScheduleTokenRenewal();
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error login using keytab, will continue the thread and
try next time.");
Review comment:
Change the message to "Error during login, will continue the thread and
try next time.", since this is the superclass and we may not use a keytab to log in.
##########
File path:
gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -280,84 +314,122 @@ private static void getJhToken(Configuration conf,
Credentials cred) throws IOEx
}
if (jhToken == null) {
- LOG.error("getDelegationTokenFromHS() returned null");
+ log.error("getDelegationTokenFromHS() returned null");
throw new IOException("Unable to fetch JH token.");
}
- LOG.info("Created JH token: " + jhToken.toString());
- LOG.info("Token kind: " + jhToken.getKind());
- LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
- LOG.info("Token service: " + jhToken.getService());
+ log.info("Created JH token: " + jhToken.toString());
+ log.info("Token kind: " + jhToken.getKind());
+ log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+ log.info("Token service: " + jhToken.getService());
cred.addToken(jhToken.getService(), jhToken);
}
- private static void getFsAndJtTokens(final State state, final Configuration
conf, final Optional<String> userToProxy,
- final Credentials cred) throws IOException, InterruptedException {
+ private static void getJtTokens(final Configuration conf, final Credentials
cred, final Optional<String> userToProxy,
+ final State state) throws IOException, InterruptedException {
if (userToProxy.isPresent()) {
UserGroupInformation.createProxyUser(userToProxy.get(),
UserGroupInformation.getLoginUser())
.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- getFsAndJtTokensImpl(state, conf, cred);
+ getJtTokensImpl(state, conf, cred);
return null;
}
});
} else {
- getFsAndJtTokensImpl(state, conf, cred);
+ getJtTokensImpl(state, conf, cred);
}
}
- private static void getFsAndJtTokensImpl(final State state, final
Configuration conf, final Credentials cred)
+ private static void getJtTokensImpl(final State state, final Configuration
conf, final Credentials cred)
throws IOException {
- getHdfsToken(conf, cred);
- if (state.contains(OTHER_NAMENODES)) {
- getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
- }
getJtToken(cred);
}
- private static void getHdfsToken(Configuration conf, Credentials cred)
throws IOException {
- FileSystem fs = FileSystem.get(conf);
- LOG.info("Getting DFS token from " + fs.getUri());
- String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
- Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
- for(int i = 0; i < fsTokens.length; i++) {
- Token<?> token = fsTokens[i];
- String message =
- String.format("DFS token fetched from namenode, token kind: %s,
token service %s", token.getKind(),
- token.getService());
- LOG.info(message);
+ public static void getAllFSTokens(final Configuration conf, final
Credentials cred, final String renewer,
+ final Optional<String> userToProxy, final
List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+ if (userToProxy.isPresent()) {
+ UserGroupInformation.createProxyUser(userToProxy.get(),
UserGroupInformation.getLoginUser())
+ .doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+ return null;
+ }
+ });
+ } else {
+ getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+ }
+ }
+
+ public static void getAllFSTokensImpl(Configuration conf, Credentials cred,
String renewer, List<String> remoteFSURIList) {
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ if (StringUtils.isEmpty(renewer)) {
+ renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+ log.info("No renewer specified for FS: {}, taking default renewer:
{}", fs.getUri(), renewer);
+ }
+
+ log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " +
renewer);
+ Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+ if (fsTokens != null) {
+ for (Token<?> token : fsTokens) {
+ log.info("FS Uri: " + fs.getUri() + " token: " + token);
+ }
+ }
+
+ // Handle remote namenodes if any
+ if(remoteFSURIList !=null && remoteFSURIList.size() >0){
+ getRemoteFSTokenFromURI(conf, cred, remoteFSURIList, renewer);
+ }
+
+ log.debug("All credential tokens: " + cred.getAllTokens());
+ } catch (IOException e) {
+ log.error("Error getting or creating HDFS token with renewer: "+
renewer);
}
+
}
- private static void getOtherNamenodesToken(List<String> otherNamenodes,
Configuration conf, Credentials cred)
+ public static void getRemoteFSTokenFromURI(Configuration conf, Credentials
cred, List<String> otherNamenodes, String renewer)
throws IOException {
- LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
+ log.debug("Getting tokens for other namenodes: " + otherNamenodes);
Path[] ps = new Path[otherNamenodes.size()];
for (int i = 0; i < ps.length; i++) {
ps[i] = new Path(otherNamenodes.get(i).trim());
+ FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+ if (StringUtils.isEmpty(renewer)) {
Review comment:
The renewer here seems to be "getMRTokenRenewerInternal(new
JobConf()).toString()", defined on line 372; will it ever be empty? If not, we may always end up
forcing the remote token to be obtained directly from the remote FS.
##########
File path:
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
##########
@@ -60,15 +60,19 @@
/**
* Write a {@link Token} to a given file.
*
- * @param token the token to write
* @param tokenFilePath the token file path
+ * @param credentials all tokens of this credentials to be written to given
file
* @param configuration a {@link Configuration} object carrying Hadoop
configuration properties
* @throws IOException
*/
- public static void writeTokenToFile(Token<? extends TokenIdentifier> token,
Path tokenFilePath,
- Configuration configuration) throws IOException {
- Credentials credentials = new Credentials();
- credentials.addToken(token.getService(), token);
+ public static void writeTokenToFile(Path tokenFilePath, Credentials
credentials, Configuration configuration) throws IOException {
+ if(credentials == null) {
+ LOGGER.warn("got empty credentials, creating default one as new.");
+ credentials = new Credentials();
+ }
+ // TODO: token needs to be removed from the function signature as well
Review comment:
Remove this unnecessary comment.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 518248)
Time Spent: 0.5h (was: 20m)
> Gobblin's kerberos token management for remote clusters
> -------------------------------------------------------
>
> Key: GOBBLIN-1308
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1308
> Project: Apache Gobblin
> Issue Type: Improvement
> Affects Versions: 0.15.0
> Reporter: Jay Sen
> Priority: Major
> Fix For: 0.16.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Gobblin's Hadoop tokens/key management:
> Problem: Gobblin only maintains local-cluster tokens when key management is
> enabled, and does not have the capability to manage tokens for remote Hadoop
> clusters. (Based on my conversations with many folks here, the token files can
> be made available externally, but that would require an external system
> running on cron or something similar.)
> Solution: add remote-cluster token management to Gobblin, so that remote
> clusters' keys can be managed the same way it manages the local cluster's keys.
>
> The config looks like the following
> (this changes the enable.key.management config to key.management.enabled):
>
> {code:java}
> gobblin.hadoop.key.management {
> enabled = true
> remote.clusters = [ ${gobblin_sync_systems.hadoop_cluster1},
> ${gobblin_sync_systems.hadoop_cluster2} ]
> }
> // These Gobblin platform configurations can be moved to a database for other
> use-cases, but this layout helps make the platform modular for each
> connector.
> gobblin_sync_systems {
> hadoop_cluster1 {
> // If the Hadoop config path is specified, the FileSystem will be created based
> on all the XML config provided there, which has all the required info.
> hadoop_config_path = "file:///etc/hadoop_cluster1/hadoop/config"
> // If the Hadoop config path is not specified, you can still specify the
> specific nodes for each type of token.
> namenode_uri = ["hdfs://nn1.hadoop_cluster1.example.com:8020",
> "hdfs://nn2.hadoop_cluster1.example.com:8020"]
> kms_nodes = [ "kms1.hadoop_cluster1.example.com:9292",
> "kms2.hadoop_cluster1.example.com:9292" ]
> }
> hadoop_cluster2 {
> hadoop_config_path = "file:///etc/hadoop_cluster2/hadoop/config"
> namenode_uri = ["hdfs://nn1.hadoop_cluster2.example.com:8020",
> "hdfs://nn2.hadoop_cluster2.example.com:8020"]
> kms_nodes = [ "kms1.hadoop_cluster2.example.com:9292",
> "kms2.hadoop_cluster2.example.com:9292" ]
> }
> }{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)