This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ab38bfe [GOBBLIN-1308] ability to manage token for remote cluster
ab38bfe is described below
commit ab38bfe904df8e02f0550dfea03d50866d7fee64
Author: Jay Sen <[email protected]>
AuthorDate: Wed Jan 20 14:26:27 2021 -0800
[GOBBLIN-1308] ability to manage token for remote cluster
Closes #3157 from jhsenjaliya/GOBBLIN-1308
---
.../org/apache/gobblin/util/hadoop/TokenUtils.java | 234 ++++++++++++++-------
.../yarn/AbstractYarnAppSecurityManager.java | 35 ++-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 33 ++-
.../yarn/YarnAppSecurityManagerWithKeytabs.java | 60 +++++-
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 13 +-
.../gobblin/yarn/YarnSecurityManagerTest.java | 40 ++--
6 files changed, 279 insertions(+), 136 deletions(-)
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
index fd6463d..7bb4fee 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
@@ -20,17 +20,8 @@ package org.apache.gobblin.util.hadoop;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -55,9 +46,21 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.Logger;
import org.apache.thrift.TException;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
/**
 * A utility class for obtaining Hadoop tokens and Hive metastore tokens for
Azkaban jobs.
@@ -66,15 +69,15 @@ import org.apache.thrift.TException;
* This class is compatible with Hadoop 2.
* </p>
*/
+@Slf4j
public class TokenUtils {
- private static final Logger LOG = Logger.getLogger(TokenUtils.class);
-
private static final String USER_TO_PROXY = "tokens.user.to.proxy";
private static final String KEYTAB_USER = "keytab.user";
private static final String KEYTAB_LOCATION = "keytab.location";
private static final String HADOOP_SECURITY_AUTHENTICATION =
"hadoop.security.authentication";
- private static final String OTHER_NAMENODES = "other_namenodes";
+ public static final String OTHER_NAMENODES = "other_namenodes";
+ public static final String TOKEN_RENEWER = "token_renewer";
private static final String KERBEROS = "kerberos";
private static final String YARN_RESOURCEMANAGER_PRINCIPAL =
"yarn.resourcemanager.principal";
private static final String YARN_RESOURCEMANAGER_ADDRESS =
"yarn.resourcemanager.address";
@@ -132,6 +135,35 @@ public class TokenUtils {
return ugi;
}
+ public static void getHadoopFSTokens(final State state, Optional<File>
tokenFile, final Credentials cred, final String renewer)
+ throws IOException, InterruptedException {
+
+ Preconditions.checkArgument(state.contains(KEYTAB_USER), "Missing required
property " + KEYTAB_USER);
+ Preconditions.checkArgument(state.contains(KEYTAB_LOCATION), "Missing
required property " + KEYTAB_LOCATION);
+
+ Configuration configuration = new Configuration();
+ configuration.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
+ UserGroupInformation.setConfiguration(configuration);
+ UserGroupInformation.loginUserFromKeytab(obtainKerberosPrincipal(state),
state.getProp(KEYTAB_LOCATION));
+
+ final Optional<String> userToProxy =
Strings.isNullOrEmpty(state.getProp(USER_TO_PROXY)) ? Optional.<String>absent()
+ : Optional.fromNullable(state.getProp(USER_TO_PROXY));
+ final Configuration conf = new Configuration();
+
+ log.info("Getting tokens for userToProxy " + userToProxy);
+
+ List<String> remoteFSURIList = new ArrayList<>();
+ if(state.contains(OTHER_NAMENODES)){
+ remoteFSURIList = state.getPropAsList(OTHER_NAMENODES);
+ }
+
+ getAllFSTokens(conf, cred, renewer, userToProxy, remoteFSURIList);
+
+ if (tokenFile.isPresent()) {
+ persistTokens(cred, tokenFile.get());
+ }
+ }
+
/**
* Get Hadoop tokens (tokens for job history server, job tracker and HDFS)
using Kerberos keytab.
*
@@ -141,7 +173,7 @@ public class TokenUtils {
* @param tokenFile If present, the file will store materialized credentials.
 * @param cred An in-memory representation of credentials.
*/
- public static void getHadoopTokens(final State state, Optional<File>
tokenFile, Credentials cred)
+ public static void getHadoopTokens(final State state, Optional<File>
tokenFile, final Credentials cred)
throws IOException, InterruptedException {
Preconditions.checkArgument(state.contains(KEYTAB_USER), "Missing required
property " + KEYTAB_USER);
@@ -156,10 +188,13 @@ public class TokenUtils {
: Optional.fromNullable(state.getProp(USER_TO_PROXY));
final Configuration conf = new Configuration();
- LOG.info("Getting tokens for " + userToProxy);
+ List<String> remoteFSURIList = state.getPropAsList(OTHER_NAMENODES);
+ String renewer = state.getProp(TOKEN_RENEWER);
+ log.info("Getting tokens for {}, using renewer: {}, including remote FS:
{}", userToProxy, renewer, remoteFSURIList.toString());
getJhToken(conf, cred);
- getFsAndJtTokens(state, conf, userToProxy, cred);
+ getJtTokens(conf, cred, userToProxy, state);
+ getAllFSTokens(conf, cred, renewer, userToProxy, remoteFSURIList);
if (tokenFile.isPresent()) {
persistTokens(cred, tokenFile.get());
@@ -204,11 +239,11 @@ public class TokenUtils {
state.contains(USER_DEFINED_HIVE_LOCATIONS) ?
state.getPropAsList(USER_DEFINED_HIVE_LOCATIONS)
: Collections.EMPTY_LIST;
if (!extraHcatLocations.isEmpty()) {
- LOG.info("Need to fetch extra metaStore tokens from hive.");
+ log.info("Need to fetch extra metaStore tokens from hive.");
// start to process the user inputs.
for (final String thriftUrl : extraHcatLocations) {
- LOG.info("Fetching metaStore token from : " + thriftUrl);
+ log.info("Fetching metaStore token from : " + thriftUrl);
hiveConf = new HiveConf();
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
@@ -216,12 +251,12 @@ public class TokenUtils {
cred.addToken(hcatToken.getService(), hcatToken);
ugi.addToken(hcatToken);
- LOG.info("Successfully fetched token for:" + thriftUrl);
+ log.info("Successfully fetched token for:" + thriftUrl);
}
}
} catch (final Throwable t) {
final String message = "Failed to get hive metastore token." +
t.getMessage() + t.getCause();
- LOG.error(message, t);
+ log.error(message, t);
throw new RuntimeException(message);
}
}
@@ -237,10 +272,10 @@ public class TokenUtils {
final String tokenSignatureOverwrite, final IMetaStoreClient hiveClient)
throws IOException, TException, InterruptedException {
- LOG.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": " +
hiveConf.get(
+ log.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": " +
hiveConf.get(
HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname));
- LOG.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": " +
hiveConf.get(
+ log.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": " +
hiveConf.get(
HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
final Token<DelegationTokenIdentifier> hcatToken = new Token<>();
@@ -255,10 +290,10 @@ public class TokenUtils {
&& tokenSignatureOverwrite.trim().length() > 0) {
hcatToken.setService(new
Text(tokenSignatureOverwrite.trim().toLowerCase()));
- LOG.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
+ log.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
}
- LOG.info("Created hive metastore token for user:" + userToProxy + " with
kind[" + hcatToken.getKind() + "]"
+ log.info("Created hive metastore token for user:" + userToProxy + " with
kind[" + hcatToken.getKind() + "]"
+ " and service[" + hcatToken.getService() + "]");
return hcatToken;
}
@@ -267,10 +302,10 @@ public class TokenUtils {
YarnRPC rpc = YarnRPC.create(conf);
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
- LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
+ log.debug("Connecting to HistoryServer at: " + serviceAddr);
HSClientProtocol hsProxy =
(HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), conf);
- LOG.info("Pre-fetching JH token from job history server");
+ log.info("Pre-fetching JH token from job history server");
Token<?> jhToken = null;
try {
@@ -280,95 +315,136 @@ public class TokenUtils {
}
if (jhToken == null) {
- LOG.error("getDelegationTokenFromHS() returned null");
+ log.error("getDelegationTokenFromHS() returned null");
throw new IOException("Unable to fetch JH token.");
}
- LOG.info("Created JH token: " + jhToken.toString());
- LOG.info("Token kind: " + jhToken.getKind());
- LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
- LOG.info("Token service: " + jhToken.getService());
+ log.info("Created JH token: " + jhToken.toString());
+ log.info("Token kind: " + jhToken.getKind());
+ log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+ log.info("Token service: " + jhToken.getService());
cred.addToken(jhToken.getService(), jhToken);
}
- private static void getFsAndJtTokens(final State state, final Configuration
conf, final Optional<String> userToProxy,
- final Credentials cred) throws IOException, InterruptedException {
+ private static void getJtTokens(final Configuration conf, final Credentials
cred, final Optional<String> userToProxy,
+ final State state) throws IOException, InterruptedException {
if (userToProxy.isPresent()) {
UserGroupInformation.createProxyUser(userToProxy.get(),
UserGroupInformation.getLoginUser())
.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- getFsAndJtTokensImpl(state, conf, cred);
+ getJtTokensImpl(state, conf, cred);
return null;
}
});
} else {
- getFsAndJtTokensImpl(state, conf, cred);
+ getJtTokensImpl(state, conf, cred);
}
}
- private static void getFsAndJtTokensImpl(final State state, final
Configuration conf, final Credentials cred)
+ private static void getJtTokensImpl(final State state, final Configuration
conf, final Credentials cred)
throws IOException {
- getHdfsToken(conf, cred);
- if (state.contains(OTHER_NAMENODES)) {
- getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
- }
- getJtToken(cred);
- }
-
- private static void getHdfsToken(Configuration conf, Credentials cred)
throws IOException {
- FileSystem fs = FileSystem.get(conf);
- LOG.info("Getting DFS token from " + fs.getUri());
- String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
- Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
- for(int i = 0; i < fsTokens.length; i++) {
- Token<?> token = fsTokens[i];
- String message =
- String.format("DFS token fetched from namenode, token kind: %s,
token service %s", token.getKind(),
- token.getService());
- LOG.info(message);
- }
- }
-
- private static void getOtherNamenodesToken(List<String> otherNamenodes,
Configuration conf, Credentials cred)
- throws IOException {
- LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
- Path[] ps = new Path[otherNamenodes.size()];
- for (int i = 0; i < ps.length; i++) {
- ps[i] = new Path(otherNamenodes.get(i).trim());
- }
- TokenCache.obtainTokensForNamenodes(cred, ps, conf);
- LOG.info("Successfully fetched tokens for: " + otherNamenodes);
- }
-
- private static void getJtToken(Credentials cred) throws IOException {
try {
JobConf jobConf = new JobConf();
JobClient jobClient = new JobClient(jobConf);
- LOG.info("Pre-fetching JT token from JobTracker");
+ log.info("Pre-fetching JT token from JobTracker");
Token<DelegationTokenIdentifier> mrdt =
jobClient.getDelegationToken(getMRTokenRenewerInternal(jobConf));
if (mrdt == null) {
- LOG.error("Failed to fetch JT token");
+ log.error("Failed to fetch JT token");
throw new IOException("Failed to fetch JT token.");
}
- LOG.info("Created JT token: " + mrdt.toString());
- LOG.info("Token kind: " + mrdt.getKind());
- LOG.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
- LOG.info("Token service: " + mrdt.getService());
+ log.info("Created JT token: " + mrdt.toString());
+ log.info("Token kind: " + mrdt.getKind());
+ log.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
+ log.info("Token service: " + mrdt.getService());
cred.addToken(mrdt.getService(), mrdt);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
+ public static void getAllFSTokens(final Configuration conf, final
Credentials cred, final String renewer,
+ final Optional<String> userToProxy, final
List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+ if (userToProxy.isPresent()) {
+ UserGroupInformation.createProxyUser(userToProxy.get(),
UserGroupInformation.getLoginUser())
+ .doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+ return null;
+ }
+ });
+ } else {
+ getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+ }
+ }
+
+ public static void getAllFSTokensImpl(Configuration conf, Credentials cred,
String renewer, List<String> remoteFSURIList) {
+ try {
+ // Handles token for local namenode
+ getLocalFSToken(conf, cred, renewer);
+
+ // Handle token for remote namenodes if any
+ getRemoteFSTokenFromURI(conf, cred, renewer, remoteFSURIList);
+
+ log.debug("All credential tokens: " + cred.getAllTokens());
+ } catch (IOException e) {
+ log.error("Error getting or creating HDFS token with renewer: " +
renewer, e);
+ }
+ }
+
+ public static void getLocalFSToken(Configuration conf, Credentials cred,
String renewer) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (StringUtils.isEmpty(renewer)) {
+ renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+ log.info("No renewer specified for FS: {}, taking default renewer: {}",
fs.getUri(), renewer);
+ }
+
+ log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " +
renewer);
+ Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+ if (fsTokens != null) {
+ for (Token<?> token : fsTokens) {
+ log.info("FS Uri: " + fs.getUri() + " token: " + token);
+ }
+ }
+ }
+
+ public static void getRemoteFSTokenFromURI(Configuration conf, Credentials
cred, String renewer, List<String> remoteNamenodesList)
+ throws IOException {
+ if (remoteNamenodesList == null || remoteNamenodesList.size() == 0) {
+ log.debug("no remote namenode URI specified, not getting any tokens for
remote namenodes: " + remoteNamenodesList);
+ return;
+ }
+
+ log.debug("Getting tokens for remote namenodes: " + remoteNamenodesList);
+ Path[] ps = new Path[remoteNamenodesList.size()];
+ for (int i = 0; i < ps.length; i++) {
+ ps[i] = new Path(remoteNamenodesList.get(i).trim());
+ FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+ if (StringUtils.isEmpty(renewer)) {
+ TokenCache.obtainTokensForNamenodes(cred, ps, conf);
+ } else {
+ final Token<?>[] tokens = otherNameNodeFS.addDelegationTokens(renewer,
cred);
+ if (tokens != null) {
+ for (Token<?> token : tokens) {
+ log.info("Got dt token for " + otherNameNodeFS.getUri() + "; " +
token);
+ }
+ }
+ }
+ }
+ log.info("Successfully fetched tokens for: " + remoteNamenodesList);
+ }
+
private static void persistTokens(Credentials cred, File tokenFile) throws
IOException {
try (FileOutputStream fos = new FileOutputStream(tokenFile);
DataOutputStream dos = new DataOutputStream(fos)) {
cred.writeTokenStorageToStream(dos);
}
- LOG.info("Tokens loaded in " + tokenFile.getAbsolutePath());
+ log.info("Tokens loaded in " + tokenFile.getAbsolutePath());
}
private static Token<?> getDelegationTokenFromHS(HSClientProtocol hsProxy,
Configuration conf) throws IOException {
@@ -380,7 +456,7 @@ public class TokenUtils {
return ConverterUtils.convertFromYarn(mrDelegationToken,
hsProxy.getConnectAddress());
}
- private static Text getMRTokenRenewerInternal(JobConf jobConf) throws
IOException {
+ public static Text getMRTokenRenewerInternal(JobConf jobConf) throws
IOException {
String servicePrincipal = jobConf.get(YARN_RESOURCEMANAGER_PRINCIPAL,
jobConf.get(JTConfig.JT_USER_NAME));
Text renewer;
if (servicePrincipal != null) {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index 2a0410e..76d6595 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.Credentials;
import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -50,6 +49,8 @@ import
org.apache.gobblin.cluster.GobblinHelixMessagingService;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
/**
* <p>
* The super class for key management
@@ -70,7 +71,7 @@ public abstract class AbstractYarnAppSecurityManager extends
AbstractIdleService
protected final FileSystem fs;
protected final Path tokenFilePath;
- protected Token<? extends TokenIdentifier> token;
+ protected Credentials credentials = new Credentials();
private final long loginIntervalInMinutes;
private final long tokenRenewIntervalInMinutes;
private final boolean isHelixClusterManaged;
@@ -117,7 +118,11 @@ public abstract class AbstractYarnAppSecurityManager
extends AbstractIdleService
this.loginExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- loginAndScheduleTokenRenewal();
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error during login, will continue the thread and try
next time.");
+ }
}
}, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
}
@@ -228,19 +233,31 @@ public abstract class AbstractYarnAppSecurityManager
extends AbstractIdleService
}
/**
- * Write the current delegation token to the token file.
+ * Write the current credentials to the token file.
*/
- @VisibleForTesting
- protected synchronized void writeDelegationTokenToFile() throws IOException {
+ protected synchronized void writeDelegationTokenToFile(Credentials cred)
throws IOException {
+
if (this.fs.exists(this.tokenFilePath)) {
LOGGER.info("Deleting existing token file " + this.tokenFilePath);
this.fs.delete(this.tokenFilePath, false);
}
- LOGGER.info("Writing new or renewed token to token file " +
this.tokenFilePath);
- YarnHelixUtils.writeTokenToFile(token, this.tokenFilePath,
this.fs.getConf());
+ LOGGER.debug("creating new token file {} with 644 permission.",
this.tokenFilePath);
+
+ YarnHelixUtils.writeTokenToFile(this.tokenFilePath, cred,
this.fs.getConf());
 // Only grant access to the token file to the login user
this.fs.setPermission(this.tokenFilePath, new
FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+
+ System.setProperty(HADOOP_TOKEN_FILE_LOCATION,
tokenFilePath.toUri().getPath());
+ LOGGER.info("set HADOOP_TOKEN_FILE_LOCATION = {}", this.tokenFilePath);
+ }
+
+ /**
+   * Write the current delegation token to the token file. Should only be used for
testing
+ */
+ @VisibleForTesting
+ protected synchronized void writeDelegationTokenToFile() throws IOException {
+ writeDelegationTokenToFile(this.credentials);
}
/**
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 36104b6..9c1f03b 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.EnumSet;
@@ -37,9 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -49,7 +47,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -325,7 +322,7 @@ public class GobblinYarnAppLauncher {
* @throws IOException if there's something wrong launching the application
* @throws YarnException if there's something wrong launching the application
*/
- public void launch() throws IOException, YarnException {
+ public void launch() throws IOException, YarnException, InterruptedException
{
this.eventBus.register(this);
if (this.isHelixClusterManaged) {
@@ -602,12 +599,14 @@ public class GobblinYarnAppLauncher {
* @throws YarnException if there's anything wrong setting up and submitting
the Yarn application
*/
@VisibleForTesting
- ApplicationId setupAndSubmitApplication() throws IOException, YarnException {
+ ApplicationId setupAndSubmitApplication() throws IOException, YarnException,
InterruptedException {
+ LOGGER.info("creating new yarn application");
YarnClientApplication gobblinYarnApp = this.yarnClient.createApplication();
ApplicationSubmissionContext appSubmissionContext =
gobblinYarnApp.getApplicationSubmissionContext();
appSubmissionContext.setApplicationType(GOBBLIN_YARN_APPLICATION_TYPE);
appSubmissionContext.setMaxAppAttempts(ConfigUtils.getInt(config,
GobblinYarnConfigurationKeys.APP_MASTER_MAX_ATTEMPTS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_APP_MASTER_MAX_ATTEMPTS_KEY));
ApplicationId applicationId = appSubmissionContext.getApplicationId();
+ LOGGER.info("created new yarn application: "+ applicationId.getId());
GetNewApplicationResponse newApplicationResponse =
gobblinYarnApp.getNewApplicationResponse();
// Set up resource type requirements for ApplicationMaster
@@ -821,29 +820,22 @@ public class GobblinYarnAppLauncher {
.toString();
}
- private void setupSecurityTokens(ContainerLaunchContext
containerLaunchContext) throws IOException {
+ private void setupSecurityTokens(ContainerLaunchContext
containerLaunchContext) throws IOException, InterruptedException {
+ LOGGER.info("setting up SecurityTokens for containerLaunchContext.");
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
+ String renewerName =
this.yarnConfiguration.get(YarnConfiguration.RM_PRINCIPAL);
// Pass on the credentials from the hadoop token file if present.
// The value in the token file takes precedence.
if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+ LOGGER.info("HADOOP_TOKEN_FILE_LOCATION is set to {} reading tokens from
it for containerLaunchContext.", System.getenv(HADOOP_TOKEN_FILE_LOCATION));
Credentials tokenFileCredentials = Credentials.readTokenStorageFile(new
File(System.getenv(HADOOP_TOKEN_FILE_LOCATION)),
- new Configuration());
+ new Configuration());
credentials.addAll(tokenFileCredentials);
+ LOGGER.debug("All containerLaunchContext tokens: {} present in file {}
", credentials.getAllTokens(), System.getenv(HADOOP_TOKEN_FILE_LOCATION));
}
- String tokenRenewer =
this.yarnConfiguration.get(YarnConfiguration.RM_PRINCIPAL);
- if (tokenRenewer == null || tokenRenewer.length() == 0) {
- throw new IOException("Failed to get master Kerberos principal for the
RM to use as renewer");
- }
-
- // For now, only getting tokens for the default file-system.
- Token<?> tokens[] = this.fs.addDelegationTokens(tokenRenewer, credentials);
- if (tokens != null) {
- for (Token<?> token : tokens) {
- LOGGER.info("Got delegation token for " + this.fs.getUri() + "; " +
token);
- }
- }
+ TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
null, ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
Closer closer = Closer.create();
try {
@@ -851,6 +843,7 @@ public class GobblinYarnAppLauncher {
credentials.writeTokenStorageToStream(dataOutputBuffer);
ByteBuffer fsTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0,
dataOutputBuffer.getLength());
containerLaunchContext.setTokens(fsTokens);
+ LOGGER.info("Setting containerLaunchContext with All credential tokens:
" + credentials.getAllTokens());
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
index 8925d94..d5081ca 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
@@ -19,11 +19,15 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.concurrent.ScheduledFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.helix.HelixManager;
@@ -31,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
/**
@@ -62,7 +67,6 @@ public class YarnAppSecurityManagerWithKeytabs extends
AbstractYarnAppSecurityMa
// This flag is used to tell if this is the first login. If yes, no token
updated message will be
// sent to the controller and the participants as they may not be up running
yet. The first login
// happens after this class starts up so the token gets regularly refreshed
before the next login.
- private volatile boolean firstLogin = true;
public YarnAppSecurityManagerWithKeytabs(Config config, HelixManager
helixManager, FileSystem fs, Path tokenFilePath)
throws IOException {
@@ -74,11 +78,29 @@ public class YarnAppSecurityManagerWithKeytabs extends
AbstractYarnAppSecurityMa
* Renew the existing delegation token.
*/
protected synchronized void renewDelegationToken() throws IOException,
InterruptedException {
- this.token.renew(this.fs.getConf());
- writeDelegationTokenToFile();
+ LOGGER.debug("renewing all tokens {}", credentials.getAllTokens());
+
+ credentials.getAllTokens().forEach(
+ existingToken -> {
+ try {
+ long expiryTime = existingToken.renew(this.fs.getConf());
+ LOGGER.info("renewed token: {}, expiryTime: {}, Id; {}",
existingToken, expiryTime, Arrays.toString(existingToken.getIdentifier()));
+
+ // TODO: If token failed to get renewed in case its expired ( can be
detected via the error text ),
+ // it should call the login() to reissue the new tokens
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error("Error renewing token: " + existingToken + " ,error: "
+ e, e);
+ }
+ }
+ );
+
+ writeDelegationTokenToFile(credentials);
if (!this.firstLogin) {
+ LOGGER.info("This is not a first login, sending
TokenFileUpdatedMessage.");
sendTokenFileUpdatedMessage();
+ } else {
+ LOGGER.info("This is first login of the interval, so skipping sending
TokenFileUpdatedMessage.");
}
}
@@ -86,14 +108,25 @@ public class YarnAppSecurityManagerWithKeytabs extends
AbstractYarnAppSecurityMa
* Get a new delegation token for the current logged-in user.
*/
@VisibleForTesting
- synchronized void getNewDelegationTokenForLoginUser() throws IOException {
- this.token = this.fs.getDelegationToken(this.loginUser.getShortUserName());
+ synchronized void getNewDelegationTokenForLoginUser() throws IOException,
InterruptedException {
+ final Configuration newConfig = new Configuration();
+ final Credentials allCreds = new Credentials();
+// Text renewer = TokenUtils.getMRTokenRenewerInternal(new JobConf());
+ String renewer = UserGroupInformation.getLoginUser().getShortUserName();
+
+ LOGGER.info("creating new login tokens with renewer: {}", renewer);
+ TokenUtils.getAllFSTokens(newConfig, allCreds, renewer, Optional.absent(),
ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
+ //TODO: Any other required tokens can be fetched here based on config or
any other detection mechanism
+
+ LOGGER.debug("All new tokens in credential: {}", allCreds.getAllTokens());
+
+ this.credentials = allCreds;
}
/**
* Login the user from a given keytab file.
*/
- protected void login() throws IOException {
+ protected synchronized void login() throws IOException, InterruptedException
{
String keyTabFilePath =
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH);
if (Strings.isNullOrEmpty(keyTabFilePath)) {
throw new IOException("Keytab file path is not defined for Kerberos
login");
@@ -107,21 +140,26 @@ public class YarnAppSecurityManagerWithKeytabs extends
AbstractYarnAppSecurityMa
if (Strings.isNullOrEmpty(principal)) {
principal = this.loginUser.getShortUserName() + "/localhost@LOCALHOST";
}
+ LOGGER.info("Login using kerberos principal : "+ principal);
Configuration conf = new Configuration();
- conf.set("hadoop.security.authentication",
-
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
+ conf.set(HADOOP_SECURITY_AUTHENTICATION,
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
- LOGGER.info(String.format("Logged in from keytab file %s using principal
%s", keyTabFilePath, principal));
- this.loginUser = UserGroupInformation.getLoginUser();
+ LOGGER.info(String.format("Logged in from keytab file %s using principal
%s for user: %s", keyTabFilePath, principal, this.loginUser));
getNewDelegationTokenForLoginUser();
- writeDelegationTokenToFile();
+
+ writeDelegationTokenToFile(this.credentials);
+
+ UserGroupInformation.getCurrentUser().addCredentials(this.credentials);
if (!this.firstLogin) {
+ LOGGER.info("This is not a first login, sending TokenFileUpdatedMessage
from Login().");
sendTokenFileUpdatedMessage();
+ }else {
+ LOGGER.info("This is first login of the interval, so skipping sending
TokenFileUpdatedMessage from Login().");
}
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 946649a..4ecc8b9 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -60,15 +60,17 @@ public class YarnHelixUtils {
/**
* Write a {@link Token} to a given file.
*
- * @param token the token to write
* @param tokenFilePath the token file path
+ * @param credentials all tokens of this credentials to be written to given
file
* @param configuration a {@link Configuration} object carrying Hadoop
configuration properties
* @throws IOException
*/
- public static void writeTokenToFile(Token<? extends TokenIdentifier> token,
Path tokenFilePath,
- Configuration configuration) throws IOException {
- Credentials credentials = new Credentials();
- credentials.addToken(token.getService(), token);
+ public static void writeTokenToFile(Path tokenFilePath, Credentials
credentials, Configuration configuration) throws IOException {
+ if(credentials == null) {
+ LOGGER.warn("got empty credentials, creating default one as new.");
+ credentials = new Credentials();
+ }
+ LOGGER.debug(String.format("Writing all tokens %s to file %s",
credentials.getAllTokens(), tokenFilePath));
credentials.writeTokenStorageFile(tokenFilePath, configuration);
}
@@ -79,6 +81,7 @@ public class YarnHelixUtils {
* @throws IOException
*/
public static void updateToken(String tokenFileName) throws IOException{
+ LOGGER.info("reading token from file: "+ tokenFileName);
URL tokenFileUrl =
YarnHelixUtils.class.getClassLoader().getResource(tokenFileName);
if (tokenFileUrl != null) {
File tokenFile = new File(tokenFileUrl.getFile());
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
index 489270a..8b440d3 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.yarn;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -36,6 +38,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -55,6 +59,8 @@ import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.TestHelper;
import org.apache.gobblin.testing.AssertWithBackoff;
+import static org.mockito.Matchers.any;
+
/**
* Unit tests for {@link YarnAppSecurityManagerWithKeytabs} and {@link
YarnContainerSecurityManager}.
@@ -88,7 +94,8 @@ public class YarnSecurityManagerTest {
private FileSystem localFs;
private Path baseDir;
private Path tokenFilePath;
- private Token<?> token;
+ private Token<?> fsToken;
+ private List<Token<?>> allTokens;
private YarnAppSecurityManagerWithKeytabs
_yarnAppYarnAppSecurityManagerWithKeytabs;
private YarnContainerSecurityManager yarnContainerSecurityManager;
@@ -130,21 +137,32 @@ public class YarnSecurityManagerTest {
this.configuration = new Configuration();
this.localFs = Mockito.spy(FileSystem.getLocal(this.configuration));
- this.token = new Token<>();
- this.token.setKind(new Text("test"));
- this.token.setService(new Text("test"));
-
Mockito.<Token<?>>when(this.localFs.getDelegationToken(UserGroupInformation.getLoginUser().getShortUserName()))
- .thenReturn(this.token);
+ this.fsToken = new Token<>();
+ this.fsToken.setKind(new Text("HDFS_DELEGATION_TOKEN"));
+ this.fsToken.setService(new Text("HDFS"));
+ this.allTokens = new ArrayList<>();
+ allTokens.add(fsToken);
+ Token<?>[] allTokenArray = new Token<?>[2];
+ allTokenArray[0]= fsToken;
+
+ Mockito.<Token<?>[]>when(localFs.addDelegationTokens(any(String.class),
any(Credentials.class)))
+ .thenReturn(allTokenArray);
this.baseDir = new Path(YarnSecurityManagerTest.class.getSimpleName());
this.tokenFilePath = new Path(this.baseDir,
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
- this._yarnAppYarnAppSecurityManagerWithKeytabs =
- new YarnAppSecurityManagerWithKeytabs(config, this.helixManager,
this.localFs, this.tokenFilePath);
+ this._yarnAppYarnAppSecurityManagerWithKeytabs = Mockito.spy(new
YarnAppSecurityManagerWithKeytabs(config, this.helixManager, this.localFs,
this.tokenFilePath));
this.yarnContainerSecurityManager = new
YarnContainerSecurityManager(config, this.localFs, new EventBus());
+
+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ _yarnAppYarnAppSecurityManagerWithKeytabs.credentials.addToken(new
Text("HDFS_DELEGATION_TOKEN"), fsToken);
+ return null;
+ }
+
}).when(_yarnAppYarnAppSecurityManagerWithKeytabs).getNewDelegationTokenForLoginUser();
}
@Test
- public void testGetNewDelegationTokenForLoginUser() throws IOException {
+ public void testGetNewDelegationTokenForLoginUser() throws IOException,
InterruptedException {
this._yarnAppYarnAppSecurityManagerWithKeytabs.getNewDelegationTokenForLoginUser();
}
@@ -235,8 +253,6 @@ public class YarnSecurityManagerTest {
}
private void assertToken(Collection<Token<?>> tokens) {
- Assert.assertEquals(tokens.size(), 1);
- Token<?> token = tokens.iterator().next();
- Assert.assertEquals(token, this.token);
+ tokens.forEach( token ->
org.junit.Assert.assertTrue(allTokens.contains(token)));
}
}