Repository: spark
Updated Branches:
  refs/heads/master ec8973d12 -> c043a3e9d


[SPARK-8851] [YARN] In Client mode, make sure the client logs in and updates tokens

On the client side, the flow is SparkSubmit -> SparkContext -> yarn/Client. Since
the YARN client only gets a cloned config and the staging dir is set there, it is
not really possible to do re-logins in the SparkContext. So, do the initial login
in SparkSubmit and keep doing re-logins in the AM as we do now, but have the
Client behave like an executor in this specific context and read the credentials
file to update its tokens. This way, even if the streaming context is started up
from a checkpoint, it is fine since we have already logged in from SparkSubmit
itself.
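
For illustration, a condensed sketch of that client-mode flow (the wrapper object
and method names below are invented for this example and are not part of the
patch; the calls mirror the diff, and the Spark-internal YarnSparkHadoopUtil
helper is assumed to be visible, as it is from within Spark's yarn module):

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Illustrative only: the object/method names are invented; the calls mirror the patch.
object YarnClientLoginFlowSketch {

  // SparkSubmit side: perform the initial Kerberos login before the
  // SparkContext (and hence yarn/Client) is created.
  def loginAtSubmit(principal: String, keytab: String): Unit = {
    require(keytab != null, "Keytab must be specified when the principal is specified")
    UserGroupInformation.loginUserFromKeytab(principal, keytab)
  }

  // Driver side (YarnClientSchedulerBackend): once the AM is renewing tokens,
  // read the credentials file and refresh the driver's own token cache, the
  // same way an executor does.
  def startDriverTokenUpdater(conf: SparkConf): Unit = {
    if (conf.contains("spark.yarn.credentials.file")) {
      YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
    }
  }

  // Stopped together with the scheduler backend.
  def stopDriverTokenUpdater(): Unit = {
    YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
  }
}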

Author: Hari Shreedharan <hshreedha...@apache.org>

Closes #7394 from harishreedharan/yarn-client-login and squashes the following commits:

9a2166f [Hari Shreedharan] make it possible to use command line args and config parameters together.
de08f57 [Hari Shreedharan] Fix import order.
5c4fa63 [Hari Shreedharan] Add a comment explaining what is being done in YarnClientSchedulerBackend.
c872caa [Hari Shreedharan] Fix typo in log message.
2c80540 [Hari Shreedharan] Move token renewal to YarnClientSchedulerBackend.
0c48ac2 [Hari Shreedharan] Remove direct use of ExecutorDelegationTokenUpdater in Client.
26f8bfa [Hari Shreedharan] [SPARK-8851][YARN] In Client mode, make sure the client logs in and updates tokens.
58b1969 [Hari Shreedharan] Simple attempt 1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c043a3e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c043a3e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c043a3e9

Branch: refs/heads/master
Commit: c043a3e9df55721f21332f7c44ff351832d20324
Parents: ec8973d
Author: Hari Shreedharan <hshreedha...@apache.org>
Authored: Fri Jul 17 09:38:08 2015 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Fri Jul 17 09:38:08 2015 -0500

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 29 +++++++++++-------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 10 ++++--
 .../org/apache/spark/deploy/yarn/Client.scala   | 32 +++++++++++++-------
 .../cluster/YarnClientSchedulerBackend.scala    | 11 +++++--
 4 files changed, 56 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c043a3e9/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9f94118..6b14d40 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator}
 import scala.collection.JavaConversions._
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
@@ -248,19 +249,25 @@ class SparkHadoopUtil extends Logging {
       dir: Path,
       prefix: String,
       exclusionSuffix: String): Array[FileStatus] = {
-    val fileStatuses = remoteFs.listStatus(dir,
-      new PathFilter {
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+    try {
+      val fileStatuses = remoteFs.listStatus(dir,
+        new PathFilter {
+          override def accept(path: Path): Boolean = {
+            val name = path.getName
+            name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+          }
+        })
+      Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+        override def compare(o1: FileStatus, o2: FileStatus): Int = {
+          Longs.compare(o1.getModificationTime, o2.getModificationTime)
         }
       })
-    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-      override def compare(o1: FileStatus, o2: FileStatus): Int = {
-        Longs.compare(o1.getModificationTime, o2.getModificationTime)
-      }
-    })
-    fileStatuses
+      fileStatuses
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Error while attempting to list files from application staging dir", e)
+        Array.empty
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c043a3e9/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 036cb6e..0b39ee8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -508,8 +508,14 @@ object SparkSubmit {
     }
 
     // Let YARN know it's a pyspark app, so it distributes needed libraries.
-    if (clusterManager == YARN && args.isPython) {
-      sysProps.put("spark.yarn.isPython", "true")
+    if (clusterManager == YARN) {
+      if (args.isPython) {
+        sysProps.put("spark.yarn.isPython", "true")
+      }
+      if (args.principal != null) {
+        require(args.keytab != null, "Keytab must be specified when the principal is specified")
+        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+      }
     }
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class

http://git-wip-us.apache.org/repos/asf/spark/blob/c043a3e9/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b74ea9a..bc28ce5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -80,10 +80,12 @@ private[spark] class Client(
   private val isClusterMode = args.isClusterMode
 
   private var loginFromKeytab = false
+  private var principal: String = null
+  private var keytab: String = null
+
   private val fireAndForget = isClusterMode &&
     !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
 
-
   def stop(): Unit = yarnClient.stop()
 
   /**
@@ -339,7 +341,7 @@ private[spark] class Client(
     if (loginFromKeytab) {
       logInfo("To enable the AM to login from keytab, credentials are being 
copied over to the AM" +
         " via the YARN Secure Distributed Cache.")
-      val (_, localizedPath) = distribute(args.keytab,
+      val (_, localizedPath) = distribute(keytab,
         destName = Some(sparkConf.get("spark.yarn.keytab")),
         appMasterOnly = true)
       require(localizedPath != null, "Keytab file already distributed.")
@@ -785,19 +787,27 @@ private[spark] class Client(
   }
 
   def setupCredentials(): Unit = {
-    if (args.principal != null) {
-      require(args.keytab != null, "Keytab must be specified when principal is specified.")
+    loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal")
+    if (loginFromKeytab) {
+      principal =
+        if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal")
+      keytab = {
+        if (args.keytab != null) {
+          args.keytab
+        } else {
+          sparkConf.getOption("spark.yarn.keytab").orNull
+        }
+      }
+
+      require(keytab != null, "Keytab must be specified when principal is specified.")
       logInfo("Attempting to login to the Kerberos" +
-        s" using principal: ${args.principal} and keytab: ${args.keytab}")
-      val f = new File(args.keytab)
+        s" using principal: $principal and keytab: $keytab")
+      val f = new File(keytab)
       // Generate a file name that can be used for the keytab file, that does not conflict
       // with any user file.
       val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
-      UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-      loginFromKeytab = true
       sparkConf.set("spark.yarn.keytab", keytabFileName)
-      sparkConf.set("spark.yarn.principal", args.principal)
-      logInfo("Successfully logged into the KDC.")
+      sparkConf.set("spark.yarn.principal", principal)
     }
     credentials = UserGroupInformation.getCurrentUser.getCredentials
   }
@@ -1162,7 +1172,7 @@ object Client extends Logging {
    *
   * If not a "local:" file and no alternate name, the environment is not modified.
    *
-   * @parma conf      Spark configuration.
+   * @param conf      Spark configuration.
    * @param uri       URI to add to classpath (optional).
    * @param fileName  Alternate name for the file (optional).
    * @param env       Map holding the environment variables.
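
For reference, the lookup order implemented by setupCredentials() above is: the
command-line --principal/--keytab values first, then the spark.yarn.principal /
spark.yarn.keytab configuration entries. A minimal standalone sketch of that
precedence (the helper below is hypothetical, not part of the patch):

import org.apache.spark.SparkConf

object CredentialResolutionSketch {
  // Hypothetical helper mirroring Client.setupCredentials(): command-line
  // values win, configuration values are the fallback, and a keytab is
  // required whenever a principal is supplied.
  def resolve(
      argPrincipal: String,
      argKeytab: String,
      sparkConf: SparkConf): Option[(String, String)] = {
    val principal = Option(argPrincipal).orElse(sparkConf.getOption("spark.yarn.principal"))
    principal.map { p =>
      val keytab = Option(argKeytab).orElse(sparkConf.getOption("spark.yarn.keytab"))
      require(keytab.isDefined, "Keytab must be specified when principal is specified.")
      (p, keytab.get)
    }
  }
}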

http://git-wip-us.apache.org/repos/asf/spark/blob/c043a3e9/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 3a0b944..d97fa2e 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,10 +20,9 @@ package org.apache.spark.scheduler.cluster
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class YarnClientSchedulerBackend(
@@ -62,6 +61,13 @@ private[spark] class YarnClientSchedulerBackend(
     super.start()
 
     waitForApplication()
+
+    // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
+    // reads the credentials from HDFS, just like the executors and updates its own credentials
+    // cache.
+    if (conf.contains("spark.yarn.credentials.file")) {
+      YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
+    }
     monitorThread = asyncMonitorApplication()
     monitorThread.start()
   }
@@ -158,6 +164,7 @@ private[spark] class YarnClientSchedulerBackend(
     }
     super.stop()
     client.stop()
+    YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
     logInfo("Stopped")
   }
 

