This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cfe51d  [GOBBLIN-1210][GOBBLIN-1217] Force AM to read from token file 
to update token when start up (replace PR)
4cfe51d is described below

commit 4cfe51d4e21a7b5676edeac6120b2b2cdfbac455
Author: Zihan Li <[email protected]>
AuthorDate: Thu Jul 23 12:01:21 2020 -0700

    [GOBBLIN-1210][GOBBLIN-1217] Force AM to read from token file to update 
token when start up (replace PR)
    
    Closes #3068 from ZihanLi58/Gobblin-1210_replace
---
 .../gobblin/yarn/GobblinApplicationMaster.java     | 29 +++++++++++-----------
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 16 ++++++------
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |  3 +++
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    | 24 +++++++++++++++++-
 4 files changed, 50 insertions(+), 22 deletions(-)

diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 433dfaf..2fc36b2 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -19,12 +19,22 @@ package org.apache.gobblin.yarn;
 
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,7 +49,6 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
@@ -48,18 +57,6 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterManager;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
-import org.apache.gobblin.util.logs.LogCopier;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
-
 
 /**
  * The Yarn ApplicationMaster class for Gobblin.
@@ -223,6 +220,10 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
         System.exit(1);
       }
 
+      //Because AM is restarted with the original AppSubmissionContext, it may 
have outdated delegation tokens.
+      //So the refreshed tokens should be added into the container's UGI 
before any HDFS/Hive/RM access is performed.
+      YarnHelixUtils.updateToken();
+
       
Log4jConfigurationHelper.updateLog4jConfiguration(GobblinApplicationMaster.class,
           GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE,
           GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE);
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 3215a90..36104b6 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -343,13 +344,15 @@ public class GobblinYarnAppLauncher {
 
     startYarnClient();
 
+    Optional<ApplicationId> reconnectableApplicationId = 
getReconnectableApplicationId();
+
+    boolean isReconnected = reconnectableApplicationId.isPresent();
     // Before setup application, first login to make sure ugi has the right 
token.
     if(ConfigUtils.getBoolean(config, 
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
-      this.securityManager = Optional.of(buildSecurityManager());
+      this.securityManager = Optional.of(buildSecurityManager(isReconnected));
       this.securityManager.get().loginAndScheduleTokenRenewal();
     }
 
-    Optional<ApplicationId> reconnectableApplicationId = 
getReconnectableApplicationId();
     if (!reconnectableApplicationId.isPresent()) {
       disableLiveHelixInstances();
       LOGGER.info("No reconnectable application found so submitting a new 
application");
@@ -877,18 +880,17 @@ public class GobblinYarnAppLauncher {
     return logRootDir;
   }
 
-  private AbstractYarnAppSecurityManager buildSecurityManager() throws 
IOException {
+  private AbstractYarnAppSecurityManager buildSecurityManager(boolean 
isReconnected) throws IOException {
     Path tokenFilePath = new Path(this.fs.getHomeDirectory(), 
this.applicationName + Path.SEPARATOR +
         GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
 
     ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new 
ClassAliasResolver<>(
         AbstractYarnAppSecurityManager.class);
     try {
-     return 
(AbstractYarnAppSecurityManager)ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(
+     return (AbstractYarnAppSecurityManager) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
           ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, 
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, 
this.helixManager, this.fs,
-          tokenFilePath);
-    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
-        | ClassNotFoundException e) {
+          tokenFilePath, isReconnected);
+    } catch (ReflectiveOperationException e) {
       throw new IOException(e);
     }
   }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index f6d7949..9db09a9 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -201,6 +201,9 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
       if (!Strings.isNullOrEmpty(helixInstanceTags)) {
         config = 
config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
ConfigValueFactory.fromAnyRef(helixInstanceTags));
       }
+      //Because AM is restarted with the original AppSubmissionContext, it may 
have outdated delegation tokens.
+      //So the refreshed tokens should be added into the container's UGI 
before any HDFS/Hive/RM access is performed.
+      YarnHelixUtils.updateToken();
 
       GobblinTaskRunner gobblinTaskRunner =
           new GobblinYarnTaskRunner(applicationName, applicationId, 
helixInstanceName, containerId, config,
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index fb29049..27ab4e6 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -23,11 +23,13 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -42,7 +44,8 @@ import org.apache.hadoop.yarn.util.Records;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -51,6 +54,8 @@ import org.apache.gobblin.util.ConfigUtils;
  * @author Yinan Li
  */
 public class YarnHelixUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnHelixUtils.class);
+
   /**
    * Write a {@link Token} to a given file.
    *
@@ -67,6 +72,23 @@ public class YarnHelixUtils {
   }
 
   /**
+   * Update {@link Token} with token file in resources.
+   *
+   * @param
+   * @throws IOException
+   */
+  public static void updateToken() throws IOException{
+    File tokenFile = new 
File(YarnHelixUtils.class.getClassLoader().getResource(GobblinYarnConfigurationKeys.TOKEN_FILE_NAME).getFile());
+    if(tokenFile.exists()) {
+      Credentials credentials = Credentials.readTokenStorageFile(tokenFile, 
new Configuration());
+      for (Token<? extends TokenIdentifier> token : 
credentials.getAllTokens()) {
+        LOGGER.info("updating " + token.getKind() + " " + token.getService());
+      }
+      UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    }
+  }
+
+  /**
    * Read a collection {@link Token}s from a given file.
    *
    * @param tokenFilePath the token file path

Reply via email to