This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ab38bfe  [GOBBLIN-1308] ability to manage token for remote cluster
ab38bfe is described below

commit ab38bfe904df8e02f0550dfea03d50866d7fee64
Author: Jay Sen <[email protected]>
AuthorDate: Wed Jan 20 14:26:27 2021 -0800

    [GOBBLIN-1308] ability to manage token for remote cluster
    
    Closes #3157 from jhsenjaliya/GOBBLIN-1308
---
 .../org/apache/gobblin/util/hadoop/TokenUtils.java | 234 ++++++++++++++-------
 .../yarn/AbstractYarnAppSecurityManager.java       |  35 ++-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  33 ++-
 .../yarn/YarnAppSecurityManagerWithKeytabs.java    |  60 +++++-
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  13 +-
 .../gobblin/yarn/YarnSecurityManagerTest.java      |  40 ++--
 6 files changed, 279 insertions(+), 136 deletions(-)

diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
index fd6463d..7bb4fee 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
@@ -20,17 +20,8 @@ package org.apache.gobblin.util.hadoop;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,9 +46,21 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
 
 /**
  * A utility class for obtain Hadoop tokens and Hive metastore tokens for 
Azkaban jobs.
@@ -66,15 +69,15 @@ import org.apache.thrift.TException;
  *   This class is compatible with Hadoop 2.
  * </p>
  */
+@Slf4j
 public class TokenUtils {
 
-  private static final Logger LOG = Logger.getLogger(TokenUtils.class);
-
   private static final String USER_TO_PROXY = "tokens.user.to.proxy";
   private static final String KEYTAB_USER = "keytab.user";
   private static final String KEYTAB_LOCATION = "keytab.location";
   private static final String HADOOP_SECURITY_AUTHENTICATION = 
"hadoop.security.authentication";
-  private static final String OTHER_NAMENODES = "other_namenodes";
+  public static final String OTHER_NAMENODES = "other_namenodes";
+  public static final String TOKEN_RENEWER = "token_renewer";
   private static final String KERBEROS = "kerberos";
   private static final String YARN_RESOURCEMANAGER_PRINCIPAL = 
"yarn.resourcemanager.principal";
   private static final String YARN_RESOURCEMANAGER_ADDRESS = 
"yarn.resourcemanager.address";
@@ -132,6 +135,35 @@ public class TokenUtils {
     return ugi;
   }
 
+  public static void getHadoopFSTokens(final State state, Optional<File> 
tokenFile, final Credentials cred, final String renewer)
+          throws IOException, InterruptedException {
+
+    Preconditions.checkArgument(state.contains(KEYTAB_USER), "Missing required 
property " + KEYTAB_USER);
+    Preconditions.checkArgument(state.contains(KEYTAB_LOCATION), "Missing 
required property " + KEYTAB_LOCATION);
+
+    Configuration configuration = new Configuration();
+    configuration.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
+    UserGroupInformation.setConfiguration(configuration);
+    UserGroupInformation.loginUserFromKeytab(obtainKerberosPrincipal(state), 
state.getProp(KEYTAB_LOCATION));
+
+    final Optional<String> userToProxy = 
Strings.isNullOrEmpty(state.getProp(USER_TO_PROXY)) ? Optional.<String>absent()
+            : Optional.fromNullable(state.getProp(USER_TO_PROXY));
+    final Configuration conf = new Configuration();
+
+    log.info("Getting tokens for userToProxy " + userToProxy);
+
+    List<String> remoteFSURIList = new ArrayList<>();
+    if(state.contains(OTHER_NAMENODES)){
+      remoteFSURIList = state.getPropAsList(OTHER_NAMENODES);
+    }
+
+    getAllFSTokens(conf, cred, renewer, userToProxy, remoteFSURIList);
+
+    if (tokenFile.isPresent()) {
+      persistTokens(cred, tokenFile.get());
+    }
+  }
+
   /**
    * Get Hadoop tokens (tokens for job history server, job tracker and HDFS) 
using Kerberos keytab.
    *
@@ -141,7 +173,7 @@ public class TokenUtils {
    * @param tokenFile If present, the file will store materialized credentials.
    * @param cred A im-memory representation of credentials.
    */
-  public static void getHadoopTokens(final State state, Optional<File> 
tokenFile, Credentials cred)
+  public static void getHadoopTokens(final State state, Optional<File> 
tokenFile, final Credentials cred)
       throws IOException, InterruptedException {
 
     Preconditions.checkArgument(state.contains(KEYTAB_USER), "Missing required 
property " + KEYTAB_USER);
@@ -156,10 +188,13 @@ public class TokenUtils {
         : Optional.fromNullable(state.getProp(USER_TO_PROXY));
     final Configuration conf = new Configuration();
 
-    LOG.info("Getting tokens for " + userToProxy);
+    List<String> remoteFSURIList = state.getPropAsList(OTHER_NAMENODES);
+    String renewer = state.getProp(TOKEN_RENEWER);
+    log.info("Getting tokens for {}, using renewer: {}, including remote FS: 
{}", userToProxy, renewer, remoteFSURIList.toString());
 
     getJhToken(conf, cred);
-    getFsAndJtTokens(state, conf, userToProxy, cred);
+    getJtTokens(conf, cred, userToProxy, state);
+    getAllFSTokens(conf, cred, renewer, userToProxy, remoteFSURIList);
 
     if (tokenFile.isPresent()) {
       persistTokens(cred, tokenFile.get());
@@ -204,11 +239,11 @@ public class TokenUtils {
           state.contains(USER_DEFINED_HIVE_LOCATIONS) ? 
state.getPropAsList(USER_DEFINED_HIVE_LOCATIONS)
               : Collections.EMPTY_LIST;
       if (!extraHcatLocations.isEmpty()) {
-        LOG.info("Need to fetch extra metaStore tokens from hive.");
+        log.info("Need to fetch extra metaStore tokens from hive.");
 
         // start to process the user inputs.
         for (final String thriftUrl : extraHcatLocations) {
-          LOG.info("Fetching metaStore token from : " + thriftUrl);
+          log.info("Fetching metaStore token from : " + thriftUrl);
 
           hiveConf = new HiveConf();
           hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
@@ -216,12 +251,12 @@ public class TokenUtils {
           cred.addToken(hcatToken.getService(), hcatToken);
           ugi.addToken(hcatToken);
 
-          LOG.info("Successfully fetched token for:" + thriftUrl);
+          log.info("Successfully fetched token for:" + thriftUrl);
         }
       }
     } catch (final Throwable t) {
       final String message = "Failed to get hive metastore token." + 
t.getMessage() + t.getCause();
-      LOG.error(message, t);
+      log.error(message, t);
       throw new RuntimeException(message);
     }
   }
@@ -237,10 +272,10 @@ public class TokenUtils {
       final String tokenSignatureOverwrite, final IMetaStoreClient hiveClient)
       throws IOException, TException, InterruptedException {
 
-    LOG.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": " + 
hiveConf.get(
+    log.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": " + 
hiveConf.get(
         HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname));
 
-    LOG.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": " + 
hiveConf.get(
+    log.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": " + 
hiveConf.get(
         HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
 
     final Token<DelegationTokenIdentifier> hcatToken = new Token<>();
@@ -255,10 +290,10 @@ public class TokenUtils {
         && tokenSignatureOverwrite.trim().length() > 0) {
       hcatToken.setService(new 
Text(tokenSignatureOverwrite.trim().toLowerCase()));
 
-      LOG.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
+      log.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
     }
 
-    LOG.info("Created hive metastore token for user:" + userToProxy + " with 
kind[" + hcatToken.getKind() + "]"
+    log.info("Created hive metastore token for user:" + userToProxy + " with 
kind[" + hcatToken.getKind() + "]"
         + " and service[" + hcatToken.getService() + "]");
     return hcatToken;
   }
@@ -267,10 +302,10 @@ public class TokenUtils {
     YarnRPC rpc = YarnRPC.create(conf);
     final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
 
-    LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
+    log.debug("Connecting to HistoryServer at: " + serviceAddr);
     HSClientProtocol hsProxy =
         (HSClientProtocol) rpc.getProxy(HSClientProtocol.class, 
NetUtils.createSocketAddr(serviceAddr), conf);
-    LOG.info("Pre-fetching JH token from job history server");
+    log.info("Pre-fetching JH token from job history server");
 
     Token<?> jhToken = null;
     try {
@@ -280,95 +315,136 @@ public class TokenUtils {
     }
 
     if (jhToken == null) {
-      LOG.error("getDelegationTokenFromHS() returned null");
+      log.error("getDelegationTokenFromHS() returned null");
       throw new IOException("Unable to fetch JH token.");
     }
 
-    LOG.info("Created JH token: " + jhToken.toString());
-    LOG.info("Token kind: " + jhToken.getKind());
-    LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
-    LOG.info("Token service: " + jhToken.getService());
+    log.info("Created JH token: " + jhToken.toString());
+    log.info("Token kind: " + jhToken.getKind());
+    log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+    log.info("Token service: " + jhToken.getService());
 
     cred.addToken(jhToken.getService(), jhToken);
   }
 
-  private static void getFsAndJtTokens(final State state, final Configuration 
conf, final Optional<String> userToProxy,
-      final Credentials cred) throws IOException, InterruptedException {
+  private static void getJtTokens(final Configuration conf, final Credentials 
cred, final Optional<String> userToProxy,
+      final State state) throws IOException, InterruptedException {
 
     if (userToProxy.isPresent()) {
       UserGroupInformation.createProxyUser(userToProxy.get(), 
UserGroupInformation.getLoginUser())
           .doAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
-              getFsAndJtTokensImpl(state, conf, cred);
+              getJtTokensImpl(state, conf, cred);
               return null;
             }
           });
     } else {
-      getFsAndJtTokensImpl(state, conf, cred);
+      getJtTokensImpl(state, conf, cred);
     }
   }
 
-  private static void getFsAndJtTokensImpl(final State state, final 
Configuration conf, final Credentials cred)
+  private static void getJtTokensImpl(final State state, final Configuration 
conf, final Credentials cred)
       throws IOException {
-    getHdfsToken(conf, cred);
-    if (state.contains(OTHER_NAMENODES)) {
-      getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
-    }
-    getJtToken(cred);
-  }
-
-  private static void getHdfsToken(Configuration conf, Credentials cred) 
throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    LOG.info("Getting DFS token from " + fs.getUri());
-    String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
-    Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
-    for(int i = 0; i < fsTokens.length; i++) {
-      Token<?> token = fsTokens[i];
-      String message =
-          String.format("DFS token fetched from namenode, token kind: %s, 
token service %s", token.getKind(),
-              token.getService());
-      LOG.info(message);
-    }
-  }
-
-  private static void getOtherNamenodesToken(List<String> otherNamenodes, 
Configuration conf, Credentials cred)
-      throws IOException {
-    LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
-    Path[] ps = new Path[otherNamenodes.size()];
-    for (int i = 0; i < ps.length; i++) {
-      ps[i] = new Path(otherNamenodes.get(i).trim());
-    }
-    TokenCache.obtainTokensForNamenodes(cred, ps, conf);
-    LOG.info("Successfully fetched tokens for: " + otherNamenodes);
-  }
-
-  private static void getJtToken(Credentials cred) throws IOException {
     try {
       JobConf jobConf = new JobConf();
       JobClient jobClient = new JobClient(jobConf);
-      LOG.info("Pre-fetching JT token from JobTracker");
+      log.info("Pre-fetching JT token from JobTracker");
 
       Token<DelegationTokenIdentifier> mrdt = 
jobClient.getDelegationToken(getMRTokenRenewerInternal(jobConf));
       if (mrdt == null) {
-        LOG.error("Failed to fetch JT token");
+        log.error("Failed to fetch JT token");
         throw new IOException("Failed to fetch JT token.");
       }
-      LOG.info("Created JT token: " + mrdt.toString());
-      LOG.info("Token kind: " + mrdt.getKind());
-      LOG.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
-      LOG.info("Token service: " + mrdt.getService());
+      log.info("Created JT token: " + mrdt.toString());
+      log.info("Token kind: " + mrdt.getKind());
+      log.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
+      log.info("Token service: " + mrdt.getService());
       cred.addToken(mrdt.getService(), mrdt);
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
   }
 
+  public static void getAllFSTokens(final Configuration conf, final 
Credentials cred, final String renewer,
+                                    final Optional<String> userToProxy, final 
List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+    if (userToProxy.isPresent()) {
+      UserGroupInformation.createProxyUser(userToProxy.get(), 
UserGroupInformation.getLoginUser())
+              .doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+                  return null;
+                }
+              });
+    } else {
+      getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+    }
+  }
+
+  public static void getAllFSTokensImpl(Configuration conf, Credentials cred, 
String renewer, List<String> remoteFSURIList) {
+    try {
+      // Handles token for local namenode
+      getLocalFSToken(conf, cred, renewer);
+
+      // Handle token for remote namenodes if any
+      getRemoteFSTokenFromURI(conf, cred, renewer, remoteFSURIList);
+
+      log.debug("All credential tokens: " + cred.getAllTokens());
+    } catch (IOException e) {
+      log.error("Error getting or creating HDFS token with renewer: " + 
renewer, e);
+    }
+  }
+
+  public static void getLocalFSToken(Configuration conf, Credentials cred, 
String renewer) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    if (StringUtils.isEmpty(renewer)) {
+      renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+      log.info("No renewer specified for FS: {}, taking default renewer: {}", 
fs.getUri(), renewer);
+    }
+
+    log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " + 
renewer);
+    Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+    if (fsTokens != null) {
+      for (Token<?> token : fsTokens) {
+        log.info("FS Uri: " + fs.getUri() + " token: " + token);
+      }
+    }
+  }
+
+  public static void getRemoteFSTokenFromURI(Configuration conf, Credentials 
cred, String renewer, List<String> remoteNamenodesList)
+          throws IOException {
+    if (remoteNamenodesList == null || remoteNamenodesList.size() == 0) {
+      log.debug("no remote namenode URI specified, not getting any tokens for 
remote namenodes: " + remoteNamenodesList);
+      return;
+    }
+
+    log.debug("Getting tokens for remote namenodes: " + remoteNamenodesList);
+    Path[] ps = new Path[remoteNamenodesList.size()];
+    for (int i = 0; i < ps.length; i++) {
+      ps[i] = new Path(remoteNamenodesList.get(i).trim());
+      FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+      if (StringUtils.isEmpty(renewer)) {
+        TokenCache.obtainTokensForNamenodes(cred, ps, conf);
+      } else {
+        final Token<?>[] tokens = otherNameNodeFS.addDelegationTokens(renewer, 
cred);
+        if (tokens != null) {
+          for (Token<?> token : tokens) {
+            log.info("Got dt token for " + otherNameNodeFS.getUri() + "; " + 
token);
+          }
+        }
+      }
+    }
+    log.info("Successfully fetched tokens for: " + remoteNamenodesList);
+  }
+
   private static void persistTokens(Credentials cred, File tokenFile) throws 
IOException {
     try (FileOutputStream fos = new FileOutputStream(tokenFile); 
DataOutputStream dos = new DataOutputStream(fos)) {
       cred.writeTokenStorageToStream(dos);
     }
-    LOG.info("Tokens loaded in " + tokenFile.getAbsolutePath());
+    log.info("Tokens loaded in " + tokenFile.getAbsolutePath());
   }
 
   private static Token<?> getDelegationTokenFromHS(HSClientProtocol hsProxy, 
Configuration conf) throws IOException {
@@ -380,7 +456,7 @@ public class TokenUtils {
     return ConverterUtils.convertFromYarn(mrDelegationToken, 
hsProxy.getConnectAddress());
   }
 
-  private static Text getMRTokenRenewerInternal(JobConf jobConf) throws 
IOException {
+  public static Text getMRTokenRenewerInternal(JobConf jobConf) throws 
IOException {
     String servicePrincipal = jobConf.get(YARN_RESOURCEMANAGER_PRINCIPAL, 
jobConf.get(JTConfig.JT_USER_NAME));
     Text renewer;
     if (servicePrincipal != null) {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index 2a0410e..76d6595 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.Credentials;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -50,6 +49,8 @@ import 
org.apache.gobblin.cluster.GobblinHelixMessagingService;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
 /**
  * <p>
  *   The super class for key management
@@ -70,7 +71,7 @@ public abstract class AbstractYarnAppSecurityManager extends 
AbstractIdleService
   protected final FileSystem fs;
   protected final Path tokenFilePath;
 
-  protected Token<? extends TokenIdentifier> token;
+  protected Credentials credentials = new Credentials();
   private final long loginIntervalInMinutes;
   private final long tokenRenewIntervalInMinutes;
   private final boolean isHelixClusterManaged;
@@ -117,7 +118,11 @@ public abstract class AbstractYarnAppSecurityManager 
extends AbstractIdleService
     this.loginExecutor.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
-        loginAndScheduleTokenRenewal();
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){
+          LOGGER.error("Error during login, will continue the thread and try 
next time.");
+        }
       }
     }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);
   }
@@ -228,19 +233,31 @@ public abstract class AbstractYarnAppSecurityManager 
extends AbstractIdleService
   }
 
   /**
-   * Write the current delegation token to the token file.
+   * Write the current credentials to the token file.
    */
-  @VisibleForTesting
-  protected synchronized void writeDelegationTokenToFile() throws IOException {
+  protected synchronized void writeDelegationTokenToFile(Credentials cred) 
throws IOException {
+
     if (this.fs.exists(this.tokenFilePath)) {
       LOGGER.info("Deleting existing token file " + this.tokenFilePath);
       this.fs.delete(this.tokenFilePath, false);
     }
 
-    LOGGER.info("Writing new or renewed token to token file " + 
this.tokenFilePath);
-    YarnHelixUtils.writeTokenToFile(token, this.tokenFilePath, 
this.fs.getConf());
+    LOGGER.debug("creating new token file {} with 644 permission.", 
this.tokenFilePath);
+
+    YarnHelixUtils.writeTokenToFile(this.tokenFilePath, cred, 
this.fs.getConf());
     // Only grand access to the token file to the login user
     this.fs.setPermission(this.tokenFilePath, new 
FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+
+    System.setProperty(HADOOP_TOKEN_FILE_LOCATION, 
tokenFilePath.toUri().getPath());
+    LOGGER.info("set HADOOP_TOKEN_FILE_LOCATION = {}", this.tokenFilePath);
+  }
+
+  /**
+   * Write the current delegation token to the token file. Should be only for 
testing
+   */
+  @VisibleForTesting
+  protected synchronized void writeDelegationTokenToFile() throws IOException {
+    writeDelegationTokenToFile(this.credentials);
   }
 
   /**
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 36104b6..9c1f03b 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.yarn;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
@@ -37,9 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.util.hadoop.TokenUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -49,7 +47,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -325,7 +322,7 @@ public class GobblinYarnAppLauncher {
    * @throws IOException if there's something wrong launching the application
    * @throws YarnException if there's something wrong launching the application
    */
-  public void launch() throws IOException, YarnException {
+  public void launch() throws IOException, YarnException, InterruptedException 
{
     this.eventBus.register(this);
 
     if (this.isHelixClusterManaged) {
@@ -602,12 +599,14 @@ public class GobblinYarnAppLauncher {
    * @throws YarnException if there's anything wrong setting up and submitting 
the Yarn application
    */
   @VisibleForTesting
-  ApplicationId setupAndSubmitApplication() throws IOException, YarnException {
+  ApplicationId setupAndSubmitApplication() throws IOException, YarnException, 
InterruptedException {
+    LOGGER.info("creating new yarn application");
     YarnClientApplication gobblinYarnApp = this.yarnClient.createApplication();
     ApplicationSubmissionContext appSubmissionContext = 
gobblinYarnApp.getApplicationSubmissionContext();
     appSubmissionContext.setApplicationType(GOBBLIN_YARN_APPLICATION_TYPE);
     appSubmissionContext.setMaxAppAttempts(ConfigUtils.getInt(config, 
GobblinYarnConfigurationKeys.APP_MASTER_MAX_ATTEMPTS_KEY, 
GobblinYarnConfigurationKeys.DEFAULT_APP_MASTER_MAX_ATTEMPTS_KEY));
     ApplicationId applicationId = appSubmissionContext.getApplicationId();
+    LOGGER.info("created new yarn application: "+ applicationId.getId());
 
     GetNewApplicationResponse newApplicationResponse = 
gobblinYarnApp.getNewApplicationResponse();
     // Set up resource type requirements for ApplicationMaster
@@ -821,29 +820,22 @@ public class GobblinYarnAppLauncher {
         .toString();
   }
 
-  private void setupSecurityTokens(ContainerLaunchContext 
containerLaunchContext) throws IOException {
+  private void setupSecurityTokens(ContainerLaunchContext 
containerLaunchContext) throws IOException, InterruptedException {
+    LOGGER.info("setting up SecurityTokens for containerLaunchContext.");
     Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
+    String renewerName = 
this.yarnConfiguration.get(YarnConfiguration.RM_PRINCIPAL);
 
     // Pass on the credentials from the hadoop token file if present.
     // The value in the token file takes precedence.
     if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+      LOGGER.info("HADOOP_TOKEN_FILE_LOCATION is set to {} reading tokens from 
it for containerLaunchContext.", System.getenv(HADOOP_TOKEN_FILE_LOCATION));
       Credentials tokenFileCredentials = Credentials.readTokenStorageFile(new 
File(System.getenv(HADOOP_TOKEN_FILE_LOCATION)),
-          new Configuration());
+              new Configuration());
       credentials.addAll(tokenFileCredentials);
+      LOGGER.debug("All containerLaunchContext tokens: {} present in file {} 
", credentials.getAllTokens(), System.getenv(HADOOP_TOKEN_FILE_LOCATION));
     }
 
-    String tokenRenewer = 
this.yarnConfiguration.get(YarnConfiguration.RM_PRINCIPAL);
-    if (tokenRenewer == null || tokenRenewer.length() == 0) {
-      throw new IOException("Failed to get master Kerberos principal for the 
RM to use as renewer");
-    }
-
-    // For now, only getting tokens for the default file-system.
-    Token<?> tokens[] = this.fs.addDelegationTokens(tokenRenewer, credentials);
-    if (tokens != null) {
-      for (Token<?> token : tokens) {
-        LOGGER.info("Got delegation token for " + this.fs.getUri() + "; " + 
token);
-      }
-    }
+    TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName, 
null, ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
 
     Closer closer = Closer.create();
     try {
@@ -851,6 +843,7 @@ public class GobblinYarnAppLauncher {
       credentials.writeTokenStorageToStream(dataOutputBuffer);
       ByteBuffer fsTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, 
dataOutputBuffer.getLength());
       containerLaunchContext.setTokens(fsTokens);
+      LOGGER.info("Setting containerLaunchContext with All credential tokens: 
" + credentials.getAllTokens());
     } catch (Throwable t) {
       throw closer.rethrow(t);
     } finally {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
index 8925d94..d5081ca 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
@@ -19,11 +19,15 @@ package org.apache.gobblin.yarn;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ScheduledFuture;
 
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.hadoop.TokenUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.helix.HelixManager;
 
@@ -31,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
 
 
 /**
@@ -62,7 +67,6 @@ public class YarnAppSecurityManagerWithKeytabs extends 
AbstractYarnAppSecurityMa
   // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
   // sent to the controller and the participants as they may not be up running 
yet. The first login
   // happens after this class starts up so the token gets regularly refreshed 
before the next login.
-  private volatile boolean firstLogin = true;
 
   public YarnAppSecurityManagerWithKeytabs(Config config, HelixManager 
helixManager, FileSystem fs, Path tokenFilePath)
       throws IOException {
@@ -74,11 +78,29 @@ public class YarnAppSecurityManagerWithKeytabs extends 
AbstractYarnAppSecurityMa
    * Renew the existing delegation token.
    */
   protected synchronized void renewDelegationToken() throws IOException, 
InterruptedException {
-    this.token.renew(this.fs.getConf());
-    writeDelegationTokenToFile();
+    LOGGER.debug("renewing all tokens {}", credentials.getAllTokens());
+
+    credentials.getAllTokens().forEach(
+      existingToken -> {
+        try {
+          long expiryTime = existingToken.renew(this.fs.getConf());
+          LOGGER.info("renewed token: {}, expiryTime: {}, Id; {}", 
existingToken, expiryTime, Arrays.toString(existingToken.getIdentifier()));
+
+          // TODO: If token failed to get renewed in case its expired ( can be 
detected via the error text ),
+          //  it should call the login() to reissue the new tokens
+        } catch (IOException | InterruptedException e) {
+          LOGGER.error("Error renewing token: " + existingToken + " ,error: " 
+ e, e);
+        }
+      }
+    );
+
+    writeDelegationTokenToFile(credentials);
 
     if (!this.firstLogin) {
+      LOGGER.info("This is not a first login, sending 
TokenFileUpdatedMessage.");
       sendTokenFileUpdatedMessage();
+    } else {
+      LOGGER.info("This is first login of the interval, so skipping sending 
TokenFileUpdatedMessage.");
     }
   }
 
@@ -86,14 +108,25 @@ public class YarnAppSecurityManagerWithKeytabs extends 
AbstractYarnAppSecurityMa
    * Get a new delegation token for the current logged-in user.
    */
   @VisibleForTesting
-  synchronized void getNewDelegationTokenForLoginUser() throws IOException {
-    this.token = this.fs.getDelegationToken(this.loginUser.getShortUserName());
+  synchronized void getNewDelegationTokenForLoginUser() throws IOException, 
InterruptedException {
+    final Configuration newConfig = new Configuration();
+    final Credentials allCreds = new Credentials();
+//    Text renewer = TokenUtils.getMRTokenRenewerInternal(new JobConf());
+    String renewer = UserGroupInformation.getLoginUser().getShortUserName();
+
+    LOGGER.info("creating new login tokens with renewer: {}", renewer);
+    TokenUtils.getAllFSTokens(newConfig, allCreds, renewer, Optional.absent(), 
ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
+    //TODO: Any other required tokens can be fetched here based on config or 
any other detection mechanism
+
+    LOGGER.debug("All new tokens in credential: {}", allCreds.getAllTokens());
+
+    this.credentials = allCreds;
   }
 
   /**
    * Login the user from a given keytab file.
    */
-  protected void login() throws IOException {
+  protected synchronized void login() throws IOException, InterruptedException 
{
     String keyTabFilePath = 
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH);
     if (Strings.isNullOrEmpty(keyTabFilePath)) {
       throw new IOException("Keytab file path is not defined for Kerberos 
login");
@@ -107,21 +140,26 @@ public class YarnAppSecurityManagerWithKeytabs extends 
AbstractYarnAppSecurityMa
     if (Strings.isNullOrEmpty(principal)) {
       principal = this.loginUser.getShortUserName() + "/localhost@LOCALHOST";
     }
+    LOGGER.info("Login using kerberos principal : "+ principal);
 
     Configuration conf = new Configuration();
-    conf.set("hadoop.security.authentication",
-        
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, 
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
     UserGroupInformation.setConfiguration(conf);
     UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
-    LOGGER.info(String.format("Logged in from keytab file %s using principal 
%s", keyTabFilePath, principal));
 
-    this.loginUser = UserGroupInformation.getLoginUser();
+    LOGGER.info(String.format("Logged in from keytab file %s using principal 
%s for user: %s", keyTabFilePath, principal, this.loginUser));
 
     getNewDelegationTokenForLoginUser();
-    writeDelegationTokenToFile();
+
+    writeDelegationTokenToFile(this.credentials);
+
+    UserGroupInformation.getCurrentUser().addCredentials(this.credentials);
 
     if (!this.firstLogin) {
+      LOGGER.info("This is not a first login, sending TokenFileUpdatedMessage 
from Login().");
       sendTokenFileUpdatedMessage();
+    }else {
+      LOGGER.info("This is first login of the interval, so skipping sending 
TokenFileUpdatedMessage from Login().");
     }
   }
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 946649a..4ecc8b9 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -60,15 +60,17 @@ public class YarnHelixUtils {
   /**
    * Write a {@link Token} to a given file.
    *
-   * @param token the token to write
    * @param tokenFilePath the token file path
+   * @param credentials the credentials whose tokens are all written to the given 
file
    * @param configuration a {@link Configuration} object carrying Hadoop 
configuration properties
    * @throws IOException
    */
-  public static void writeTokenToFile(Token<? extends TokenIdentifier> token, 
Path tokenFilePath,
-      Configuration configuration) throws IOException {
-    Credentials credentials = new Credentials();
-    credentials.addToken(token.getService(), token);
+  public static void writeTokenToFile(Path tokenFilePath, Credentials 
credentials, Configuration configuration) throws IOException {
+    if(credentials == null) {
+      LOGGER.warn("got empty credentials, creating default one as new.");
+      credentials = new Credentials();
+    }
+    LOGGER.debug(String.format("Writing all tokens %s to file %s",  
credentials.getAllTokens(), tokenFilePath));
     credentials.writeTokenStorageFile(tokenFilePath, configuration);
   }
 
@@ -79,6 +81,7 @@ public class YarnHelixUtils {
    * @throws IOException
    */
   public static void updateToken(String tokenFileName) throws IOException{
+    LOGGER.info("reading token from file: "+ tokenFileName);
     URL tokenFileUrl = 
YarnHelixUtils.class.getClassLoader().getResource(tokenFileName);
     if (tokenFileUrl != null) {
       File tokenFile = new File(tokenFileUrl.getFile());
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
index 489270a..8b440d3 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.yarn;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -36,6 +38,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -55,6 +59,8 @@ import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.cluster.TestHelper;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
+import static org.mockito.Matchers.any;
+
 
 /**
  * Unit tests for {@link YarnAppSecurityManagerWithKeytabs} and {@link 
YarnContainerSecurityManager}.
@@ -88,7 +94,8 @@ public class YarnSecurityManagerTest {
   private FileSystem localFs;
   private Path baseDir;
   private Path tokenFilePath;
-  private Token<?> token;
+  private Token<?> fsToken;
+  private List<Token<?>> allTokens;
 
   private YarnAppSecurityManagerWithKeytabs 
_yarnAppYarnAppSecurityManagerWithKeytabs;
   private YarnContainerSecurityManager yarnContainerSecurityManager;
@@ -130,21 +137,32 @@ public class YarnSecurityManagerTest {
     this.configuration = new Configuration();
     this.localFs = Mockito.spy(FileSystem.getLocal(this.configuration));
 
-    this.token = new Token<>();
-    this.token.setKind(new Text("test"));
-    this.token.setService(new Text("test"));
-    
Mockito.<Token<?>>when(this.localFs.getDelegationToken(UserGroupInformation.getLoginUser().getShortUserName()))
-        .thenReturn(this.token);
+    this.fsToken = new Token<>();
+    this.fsToken.setKind(new Text("HDFS_DELEGATION_TOKEN"));
+    this.fsToken.setService(new Text("HDFS"));
+    this.allTokens = new ArrayList<>();
+    allTokens.add(fsToken);
+    Token<?>[] allTokenArray = new Token<?>[2];
+    allTokenArray[0]= fsToken;
+
+    Mockito.<Token<?>[]>when(localFs.addDelegationTokens(any(String.class), 
any(Credentials.class)))
+            .thenReturn(allTokenArray);
 
     this.baseDir = new Path(YarnSecurityManagerTest.class.getSimpleName());
     this.tokenFilePath = new Path(this.baseDir, 
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
-    this._yarnAppYarnAppSecurityManagerWithKeytabs =
-        new YarnAppSecurityManagerWithKeytabs(config, this.helixManager, 
this.localFs, this.tokenFilePath);
+    this._yarnAppYarnAppSecurityManagerWithKeytabs = Mockito.spy(new 
YarnAppSecurityManagerWithKeytabs(config, this.helixManager, this.localFs, 
this.tokenFilePath));
     this.yarnContainerSecurityManager = new 
YarnContainerSecurityManager(config, this.localFs, new EventBus());
+
+    Mockito.doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        _yarnAppYarnAppSecurityManagerWithKeytabs.credentials.addToken(new 
Text("HDFS_DELEGATION_TOKEN"), fsToken);
+        return null;
+      }
+    
}).when(_yarnAppYarnAppSecurityManagerWithKeytabs).getNewDelegationTokenForLoginUser();
   }
 
   @Test
-  public void testGetNewDelegationTokenForLoginUser() throws IOException {
+  public void testGetNewDelegationTokenForLoginUser() throws IOException, 
InterruptedException {
     
this._yarnAppYarnAppSecurityManagerWithKeytabs.getNewDelegationTokenForLoginUser();
   }
 
@@ -235,8 +253,6 @@ public class YarnSecurityManagerTest {
   }
 
   private void assertToken(Collection<Token<?>> tokens) {
-    Assert.assertEquals(tokens.size(), 1);
-    Token<?> token = tokens.iterator().next();
-    Assert.assertEquals(token, this.token);
+    tokens.forEach( token -> 
org.junit.Assert.assertTrue(allTokens.contains(token)));
   }
 }

Reply via email to