This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bce5cbc [Gobblin-902][GOBBLIN-902] Enable gobblin yarn app launcher
class configurable
bce5cbc is described below
commit bce5cbc3c8bb607b8c1af56dd3b41f75bbb2b92e
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 11 16:47:12 2019 -0700
[Gobblin-902][GOBBLIN-902] Enable gobblin yarn app launcher class
configurable
Closes #2758 from ZihanLi58/GOBBLIN-902
---
...er.java => AbstractYarnAppSecurityManager.java} | 223 ++++++++-------------
.../gobblin/yarn/GobblinYarnAppLauncher.java | 34 +++-
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 6 +-
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 6 +-
.../yarn/YarnAppSecurityManagerWithKeytabs.java | 138 +++++++++++++
.../gobblin/yarn/YarnSecurityManagerTest.java | 18 +-
6 files changed, 264 insertions(+), 161 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
similarity index 56%
rename from
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManager.java
rename to
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index 26b2fff..0b069cd 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -17,77 +17,53 @@
package org.apache.gobblin.yarn;
-import org.apache.gobblin.cluster.GobblinHelixMessagingService;
-import java.io.File;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-
import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.util.ExecutorsUtils;
-
-
/**
- * A class for managing Kerberos login and token renewing on the client side
that has access to
- * the keytab file.
- *
- * <p>
- * This class works with {@link YarnContainerSecurityManager} to manage
renewing of delegation
- * tokens across the application. This class is responsible for login
through a Kerberos keytab,
- * renewing the delegation token, and storing the token to a token file on
HDFS. It sends a
- * Helix message to the controller and all the participants upon writing the
token to the token
- * file, which rely on the {@link YarnContainerSecurityManager} to read the
token in the file
- * upon receiving the message.
- * </p>
- *
* <p>
- * This class uses a scheduled task to do Kerberos re-login to renew the
Kerberos ticket on a
- * configurable schedule if login is from a keytab file. It also uses a
second scheduled task
+ * The super class for key management.
+ * This class uses a scheduled task to re-login and refetch the token on a
+ * configurable schedule. It also uses a second scheduled task
* to renew the delegation token after each login. Both the re-login
interval and the token
* renewing interval are configurable.
* </p>
- *
- * @author Yinan Li
+ * @author Zihan Li
*/
-public class YarnAppSecurityManager extends AbstractIdleService {
+public abstract class AbstractYarnAppSecurityManager extends
AbstractIdleService {
- private static final Logger LOGGER =
LoggerFactory.getLogger(YarnAppSecurityManager.class);
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
- private final Config config;
-
- private final HelixManager helixManager;
- private final FileSystem fs;
- private final Path tokenFilePath;
- private UserGroupInformation loginUser;
- private Token<? extends TokenIdentifier> token;
+ protected Config config;
+ protected final HelixManager helixManager;
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+ protected Token<? extends TokenIdentifier> token;
private final long loginIntervalInMinutes;
private final long tokenRenewIntervalInMinutes;
@@ -98,19 +74,18 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
// This flag is used to tell if this is the first login. If yes, no token
updated message will be
// sent to the controller and the participants as they may not be up running
yet. The first login
// happens after this class starts up so the token gets regularly refreshed
before the next login.
- private volatile boolean firstLogin = true;
+ protected volatile boolean firstLogin = true;
- public YarnAppSecurityManager(Config config, HelixManager helixManager,
FileSystem fs, Path tokenFilePath)
- throws IOException {
+ public AbstractYarnAppSecurityManager(Config config, HelixManager
helixManager, FileSystem fs, Path tokenFilePath) {
this.config = config;
this.helixManager = helixManager;
this.fs = fs;
-
this.tokenFilePath = tokenFilePath;
this.fs.makeQualified(tokenFilePath);
- this.loginUser = UserGroupInformation.getLoginUser();
- this.loginIntervalInMinutes =
config.getLong(GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES);
- this.tokenRenewIntervalInMinutes =
config.getLong(GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
@@ -120,7 +95,7 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
@Override
protected void startUp() throws Exception {
- LOGGER.info("Starting the " +
YarnAppSecurityManager.class.getSimpleName());
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
LOGGER.info(
String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
@@ -129,30 +104,14 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
this.loginExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- try {
- // Cancel the currently scheduled token renew task
- if (scheduledTokenRenewTask.isPresent() &&
scheduledTokenRenewTask.get().cancel(true)) {
- LOGGER.info("Cancelled the token renew task");
- }
-
- loginFromKeytab();
- if (firstLogin) {
- firstLogin = false;
- }
-
- // Re-schedule the token renew task after re-login
- scheduleTokenRenewTask();
- } catch (IOException ioe) {
- LOGGER.error("Failed to login from keytab", ioe);
- throw Throwables.propagate(ioe);
- }
+ loginAndScheduleTokenRenewal();
}
- }, 0, this.loginIntervalInMinutes, TimeUnit.MINUTES);
+ }, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
}
@Override
protected void shutDown() throws Exception {
- LOGGER.info("Stopping the " +
YarnAppSecurityManager.class.getSimpleName());
+ LOGGER.info("Stopping the " + this.getClass().getSimpleName());
if (this.scheduledTokenRenewTask.isPresent()) {
this.scheduledTokenRenewTask.get().cancel(true);
@@ -161,7 +120,7 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor,
Optional.of(LOGGER));
}
- private void scheduleTokenRenewTask() {
+ protected void scheduleTokenRenewTask() {
LOGGER.info(String.format("Scheduling the token renew task with an
interval of %d minute(s)",
this.tokenRenewIntervalInMinutes));
@@ -182,83 +141,32 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
}, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes,
TimeUnit.MINUTES));
}
- /**
- * Renew the existing delegation token.
- */
- private synchronized void renewDelegationToken() throws IOException,
InterruptedException {
- this.token.renew(this.fs.getConf());
- writeDelegationTokenToFile();
-
- if (!this.firstLogin) {
- // Send a message to the controller and all the participants if this is
not the first login
- sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
- sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
- }
- }
-
- /**
- * Get a new delegation token for the current logged-in user.
- */
- @VisibleForTesting
- synchronized void getNewDelegationTokenForLoginUser() throws IOException {
- this.token = this.fs.getDelegationToken(this.loginUser.getShortUserName());
- }
-
- /**
- * Login the user from a given keytab file.
- */
- private void loginFromKeytab() throws IOException {
- String keyTabFilePath =
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH);
- if (Strings.isNullOrEmpty(keyTabFilePath)) {
- throw new IOException("Keytab file path is not defined for Kerberos
login");
- }
-
- if (!new File(keyTabFilePath).exists()) {
- throw new IOException("Keytab file not found at: " + keyTabFilePath);
- }
-
- String principal =
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_PRINCIPAL_NAME);
- if (Strings.isNullOrEmpty(principal)) {
- principal = this.loginUser.getShortUserName() + "/localhost@LOCALHOST";
- }
-
- Configuration conf = new Configuration();
- conf.set("hadoop.security.authentication",
-
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
- LOGGER.info(String.format("Logged in from keytab file %s using principal
%s", keyTabFilePath, principal));
-
- this.loginUser = UserGroupInformation.getLoginUser();
+ //The whole logic for each re-login
+ public void loginAndScheduleTokenRenewal() {
+ try {
+ // Cancel the currently scheduled token renew task
+ if (scheduledTokenRenewTask.isPresent() &&
scheduledTokenRenewTask.get().cancel(true)) {
+ LOGGER.info("Cancelled the token renew task");
+ }
- getNewDelegationTokenForLoginUser();
- writeDelegationTokenToFile();
+ login();
+ if (firstLogin) {
+ firstLogin = false;
+ }
- if (!this.firstLogin) {
- // Send a message to the controller and all the participants
- sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
- sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+ // Re-schedule the token renew task after re-login
+ scheduleTokenRenewTask();
+ } catch (IOException | InterruptedException ioe) {
+ LOGGER.error("Failed to login from keytab", ioe);
+ throw Throwables.propagate(ioe);
}
}
/**
- * Write the current delegation token to the token file.
+ * This method is used to send TokenFileUpdatedMessage which will be handled by
{@link YarnContainerSecurityManager}
*/
@VisibleForTesting
- synchronized void writeDelegationTokenToFile() throws IOException {
- if (this.fs.exists(this.tokenFilePath)) {
- LOGGER.info("Deleting existing token file " + this.tokenFilePath);
- this.fs.delete(this.tokenFilePath, false);
- }
-
- LOGGER.info("Writing new or renewed token to token file " +
this.tokenFilePath);
- YarnHelixUtils.writeTokenToFile(this.token, this.tokenFilePath,
this.fs.getConf());
- // Only grand access to the token file to the login user
- this.fs.setPermission(this.tokenFilePath, new
FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
- }
-
- @VisibleForTesting
- void sendTokenFileUpdatedMessage(InstanceType instanceType) {
+ protected void sendTokenFileUpdatedMessage(InstanceType instanceType) {
Criteria criteria = new Criteria();
criteria.setInstanceName("%");
criteria.setResource("%");
@@ -268,9 +176,9 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
/**
* #HELIX-0.6.7-WORKAROUND
* Add back when LIVESTANCES messaging is ported to 0.6 branch
- if (instanceType == InstanceType.PARTICIPANT) {
- criteria.setDataSource(Criteria.DataSource.LIVEINSTANCES);
- }
+ if (instanceType == InstanceType.PARTICIPANT) {
+ criteria.setDataSource(Criteria.DataSource.LIVEINSTANCES);
+ }
**/
criteria.setSessionSpecific(true);
@@ -286,9 +194,36 @@ public class YarnAppSecurityManager extends
AbstractIdleService {
// Temporarily bypass the default messaging service to allow upgrade to
0.6.7 which is missing support
// for messaging to instances
//int messagesSent =
this.helixManager.getMessagingService().send(criteria, tokenFileUpdatedMessage);
- GobblinHelixMessagingService messagingService = new
GobblinHelixMessagingService(this.helixManager);
+ GobblinHelixMessagingService messagingService = new
GobblinHelixMessagingService(helixManager);
int messagesSent = messagingService.send(criteria,
tokenFileUpdatedMessage);
LOGGER.info(String.format("Sent %d token file updated message(s) to the
%s", messagesSent, instanceType));
}
+
+ /**
+ * Write the current delegation token to the token file.
+ */
+ @VisibleForTesting
+ protected synchronized void writeDelegationTokenToFile() throws IOException {
+ if (this.fs.exists(this.tokenFilePath)) {
+ LOGGER.info("Deleting existing token file " + this.tokenFilePath);
+ this.fs.delete(this.tokenFilePath, false);
+ }
+
+ LOGGER.info("Writing new or renewed token to token file " +
this.tokenFilePath);
+ YarnHelixUtils.writeTokenToFile(token, this.tokenFilePath,
this.fs.getConf());
+ // Only grant access to the token file to the login user
+ this.fs.setPermission(this.tokenFilePath, new
FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+ }
+
+ /**
+ * Renew the existing delegation token.
+ */
+ protected abstract void renewDelegationToken() throws IOException,
InterruptedException;
+
+
+ /**
+ * Log in the user; the login mechanism (e.g. from a keytab file) is defined by the subclass.
+ */
+ protected abstract void login() throws IOException, InterruptedException;
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 8542f13..facbdd4 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.EnumSet;
@@ -34,7 +35,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -208,6 +211,7 @@ public class GobblinYarnAppLauncher {
private final int appMasterMemoryMbs;
private final int jvmMemoryOverheadMbs;
private final double jvmMemoryXmxRatio;
+ private Optional<AbstractYarnAppSecurityManager> securityManager =
Optional.absent();
private final String containerTimezone;
@@ -294,6 +298,12 @@ public class GobblinYarnAppLauncher {
startYarnClient();
+ // Before setting up the application, log in first to make sure the UGI has the right
token.
+ if(ConfigUtils.getBoolean(config,
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
+ this.securityManager = Optional.of(buildSecurityManager());
+ this.securityManager.get().loginAndScheduleTokenRenewal();
+ }
+
this.applicationId = getApplicationId();
this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() {
@@ -308,10 +318,14 @@ public class GobblinYarnAppLauncher {
}
}, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES);
+ addServices();
+ }
+
+ private void addServices() throws IOException{
List<Service> services = Lists.newArrayList();
- if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
- LOGGER.info("Adding YarnAppSecurityManager since login is keytab based");
- services.add(buildYarnAppSecurityManager());
+ if (this.securityManager.isPresent()) {
+ LOGGER.info("Adding KeyManagerService since key management is enabled");
+ services.add(this.securityManager.get());
}
if
(!this.config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY)
||
!this.config.getBoolean(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY))
{
@@ -754,10 +768,20 @@ public class GobblinYarnAppLauncher {
return logRootDir;
}
- private YarnAppSecurityManager buildYarnAppSecurityManager() throws
IOException {
+ private AbstractYarnAppSecurityManager buildSecurityManager() throws
IOException {
Path tokenFilePath = new Path(this.fs.getHomeDirectory(),
this.applicationName + Path.SEPARATOR +
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
- return new YarnAppSecurityManager(this.config, this.helixManager, this.fs,
tokenFilePath);
+
+ ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new
ClassAliasResolver<>(
+ AbstractYarnAppSecurityManager.class);
+ try {
+ return
(AbstractYarnAppSecurityManager)ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(
+ ConfigUtils.getString(config,
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS,
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config,
this.helixManager, this.fs,
+ tokenFilePath);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
+ | ClassNotFoundException e) {
+ throw new IOException(e);
+ }
}
@VisibleForTesting
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 882b1d5..224bad8 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -77,12 +77,16 @@ public class GobblinYarnConfigurationKeys {
public static final String HELIX_INSTANCE_MAX_RETRIES = GOBBLIN_YARN_PREFIX
+ "helix.instance.max.retries";
// Security and authentication configuration properties.
+ public static final String SECURITY_MANAGER_CLASS = GOBBLIN_YARN_PREFIX +
"security.manager.class";
+ public static final String DEFAULT_SECURITY_MANAGER_CLASS =
"org.apache.gobblin.yarn.YarnAppSecurityManagerWithKeytabs";
+ public static final String ENABLE_KEY_MANAGEMENT = GOBBLIN_YARN_PREFIX +
"enable.key.management";
public static final String KEYTAB_FILE_PATH = GOBBLIN_YARN_PREFIX +
"keytab.file.path";
public static final String KEYTAB_PRINCIPAL_NAME = GOBBLIN_YARN_PREFIX +
"keytab.principal.name";
public static final String TOKEN_FILE_NAME = ".token";
public static final String LOGIN_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX +
"login.interval.minutes";
+ public static final Long DEFAULT_LOGIN_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES =
GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
-
+ public static final Long DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES =
Long.MAX_VALUE;
// Resource/dependencies configuration properties.
public static final String LIB_JARS_DIR_KEY = GOBBLIN_YARN_PREFIX +
"lib.jars.dir";
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 60e4a76..d3bb523 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,9 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -66,8 +68,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
public List<Service> getServices() {
List<Service> services = new ArrayList<>();
services.addAll(super.getServices());
- if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
- LOGGER.info("Adding YarnContainerSecurityManager since login is keytab
based");
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOGGER.info("Adding YarnContainerSecurityManager since security is
enabled");
services.add(new YarnContainerSecurityManager(this.config, this.fs,
this.eventBus));
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
new file mode 100644
index 0000000..3ee6107
--- /dev/null
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A class for managing Kerberos login and token renewing on the client side
that has access to
+ * the keytab file.
+ *
+ * <p>
+ * This class works with {@link YarnContainerSecurityManager} to manage
renewing of delegation
+ * tokens across the application. This class is responsible for login
through a Kerberos keytab,
+ * renewing the delegation token, and storing the token to a token file on
HDFS. It sends a
+ * Helix message to the controller and all the participants upon writing the
token to the token
+ * file, which rely on the {@link YarnContainerSecurityManager} to read the
token in the file
+ * upon receiving the message.
+ * </p>
+ *
+ * <p>
+ * This class uses a scheduled task to do Kerberos re-login to renew the
Kerberos ticket on a
+ * configurable schedule if login is from a keytab file. It also uses a
second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ *
+ * @author Yinan Li
+ */
+public class YarnAppSecurityManagerWithKeytabs extends
AbstractYarnAppSecurityManager {
+
+ private UserGroupInformation loginUser;
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ private volatile boolean firstLogin = true;
+
+ public YarnAppSecurityManagerWithKeytabs(Config config, HelixManager
helixManager, FileSystem fs, Path tokenFilePath)
+ throws IOException {
+ super(config, helixManager, fs, tokenFilePath);
+ this.loginUser = UserGroupInformation.getLoginUser();
+ }
+
+ /**
+ * Renew the existing delegation token.
+ */
+ protected synchronized void renewDelegationToken() throws IOException,
InterruptedException {
+ this.token.renew(this.fs.getConf());
+ writeDelegationTokenToFile();
+
+ if (!this.firstLogin) {
+ // Send a message to the controller and all the participants if this is
not the first login
+ sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+ sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+ }
+ }
+
+ /**
+ * Get a new delegation token for the current logged-in user.
+ */
+ @VisibleForTesting
+ synchronized void getNewDelegationTokenForLoginUser() throws IOException {
+ this.token = this.fs.getDelegationToken(this.loginUser.getShortUserName());
+ }
+
+ /**
+ * Login the user from a given keytab file.
+ */
+ protected void login() throws IOException {
+ String keyTabFilePath =
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH);
+ if (Strings.isNullOrEmpty(keyTabFilePath)) {
+ throw new IOException("Keytab file path is not defined for Kerberos
login");
+ }
+
+ if (!new File(keyTabFilePath).exists()) {
+ throw new IOException("Keytab file not found at: " + keyTabFilePath);
+ }
+
+ String principal =
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_PRINCIPAL_NAME);
+ if (Strings.isNullOrEmpty(principal)) {
+ principal = this.loginUser.getShortUserName() + "/localhost@LOCALHOST";
+ }
+
+ Configuration conf = new Configuration();
+ conf.set("hadoop.security.authentication",
+
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
+ LOGGER.info(String.format("Logged in from keytab file %s using principal
%s", keyTabFilePath, principal));
+
+ this.loginUser = UserGroupInformation.getLoginUser();
+
+ getNewDelegationTokenForLoginUser();
+ writeDelegationTokenToFile();
+
+ if (!this.firstLogin) {
+ // Send a message to the controller and all the participants
+ sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+ sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+ }
+ }
+
+}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
index a6e6e55..ee15a21 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
@@ -56,17 +56,17 @@ import org.apache.gobblin.testing.AssertWithBackoff;
/**
- * Unit tests for {@link YarnAppSecurityManager} and {@link
YarnContainerSecurityManager}.
+ * Unit tests for {@link YarnAppSecurityManagerWithKeytabs} and {@link
YarnContainerSecurityManager}.
*
* <p>
- * This class tests {@link YarnAppSecurityManager} and {@link
YarnContainerSecurityManager} together
+ * This class tests {@link YarnAppSecurityManagerWithKeytabs} and {@link
YarnContainerSecurityManager} together
* as it is more convenient to test both here where all dependencies are
setup between the two.
* </p>
*
* <p>
* This class uses a {@link TestingServer} as an embedded ZooKeeper server
for testing. The Curator
* framework is used to provide a ZooKeeper client. This class also uses a
{@link HelixManager} as
- * being required by {@link YarnAppSecurityManager}. The local file system
as returned by
+ * being required by {@link YarnAppSecurityManagerWithKeytabs}. The local
file system as returned by
* {@link FileSystem#getLocal(Configuration)} is used for writing the
testing delegation token, which
* is acquired by mocking the method {@link
FileSystem#getDelegationToken(String)} on the local
* {@link FileSystem} instance.
@@ -87,7 +87,7 @@ public class YarnSecurityManagerTest {
private Path tokenFilePath;
private Token<?> token;
- private YarnAppSecurityManager yarnAppSecurityManager;
+ private YarnAppSecurityManagerWithKeytabs
_yarnAppYarnAppSecurityManagerWithKeytabs;
private YarnContainerSecurityManager yarnContainerSecurityManager;
private final Closer closer = Closer.create();
@@ -131,19 +131,19 @@ public class YarnSecurityManagerTest {
this.baseDir = new Path(YarnSecurityManagerTest.class.getSimpleName());
this.tokenFilePath = new Path(this.baseDir,
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
- this.yarnAppSecurityManager =
- new YarnAppSecurityManager(config, this.helixManager, this.localFs,
this.tokenFilePath);
+ this._yarnAppYarnAppSecurityManagerWithKeytabs =
+ new YarnAppSecurityManagerWithKeytabs(config, this.helixManager,
this.localFs, this.tokenFilePath);
this.yarnContainerSecurityManager = new
YarnContainerSecurityManager(config, this.localFs, new EventBus());
}
@Test
public void testGetNewDelegationTokenForLoginUser() throws IOException {
- this.yarnAppSecurityManager.getNewDelegationTokenForLoginUser();
+
this._yarnAppYarnAppSecurityManagerWithKeytabs.getNewDelegationTokenForLoginUser();
}
@Test(dependsOnMethods = "testGetNewDelegationTokenForLoginUser")
public void testWriteDelegationTokenToFile() throws IOException {
- this.yarnAppSecurityManager.writeDelegationTokenToFile();
+
this._yarnAppYarnAppSecurityManagerWithKeytabs.writeDelegationTokenToFile();
Assert.assertTrue(this.localFs.exists(this.tokenFilePath));
assertToken(YarnHelixUtils.readTokensFromFile(this.tokenFilePath,
this.configuration));
}
@@ -173,7 +173,7 @@ public class YarnSecurityManagerTest {
@Test
public void testSendTokenFileUpdatedMessage() throws Exception {
Logger log = LoggerFactory.getLogger("testSendTokenFileUpdatedMessage");
-
this.yarnAppSecurityManager.sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+
this._yarnAppYarnAppSecurityManagerWithKeytabs.sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
Assert.assertEquals(this.curatorFramework.checkExists().forPath(
String.format("/%s/CONTROLLER/MESSAGES",
YarnSecurityManagerTest.class.getSimpleName())).getVersion(), 0);
AssertWithBackoff.create().logger(log).timeoutMs(20000)