This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bce5cbc  [Gobblin-902][GOBBLIN-902] Enable gobblin yarn app luncher 
class configurable
bce5cbc is described below

commit bce5cbc3c8bb607b8c1af56dd3b41f75bbb2b92e
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 11 16:47:12 2019 -0700

    [Gobblin-902][GOBBLIN-902] Enable gobblin yarn app luncher class 
configurable
    
    Closes #2758 from ZihanLi58/GOBBLIN-902
---
 ...er.java => AbstractYarnAppSecurityManager.java} | 223 ++++++++-------------
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  34 +++-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   6 +-
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |   6 +-
 .../yarn/YarnAppSecurityManagerWithKeytabs.java    | 138 +++++++++++++
 .../gobblin/yarn/YarnSecurityManagerTest.java      |  18 +-
 6 files changed, 264 insertions(+), 161 deletions(-)

diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
similarity index 56%
rename from 
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManager.java
rename to 
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index 26b2fff..0b069cd 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -17,77 +17,53 @@
 
 package org.apache.gobblin.yarn;
 
-import org.apache.gobblin.cluster.GobblinHelixMessagingService;
-import java.io.File;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.model.Message;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.util.ExecutorsUtils;
-
-
 /**
- * A class for managing Kerberos login and token renewing on the client side 
that has access to
- * the keytab file.
- *
- * <p>
- *   This class works with {@link YarnContainerSecurityManager} to manage 
renewing of delegation
- *   tokens across the application. This class is responsible for login 
through a Kerberos keytab,
- *   renewing the delegation token, and storing the token to a token file on 
HDFS. It sends a
- *   Helix message to the controller and all the participants upon writing the 
token to the token
- *   file, which rely on the {@link YarnContainerSecurityManager} to read the 
token in the file
- *   upon receiving the message.
- * </p>
- *
  * <p>
- *   This class uses a scheduled task to do Kerberos re-login to renew the 
Kerberos ticket on a
- *   configurable schedule if login is from a keytab file. It also uses a 
second scheduled task
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to refetch token on a
+ *   configurable schedule. It also uses a second scheduled task
  *   to renew the delegation token after each login. Both the re-login 
interval and the token
  *   renewing interval are configurable.
  * </p>
- *
- * @author Yinan Li
+ * @author Zihan Li
  */
-public class YarnAppSecurityManager extends AbstractIdleService {
+public abstract class AbstractYarnAppSecurityManager extends 
AbstractIdleService {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnAppSecurityManager.class);
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
 
-  private final Config config;
-
-  private final HelixManager helixManager;
-  private final FileSystem fs;
-  private final Path tokenFilePath;
-  private UserGroupInformation loginUser;
-  private Token<? extends TokenIdentifier> token;
+  protected Config config;
 
+  protected final HelixManager helixManager;
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+  protected Token<? extends TokenIdentifier> token;
   private final long loginIntervalInMinutes;
   private final long tokenRenewIntervalInMinutes;
 
@@ -98,19 +74,18 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
   // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
   // sent to the controller and the participants as they may not be up running 
yet. The first login
   // happens after this class starts up so the token gets regularly refreshed 
before the next login.
-  private volatile boolean firstLogin = true;
+  protected volatile boolean firstLogin = true;
 
-  public YarnAppSecurityManager(Config config, HelixManager helixManager, 
FileSystem fs, Path tokenFilePath)
-      throws IOException {
+  public AbstractYarnAppSecurityManager(Config config, HelixManager 
helixManager, FileSystem fs, Path tokenFilePath) {
     this.config = config;
     this.helixManager = helixManager;
     this.fs = fs;
-
     this.tokenFilePath = tokenFilePath;
     this.fs.makeQualified(tokenFilePath);
-    this.loginUser = UserGroupInformation.getLoginUser();
-    this.loginIntervalInMinutes = 
config.getLong(GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES);
-    this.tokenRenewIntervalInMinutes = 
config.getLong(GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
 
     this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
@@ -120,7 +95,7 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
 
   @Override
   protected void startUp() throws Exception {
-    LOGGER.info("Starting the " + 
YarnAppSecurityManager.class.getSimpleName());
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
 
     LOGGER.info(
         String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
@@ -129,30 +104,14 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
     this.loginExecutor.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
-        try {
-          // Cancel the currently scheduled token renew task
-          if (scheduledTokenRenewTask.isPresent() && 
scheduledTokenRenewTask.get().cancel(true)) {
-            LOGGER.info("Cancelled the token renew task");
-          }
-
-          loginFromKeytab();
-          if (firstLogin) {
-            firstLogin = false;
-          }
-
-          // Re-schedule the token renew task after re-login
-          scheduleTokenRenewTask();
-        } catch (IOException ioe) {
-          LOGGER.error("Failed to login from keytab", ioe);
-          throw Throwables.propagate(ioe);
-        }
+        loginAndScheduleTokenRenewal();
       }
-    }, 0, this.loginIntervalInMinutes, TimeUnit.MINUTES);
+    }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);
   }
 
   @Override
   protected void shutDown() throws Exception {
-    LOGGER.info("Stopping the " + 
YarnAppSecurityManager.class.getSimpleName());
+    LOGGER.info("Stopping the " + this.getClass().getSimpleName());
 
     if (this.scheduledTokenRenewTask.isPresent()) {
       this.scheduledTokenRenewTask.get().cancel(true);
@@ -161,7 +120,7 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
     ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor, 
Optional.of(LOGGER));
   }
 
-  private void scheduleTokenRenewTask() {
+  protected void scheduleTokenRenewTask() {
     LOGGER.info(String.format("Scheduling the token renew task with an 
interval of %d minute(s)",
         this.tokenRenewIntervalInMinutes));
 
@@ -182,83 +141,32 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
         }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes, 
TimeUnit.MINUTES));
   }
 
-  /**
-   * Renew the existing delegation token.
-   */
-  private synchronized void renewDelegationToken() throws IOException, 
InterruptedException {
-    this.token.renew(this.fs.getConf());
-    writeDelegationTokenToFile();
-
-    if (!this.firstLogin) {
-      // Send a message to the controller and all the participants if this is 
not the first login
-      sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
-      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
-    }
-  }
-
-  /**
-   * Get a new delegation token for the current logged-in user.
-   */
-  @VisibleForTesting
-  synchronized void getNewDelegationTokenForLoginUser() throws IOException {
-    this.token = this.fs.getDelegationToken(this.loginUser.getShortUserName());
-  }
-
-  /**
-   * Login the user from a given keytab file.
-   */
-  private void loginFromKeytab() throws IOException {
-    String keyTabFilePath = 
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH);
-    if (Strings.isNullOrEmpty(keyTabFilePath)) {
-      throw new IOException("Keytab file path is not defined for Kerberos 
login");
-    }
-
-    if (!new File(keyTabFilePath).exists()) {
-      throw new IOException("Keytab file not found at: " + keyTabFilePath);
-    }
-
-    String principal = 
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_PRINCIPAL_NAME);
-    if (Strings.isNullOrEmpty(principal)) {
-      principal = this.loginUser.getShortUserName() + "/localhost@LOCALHOST";
-    }
-
-    Configuration conf = new Configuration();
-    conf.set("hadoop.security.authentication",
-        
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
-    UserGroupInformation.setConfiguration(conf);
-    UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
-    LOGGER.info(String.format("Logged in from keytab file %s using principal 
%s", keyTabFilePath, principal));
-
-    this.loginUser = UserGroupInformation.getLoginUser();
+  //The whole logic for each re-login
+  public void loginAndScheduleTokenRenewal() {
+    try {
+      // Cancel the currently scheduled token renew task
+      if (scheduledTokenRenewTask.isPresent() && 
scheduledTokenRenewTask.get().cancel(true)) {
+        LOGGER.info("Cancelled the token renew task");
+      }
 
-    getNewDelegationTokenForLoginUser();
-    writeDelegationTokenToFile();
+      login();
+      if (firstLogin) {
+        firstLogin = false;
+      }
 
-    if (!this.firstLogin) {
-      // Send a message to the controller and all the participants
-      sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
-      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+      // Re-schedule the token renew task after re-login
+      scheduleTokenRenewTask();
+    } catch (IOException | InterruptedException ioe) {
+      LOGGER.error("Failed to login from keytab", ioe);
+      throw Throwables.propagate(ioe);
     }
   }
 
   /**
-   * Write the current delegation token to the token file.
+   * This method is used to send TokenFileUpdatedMessage which will handle by 
{@link YarnContainerSecurityManager}
    */
   @VisibleForTesting
-  synchronized void writeDelegationTokenToFile() throws IOException {
-    if (this.fs.exists(this.tokenFilePath)) {
-      LOGGER.info("Deleting existing token file " + this.tokenFilePath);
-      this.fs.delete(this.tokenFilePath, false);
-    }
-
-    LOGGER.info("Writing new or renewed token to token file " + 
this.tokenFilePath);
-    YarnHelixUtils.writeTokenToFile(this.token, this.tokenFilePath, 
this.fs.getConf());
-    // Only grand access to the token file to the login user
-    this.fs.setPermission(this.tokenFilePath, new 
FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
-  }
-
-  @VisibleForTesting
-  void sendTokenFileUpdatedMessage(InstanceType instanceType) {
+  protected void sendTokenFileUpdatedMessage(InstanceType instanceType) {
     Criteria criteria = new Criteria();
     criteria.setInstanceName("%");
     criteria.setResource("%");
@@ -268,9 +176,9 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
     /**
      * #HELIX-0.6.7-WORKAROUND
      * Add back when LIVESTANCES messaging is ported to 0.6 branch
-    if (instanceType == InstanceType.PARTICIPANT) {
-      criteria.setDataSource(Criteria.DataSource.LIVEINSTANCES);
-    }
+     if (instanceType == InstanceType.PARTICIPANT) {
+     criteria.setDataSource(Criteria.DataSource.LIVEINSTANCES);
+     }
      **/
     criteria.setSessionSpecific(true);
 
@@ -286,9 +194,36 @@ public class YarnAppSecurityManager extends 
AbstractIdleService {
     // Temporarily bypass the default messaging service to allow upgrade to 
0.6.7 which is missing support
     // for messaging to instances
     //int messagesSent = 
this.helixManager.getMessagingService().send(criteria, tokenFileUpdatedMessage);
-    GobblinHelixMessagingService messagingService = new 
GobblinHelixMessagingService(this.helixManager);
+    GobblinHelixMessagingService messagingService = new 
GobblinHelixMessagingService(helixManager);
 
     int messagesSent = messagingService.send(criteria, 
tokenFileUpdatedMessage);
     LOGGER.info(String.format("Sent %d token file updated message(s) to the 
%s", messagesSent, instanceType));
   }
+
+  /**
+   * Write the current delegation token to the token file.
+   */
+  @VisibleForTesting
+  protected synchronized void writeDelegationTokenToFile() throws IOException {
+    if (this.fs.exists(this.tokenFilePath)) {
+      LOGGER.info("Deleting existing token file " + this.tokenFilePath);
+      this.fs.delete(this.tokenFilePath, false);
+    }
+
+    LOGGER.info("Writing new or renewed token to token file " + 
this.tokenFilePath);
+    YarnHelixUtils.writeTokenToFile(token, this.tokenFilePath, 
this.fs.getConf());
+    // Only grand access to the token file to the login user
+    this.fs.setPermission(this.tokenFilePath, new 
FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+  }
+
+  /**
+   * Renew the existing delegation token.
+   */
+  protected abstract void renewDelegationToken() throws IOException, 
InterruptedException;
+
+
+  /**
+   * Login the user from a given keytab file.
+   */
+  protected abstract void login() throws IOException, InterruptedException;
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 8542f13..facbdd4 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.yarn;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
@@ -34,7 +35,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -208,6 +211,7 @@ public class GobblinYarnAppLauncher {
   private final int appMasterMemoryMbs;
   private final int jvmMemoryOverheadMbs;
   private final double jvmMemoryXmxRatio;
+  private Optional<AbstractYarnAppSecurityManager> securityManager = 
Optional.absent();
 
   private final String containerTimezone;
 
@@ -294,6 +298,12 @@ public class GobblinYarnAppLauncher {
 
     startYarnClient();
 
+    // Before setup application, first login to make sure ugi has the right 
token.
+    if(ConfigUtils.getBoolean(config, 
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
+      this.securityManager = Optional.of(buildSecurityManager());
+      this.securityManager.get().loginAndScheduleTokenRenewal();
+    }
+
     this.applicationId = getApplicationId();
 
     this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() {
@@ -308,10 +318,14 @@ public class GobblinYarnAppLauncher {
       }
     }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES);
 
+    addServices();
+  }
+
+  private void addServices() throws IOException{
     List<Service> services = Lists.newArrayList();
-    if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
-      LOGGER.info("Adding YarnAppSecurityManager since login is keytab based");
-      services.add(buildYarnAppSecurityManager());
+    if (this.securityManager.isPresent()) {
+      LOGGER.info("Adding KeyManagerService since key management is enabled");
+      services.add(this.securityManager.get());
     }
     if 
(!this.config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY)
 ||
         
!this.config.getBoolean(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY))
 {
@@ -754,10 +768,20 @@ public class GobblinYarnAppLauncher {
     return logRootDir;
   }
 
-  private YarnAppSecurityManager buildYarnAppSecurityManager() throws 
IOException {
+  private AbstractYarnAppSecurityManager buildSecurityManager() throws 
IOException {
     Path tokenFilePath = new Path(this.fs.getHomeDirectory(), 
this.applicationName + Path.SEPARATOR +
         GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
-    return new YarnAppSecurityManager(this.config, this.helixManager, this.fs, 
tokenFilePath);
+
+    ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new 
ClassAliasResolver<>(
+        AbstractYarnAppSecurityManager.class);
+    try {
+     return 
(AbstractYarnAppSecurityManager)ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(
+          ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, 
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, 
this.helixManager, this.fs,
+          tokenFilePath);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
+        | ClassNotFoundException e) {
+      throw new IOException(e);
+    }
   }
 
   @VisibleForTesting
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 882b1d5..224bad8 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -77,12 +77,16 @@ public class GobblinYarnConfigurationKeys {
   public static final String HELIX_INSTANCE_MAX_RETRIES = GOBBLIN_YARN_PREFIX 
+ "helix.instance.max.retries";
 
   // Security and authentication configuration properties.
+  public static final String SECURITY_MANAGER_CLASS = GOBBLIN_YARN_PREFIX + 
"security.manager.class";
+  public static final String DEFAULT_SECURITY_MANAGER_CLASS = 
"org.apache.gobblin.yarn.YarnAppSecurityManagerWithKeytabs";
+  public static final String ENABLE_KEY_MANAGEMENT = GOBBLIN_YARN_PREFIX + 
"enable.key.management";
   public static final String KEYTAB_FILE_PATH = GOBBLIN_YARN_PREFIX + 
"keytab.file.path";
   public static final String KEYTAB_PRINCIPAL_NAME = GOBBLIN_YARN_PREFIX + 
"keytab.principal.name";
   public static final String TOKEN_FILE_NAME = ".token";
   public static final String LOGIN_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + 
"login.interval.minutes";
+  public static final Long DEFAULT_LOGIN_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
   public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = 
GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
-
+  public static final Long DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES = 
Long.MAX_VALUE;
   // Resource/dependencies configuration properties.
   public static final String LIB_JARS_DIR_KEY = GOBBLIN_YARN_PREFIX + 
"lib.jars.dir";
 
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 60e4a76..d3bb523 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,9 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -66,8 +68,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
   public List<Service> getServices() {
     List<Service> services = new ArrayList<>();
     services.addAll(super.getServices());
-    if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
-      LOGGER.info("Adding YarnContainerSecurityManager since login is keytab 
based");
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOGGER.info("Adding YarnContainerSecurityManager since security is 
enabled");
       services.add(new YarnContainerSecurityManager(this.config, this.fs, 
this.eventBus));
     }
 
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
new file mode 100644
index 0000000..3ee6107
--- /dev/null
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A class for managing Kerberos login and token renewing on the client side 
that has access to
+ * the keytab file.
+ *
+ * <p>
+ *   This class works with {@link YarnContainerSecurityManager} to manage 
renewing of delegation
+ *   tokens across the application. This class is responsible for login 
through a Kerberos keytab,
+ *   renewing the delegation token, and storing the token to a token file on 
HDFS. It sends a
+ *   Helix message to the controller and all the participants upon writing the 
token to the token
+ *   file, which rely on the {@link YarnContainerSecurityManager} to read the 
token in the file
+ *   upon receiving the message.
+ * </p>
+ *
+ * <p>
+ *   This class uses a scheduled task to do Kerberos re-login to renew the 
Kerberos ticket on a
+ *   configurable schedule if login is from a keytab file. It also uses a 
second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ *
+ * @author Yinan Li
+ */
+public class YarnAppSecurityManagerWithKeytabs extends 
AbstractYarnAppSecurityManager {
+
+  private UserGroupInformation loginUser;
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  private volatile boolean firstLogin = true;
+
+  public YarnAppSecurityManagerWithKeytabs(Config config, HelixManager 
helixManager, FileSystem fs, Path tokenFilePath)
+      throws IOException {
+    super(config, helixManager, fs, tokenFilePath);
+    this.loginUser = UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Renew the existing delegation token.
+   */
+  protected synchronized void renewDelegationToken() throws IOException, 
InterruptedException {
+    this.token.renew(this.fs.getConf());
+    writeDelegationTokenToFile();
+
+    if (!this.firstLogin) {
+      // Send a message to the controller and all the participants if this is 
not the first login
+      sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+    }
+  }
+
+  /**
+   * Get a new delegation token for the current logged-in user.
+   */
+  @VisibleForTesting
+  synchronized void getNewDelegationTokenForLoginUser() throws IOException {
+    this.token = this.fs.getDelegationToken(this.loginUser.getShortUserName());
+  }
+
+  /**
+   * Login the user from a given keytab file.
+   */
+  protected void login() throws IOException {
+    String keyTabFilePath = 
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH);
+    if (Strings.isNullOrEmpty(keyTabFilePath)) {
+      throw new IOException("Keytab file path is not defined for Kerberos 
login");
+    }
+
+    if (!new File(keyTabFilePath).exists()) {
+      throw new IOException("Keytab file not found at: " + keyTabFilePath);
+    }
+
+    String principal = 
this.config.getString(GobblinYarnConfigurationKeys.KEYTAB_PRINCIPAL_NAME);
+    if (Strings.isNullOrEmpty(principal)) {
+      principal = this.loginUser.getShortUserName() + "/localhost@LOCALHOST";
+    }
+
+    Configuration conf = new Configuration();
+    conf.set("hadoop.security.authentication",
+        
UserGroupInformation.AuthenticationMethod.KERBEROS.toString().toLowerCase());
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
+    LOGGER.info(String.format("Logged in from keytab file %s using principal 
%s", keyTabFilePath, principal));
+
+    this.loginUser = UserGroupInformation.getLoginUser();
+
+    getNewDelegationTokenForLoginUser();
+    writeDelegationTokenToFile();
+
+    if (!this.firstLogin) {
+      // Send a message to the controller and all the participants
+      sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+    }
+  }
+
+}
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
index a6e6e55..ee15a21 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
@@ -56,17 +56,17 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 
 
 /**
- * Unit tests for {@link YarnAppSecurityManager} and {@link 
YarnContainerSecurityManager}.
+ * Unit tests for {@link YarnAppSecurityManagerWithKeytabs} and {@link 
YarnContainerSecurityManager}.
  *
  * <p>
- *   This class tests {@link YarnAppSecurityManager} and {@link 
YarnContainerSecurityManager} together
+ *   This class tests {@link YarnAppSecurityManagerWithKeytabs} and {@link 
YarnContainerSecurityManager} together
  *   as it is more convenient to test both here where all dependencies are 
setup between the two.
  * </p>
  *
  * <p>
  *   This class uses a {@link TestingServer} as an embedded ZooKeeper server 
for testing. The Curator
  *   framework is used to provide a ZooKeeper client. This class also uses a 
{@link HelixManager} as
- *   being required by {@link YarnAppSecurityManager}. The local file system 
as returned by
+ *   being required by {@link YarnAppSecurityManagerWithKeytabs}. The local 
file system as returned by
  *   {@link FileSystem#getLocal(Configuration)} is used for writing the 
testing delegation token, which
  *   is acquired by mocking the method {@link 
FileSystem#getDelegationToken(String)} on the local
  *   {@link FileSystem} instance.
@@ -87,7 +87,7 @@ public class YarnSecurityManagerTest {
   private Path tokenFilePath;
   private Token<?> token;
 
-  private YarnAppSecurityManager yarnAppSecurityManager;
+  private YarnAppSecurityManagerWithKeytabs 
_yarnAppYarnAppSecurityManagerWithKeytabs;
   private YarnContainerSecurityManager yarnContainerSecurityManager;
 
   private final Closer closer = Closer.create();
@@ -131,19 +131,19 @@ public class YarnSecurityManagerTest {
 
     this.baseDir = new Path(YarnSecurityManagerTest.class.getSimpleName());
     this.tokenFilePath = new Path(this.baseDir, 
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
-    this.yarnAppSecurityManager =
-        new YarnAppSecurityManager(config, this.helixManager, this.localFs, 
this.tokenFilePath);
+    this._yarnAppYarnAppSecurityManagerWithKeytabs =
+        new YarnAppSecurityManagerWithKeytabs(config, this.helixManager, 
this.localFs, this.tokenFilePath);
     this.yarnContainerSecurityManager = new 
YarnContainerSecurityManager(config, this.localFs, new EventBus());
   }
 
   @Test
   public void testGetNewDelegationTokenForLoginUser() throws IOException {
-    this.yarnAppSecurityManager.getNewDelegationTokenForLoginUser();
+    
this._yarnAppYarnAppSecurityManagerWithKeytabs.getNewDelegationTokenForLoginUser();
   }
 
   @Test(dependsOnMethods = "testGetNewDelegationTokenForLoginUser")
   public void testWriteDelegationTokenToFile() throws IOException {
-    this.yarnAppSecurityManager.writeDelegationTokenToFile();
+    
this._yarnAppYarnAppSecurityManagerWithKeytabs.writeDelegationTokenToFile();
     Assert.assertTrue(this.localFs.exists(this.tokenFilePath));
     assertToken(YarnHelixUtils.readTokensFromFile(this.tokenFilePath, 
this.configuration));
   }
@@ -173,7 +173,7 @@ public class YarnSecurityManagerTest {
   @Test
   public void testSendTokenFileUpdatedMessage() throws Exception {
     Logger log = LoggerFactory.getLogger("testSendTokenFileUpdatedMessage");
-    
this.yarnAppSecurityManager.sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+    
this._yarnAppYarnAppSecurityManagerWithKeytabs.sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
     Assert.assertEquals(this.curatorFramework.checkExists().forPath(
         String.format("/%s/CONTROLLER/MESSAGES", 
YarnSecurityManagerTest.class.getSimpleName())).getVersion(), 0);
     AssertWithBackoff.create().logger(log).timeoutMs(20000)

Reply via email to