Repository: hadoop
Updated Branches:
  refs/heads/branch-3.2 0b2cfc8ab -> 34387599c


YARN-8960. [Submarine] Can't get submarine service status using the command of 
"yarn app -status" under security environment. (Zac Zhou via wangda)

Change-Id: I21b1addc9c32817650ea744a8f2e6b5602f2f4d4
(cherry picked from commit 8b2381441558cd49b4c940b0760c8accbb2a5567)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34387599
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34387599
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34387599

Branch: refs/heads/branch-3.2
Commit: 34387599c41b5026a7ab179b31fbc09cfcbd5071
Parents: 0b2cfc8
Author: Wangda Tan <wan...@apache.org>
Authored: Mon Nov 19 08:55:30 2018 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Mon Nov 19 09:01:21 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/submarine/client/cli/Cli.java   |  3 -
 .../yarn/submarine/client/cli/CliConstants.java |  3 +
 .../yarn/submarine/client/cli/CliUtils.java     | 56 ++++++++++++++++++
 .../yarn/submarine/client/cli/RunJobCli.java    | 12 +++-
 .../client/cli/param/RunJobParameters.java      | 46 ++++++++++++++-
 .../yarn/submarine/common/ClientContext.java    | 15 +++--
 .../fs/DefaultRemoteDirectoryManager.java       |  3 +-
 .../yarnservice/YarnServiceJobSubmitter.java    | 61 +++++++++++++++++---
 .../client/cli/TestRunJobCliParsing.java        |  9 ++-
 9 files changed, 188 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
index b4c5e4c..69189f4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
@@ -16,7 +16,6 @@ package org.apache.hadoop.yarn.submarine.client.cli;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
-import 
org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
 import org.slf4j.Logger;
@@ -43,8 +42,6 @@ public class Cli {
     Configuration conf = new YarnConfiguration();
     ClientContext clientContext = new ClientContext();
     clientContext.setConfiguration(conf);
-    clientContext.setRemoteDirectoryManager(
-        new DefaultRemoteDirectoryManager(clientContext));
     RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory(
         clientContext);
     clientContext.setRuntimeFactory(runtimeFactory);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
index 454ff1c..2d7a472 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
@@ -52,4 +52,7 @@ public class CliConstants {
   public static final String QUICKLINK = "quicklink";
   public static final String TENSORBOARD_DOCKER_IMAGE =
       "tensorboard_docker_image";
+  public static final String KEYTAB = "keytab";
+  public static final String PRINCIPAL = "principal";
+  public static final String DISTRIBUTE_KEYTAB = "distribute_keytab";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
index bfdfa9a..194e8df 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
@@ -14,22 +14,33 @@
 
 package org.apache.hadoop.yarn.submarine.client.cli;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
 import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import 
org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
 import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.KEYTAB;
+import static 
org.apache.hadoop.yarn.submarine.client.cli.CliConstants.PRINCIPAL;
+
 public class CliUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CliUtils.class);
   private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
   /**
    * Replace patterns inside cli
@@ -163,4 +174,49 @@ public class CliUtils {
 
     return false;
   }
+
+  public static void doLoginIfSecure(String keytab, String principal) throws
+      IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    if (StringUtils.isEmpty(keytab) || StringUtils.isEmpty(principal)) {
+      if (StringUtils.isNotEmpty(keytab)) {
+        SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
+            "parameter of " + PRINCIPAL + " is missing.");
+        LOG.error(e.getMessage(), e);
+        throw e;
+      }
+
+      if (StringUtils.isNotEmpty(principal)) {
+        SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
+            "parameter of " + KEYTAB + " is missing.");
+        LOG.error(e.getMessage(), e);
+        throw e;
+      }
+
+      UserGroupInformation user = UserGroupInformation.getCurrentUser();
+      if(user == null || user.getAuthenticationMethod() ==
+          UserGroupInformation.AuthenticationMethod.SIMPLE) {
+        SubmarineRuntimeException e = new SubmarineRuntimeException("Failed " +
+            "to authenticate in secure environment. Please run kinit " +
+            "command in advance or use " + "--" + KEYTAB + "/--" + PRINCIPAL +
+            " parameters");
+        LOG.error(e.getMessage(), e);
+        throw e;
+      }
+      LOG.info("Submarine job is submitted by user: " + user.getUserName());
+      return;
+    }
+
+    File keytabFile = new File(keytab);
+    if (!keytabFile.exists()) {
+      SubmarineRuntimeException e =  new SubmarineRuntimeException("No " +
+          "keytab localized at  " + keytab);
+      LOG.error(e.getMessage(), e);
+      throw e;
+    }
+    UserGroupInformation.loginUserFromKeytab(principal, keytab);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
index 5054a94..fc57be9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
@@ -125,6 +125,15 @@ public class RunJobCli extends AbstractCli {
         + "if want to link to first worker's 7070 port, and text of quicklink "
         + "is Notebook_UI, user need to specify --quicklink "
         + "Notebook_UI=https://master-0:7070";);
+    options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " 
+
+        "job under security environment");
+    options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +
+        "by the job under security environment");
+    options.addOption(CliConstants.DISTRIBUTE_KEYTAB, false, "Distribute " +
+        "local keytab to cluster machines for service authentication. If not " 
+
+        "sepcified, pre-destributed keytab of which path specified by" +
+        " parameter" + CliConstants.KEYTAB + " on cluster machines will be " +
+        "used");
     options.addOption("h", "help", false, "Print help");
     return options;
   }
@@ -153,7 +162,8 @@ public class RunJobCli extends AbstractCli {
       // Do parsing
       GnuParser parser = new GnuParser();
       CommandLine cli = parser.parse(options, args);
-      parameters.updateParametersByParsedCommandline(cli, options, 
clientContext);
+      parameters.updateParametersByParsedCommandline(cli, options,
+          clientContext);
     } catch (ParseException e) {
       LOG.error("Exception in parse:", e.getMessage());
       printUsages();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
index d923e0f..6c8307f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
@@ -51,6 +51,10 @@ public class RunJobParameters extends RunParameters {
   private boolean waitJobFinish = false;
   private boolean distributed = false;
 
+  private String keytab;
+  private String principal;
+  private boolean distributeKeytab = false;
+
   @Override
   public void updateParametersByParsedCommandline(CommandLine 
parsedCommandLine,
       Options options, ClientContext clientContext)
@@ -85,6 +89,12 @@ public class RunJobParameters extends RunParameters {
           + "please double check.");
     }
 
+    String kerberosKeytab = parsedCommandLine.getOptionValue(
+        CliConstants.KEYTAB);
+    String kerberosPrincipal = parsedCommandLine.getOptionValue(
+        CliConstants.PRINCIPAL);
+    CliUtils.doLoginIfSecure(kerberosKeytab, kerberosPrincipal);
+
     workerResource = null;
     if (nWorkers > 0) {
       String workerResourceStr = parsedCommandLine.getOptionValue(
@@ -149,10 +159,16 @@ public class RunJobParameters extends RunParameters {
     String psLaunchCommand = parsedCommandLine.getOptionValue(
         CliConstants.PS_LAUNCH_CMD);
 
+    boolean distributeKerberosKeytab = parsedCommandLine.hasOption(CliConstants
+        .DISTRIBUTE_KEYTAB);
+
     
this.setInputPath(input).setCheckpointPath(jobDir).setNumPS(nPS).setNumWorkers(nWorkers)
         .setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd)
         .setPsResource(psResource)
-        .setTensorboardEnabled(tensorboard);
+        .setTensorboardEnabled(tensorboard)
+        .setKeytab(kerberosKeytab)
+        .setPrincipal(kerberosPrincipal)
+        .setDistributeKeytab(distributeKerberosKeytab);
 
     super.updateParametersByParsedCommandline(parsedCommandLine,
         options, clientContext);
@@ -271,4 +287,32 @@ public class RunJobParameters extends RunParameters {
   public List<Quicklink> getQuicklinks() {
     return quicklinks;
   }
+
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public RunJobParameters setKeytab(String kerberosKeytab) {
+    this.keytab = kerberosKeytab;
+    return this;
+  }
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+  public RunJobParameters setPrincipal(String kerberosPrincipal) {
+    this.principal = kerberosPrincipal;
+    return this;
+  }
+
+  public boolean isDistributeKeytab() {
+    return distributeKeytab;
+  }
+
+  public RunJobParameters setDistributeKeytab(
+      boolean distributeKerberosKeytab) {
+    this.distributeKeytab = distributeKerberosKeytab;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
index 31a8b1b..055b3c6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
@@ -18,13 +18,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
+import 
org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager;
 import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
 
 public class ClientContext {
   private Configuration yarnConf = new YarnConfiguration();
 
-  private RemoteDirectoryManager remoteDirectoryManager;
+  private volatile RemoteDirectoryManager remoteDirectoryManager;
   private YarnClient yarnClient;
   private Configuration submarineConfig;
   private RuntimeFactory runtimeFactory;
@@ -51,14 +52,16 @@ public class ClientContext {
   }
 
   public RemoteDirectoryManager getRemoteDirectoryManager() {
+    if(remoteDirectoryManager == null) {
+      synchronized (this) {
+        if(remoteDirectoryManager == null) {
+          remoteDirectoryManager = new DefaultRemoteDirectoryManager(this);
+        }
+      }
+    }
     return remoteDirectoryManager;
   }
 
-  public void setRemoteDirectoryManager(
-      RemoteDirectoryManager remoteDirectoryManager) {
-    this.remoteDirectoryManager = remoteDirectoryManager;
-  }
-
   public Configuration getSubmarineConfig() {
     return submarineConfig;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
index b2e2b41..1d99b55 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
@@ -84,7 +84,8 @@ public class DefaultRemoteDirectoryManager implements 
RemoteDirectoryManager {
   }
 
   private Path getJobRootFolder(String jobName) throws IOException {
-    Path jobRootPath = getUserRootFolder();
+    Path userRoot = getUserRootFolder();
+    Path jobRootPath = new Path(userRoot, jobName);
     createFolderIfNotExist(jobRootPath);
     // Get a file status to make sure it is a absolute path.
     FileStatus fStatus = fs.getFileStatus(jobRootPath);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
index d9a88a5..4b4961f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
@@ -15,9 +15,11 @@
 package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -28,6 +30,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
 import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
@@ -300,8 +303,26 @@ public class YarnServiceJobSubmitter implements 
JobSubmitter {
   private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
       String fileToUpload, String destFilename, Component comp)
       throws IOException {
+    Path uploadedFilePath = uploadToRemoteFile(stagingDir, fileToUpload);
+    locateRemoteFileToContainerWorkDir(destFilename, comp, uploadedFilePath);
+  }
+
+  private void locateRemoteFileToContainerWorkDir(String destFilename,
+      Component comp, Path uploadedFilePath) throws IOException {
     FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
 
+    FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
+    LOG.info("Uploaded file path = " + fileStatus.getPath());
+
+    // Set it to component's files list
+    comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
+        fileStatus.getPath().toUri().toString()).destFile(destFilename)
+        .type(ConfigFile.TypeEnum.STATIC));
+  }
+
+  private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws
+      IOException {
+    FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
     // Upload to remote FS under staging area
     File localFile = new File(fileToUpload);
     if (!localFile.exists()) {
@@ -320,14 +341,13 @@ public class YarnServiceJobSubmitter implements 
JobSubmitter {
       fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath);
       uploadedFiles.add(uploadedFilePath);
     }
+    return uploadedFilePath;
+  }
 
-    FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
-    LOG.info("Uploaded file path = " + fileStatus.getPath());
-
-    // Set it to component's files list
-    comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
-        fileStatus.getPath().toUri().toString()).destFile(destFilename)
-        .type(ConfigFile.TypeEnum.STATIC));
+  private void setPermission(Path destPath, FsPermission permission) throws
+      IOException {
+    FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
+    fs.setPermission(destPath, new FsPermission(permission));
   }
 
   private void handleLaunchCommand(RunJobParameters parameters,
@@ -475,6 +495,7 @@ public class YarnServiceJobSubmitter implements 
JobSubmitter {
     serviceSpec.setName(parameters.getName());
     serviceSpec.setVersion(String.valueOf(System.currentTimeMillis()));
     
serviceSpec.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
+    handleKerberosPrincipal(parameters);
 
     handleServiceEnvs(serviceSpec, parameters);
 
@@ -547,6 +568,32 @@ public class YarnServiceJobSubmitter implements 
JobSubmitter {
     return serviceSpecFile.getAbsolutePath();
   }
 
+  private void handleKerberosPrincipal(RunJobParameters parameters) throws
+      IOException {
+    if(StringUtils.isNotBlank(parameters.getKeytab()) && StringUtils
+        .isNotBlank(parameters.getPrincipal())) {
+      String keytab = parameters.getKeytab();
+      String principal = parameters.getPrincipal();
+      if(parameters.isDistributeKeytab()) {
+        Path stagingDir =
+            clientContext.getRemoteDirectoryManager().getJobStagingArea(
+                parameters.getName(), true);
+        Path remoteKeytabPath = uploadToRemoteFile(stagingDir, keytab);
+        //only the owner has read access
+        setPermission(remoteKeytabPath,
+            FsPermission.createImmutable((short)Integer.parseInt("400", 8)));
+        serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(
+            remoteKeytabPath.toString()).principalName(principal));
+      } else {
+        if(!keytab.startsWith("file")) {
+          keytab = "file://" + keytab;
+        }
+        serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(
+            keytab).principalName(principal));
+      }
+    }
+  }
+
   /**
    * {@inheritDoc}
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34387599/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
index 240de06..184d53d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
@@ -92,7 +92,9 @@ public class TestRunJobCliParsing {
             "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
             "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
             "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
-            "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+            "--ps_launch_cmd", "python run-ps.py", "--keytab", "/keytab/path",
+            "--principal", "user/_h...@domain.com", "--distribute_keytab",
+            "--verbose" });
 
     RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
 
@@ -108,6 +110,11 @@ public class TestRunJobCliParsing {
         jobRunParameters.getWorkerResource());
     Assert.assertEquals(jobRunParameters.getDockerImageName(),
         "tf-docker:1.1.0");
+    Assert.assertEquals(jobRunParameters.getKeytab(),
+        "/keytab/path");
+    Assert.assertEquals(jobRunParameters.getPrincipal(),
+        "user/_h...@domain.com");
+    Assert.assertTrue(jobRunParameters.isDistributeKeytab());
     Assert.assertTrue(SubmarineLogs.isVerbose());
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to