This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4cfe51d [GOBBLIN-1210][GOBBLIN-1217] Force AM to read from token file
to update token when start up (replace PR)
4cfe51d is described below
commit 4cfe51d4e21a7b5676edeac6120b2b2cdfbac455
Author: Zihan Li <[email protected]>
AuthorDate: Thu Jul 23 12:01:21 2020 -0700
[GOBBLIN-1210][GOBBLIN-1217] Force AM to read from token file to update
token when start up (replace PR)
Closes #3068 from ZihanLi58/Gobblin-1210_replace
---
.../gobblin/yarn/GobblinApplicationMaster.java | 29 +++++++++++-----------
.../gobblin/yarn/GobblinYarnAppLauncher.java | 16 ++++++------
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 3 +++
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 24 +++++++++++++++++-
4 files changed, 50 insertions(+), 22 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 433dfaf..2fc36b2 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -19,12 +19,22 @@ package org.apache.gobblin.yarn;
import java.util.Collections;
import java.util.List;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -39,7 +49,6 @@ import
org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
@@ -48,18 +57,6 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterManager;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
-import org.apache.gobblin.util.logs.LogCopier;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
-
/**
* The Yarn ApplicationMaster class for Gobblin.
@@ -223,6 +220,10 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
System.exit(1);
}
+ //Because AM is restarted with the original AppSubmissionContext, it may
have outdated delegation tokens.
+ //So the refreshed tokens should be added into the container's UGI
before any HDFS/Hive/RM access is performed.
+ YarnHelixUtils.updateToken();
+
Log4jConfigurationHelper.updateLog4jConfiguration(GobblinApplicationMaster.class,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE);
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 3215a90..36104b6 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -343,13 +344,15 @@ public class GobblinYarnAppLauncher {
startYarnClient();
+ Optional<ApplicationId> reconnectableApplicationId =
getReconnectableApplicationId();
+
+ boolean isReconnected = reconnectableApplicationId.isPresent();
// Before setup application, first login to make sure ugi has the right
token.
if(ConfigUtils.getBoolean(config,
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
- this.securityManager = Optional.of(buildSecurityManager());
+ this.securityManager = Optional.of(buildSecurityManager(isReconnected));
this.securityManager.get().loginAndScheduleTokenRenewal();
}
- Optional<ApplicationId> reconnectableApplicationId =
getReconnectableApplicationId();
if (!reconnectableApplicationId.isPresent()) {
disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new
application");
@@ -877,18 +880,17 @@ public class GobblinYarnAppLauncher {
return logRootDir;
}
- private AbstractYarnAppSecurityManager buildSecurityManager() throws
IOException {
+ private AbstractYarnAppSecurityManager buildSecurityManager(boolean
isReconnected) throws IOException {
Path tokenFilePath = new Path(this.fs.getHomeDirectory(),
this.applicationName + Path.SEPARATOR +
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new
ClassAliasResolver<>(
AbstractYarnAppSecurityManager.class);
try {
- return
(AbstractYarnAppSecurityManager)ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(
+ return (AbstractYarnAppSecurityManager)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
ConfigUtils.getString(config,
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS,
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config,
this.helixManager, this.fs,
- tokenFilePath);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
- | ClassNotFoundException e) {
+ tokenFilePath, isReconnected);
+ } catch (ReflectiveOperationException e) {
throw new IOException(e);
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index f6d7949..9db09a9 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -201,6 +201,9 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
if (!Strings.isNullOrEmpty(helixInstanceTags)) {
config =
config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
ConfigValueFactory.fromAnyRef(helixInstanceTags));
}
+ //Because AM is restarted with the original AppSubmissionContext, it may
have outdated delegation tokens.
+ //So the refreshed tokens should be added into the container's UGI
before any HDFS/Hive/RM access is performed.
+ YarnHelixUtils.updateToken();
GobblinTaskRunner gobblinTaskRunner =
new GobblinYarnTaskRunner(applicationName, applicationId,
helixInstanceName, containerId, config,
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index fb29049..27ab4e6 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -23,11 +23,13 @@ import java.util.Collection;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -42,7 +44,8 @@ import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
-import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -51,6 +54,8 @@ import org.apache.gobblin.util.ConfigUtils;
* @author Yinan Li
*/
public class YarnHelixUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(YarnHelixUtils.class);
+
/**
* Write a {@link Token} to a given file.
*
@@ -67,6 +72,23 @@ public class YarnHelixUtils {
}
/**
+ * Update {@link Token} with token file in resources.
+ *
+ * @param
+ * @throws IOException
+ */
+ public static void updateToken() throws IOException{
+ File tokenFile = new
File(YarnHelixUtils.class.getClassLoader().getResource(GobblinYarnConfigurationKeys.TOKEN_FILE_NAME).getFile());
+ if(tokenFile.exists()) {
+ Credentials credentials = Credentials.readTokenStorageFile(tokenFile,
new Configuration());
+ for (Token<? extends TokenIdentifier> token :
credentials.getAllTokens()) {
+ LOGGER.info("updating " + token.getKind() + " " + token.getService());
+ }
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ }
+ }
+
+ /**
* Read a collection {@link Token}s from a given file.
*
* @param tokenFilePath the token file path