Repository: spark
Updated Branches:
  refs/heads/branch-0.9 1e366904c -> 748f002b3


SPARK-1051. On YARN, executors don't doAs submitting user

This reopens https://github.com/apache/incubator-spark/pull/538 against the new 
repo

Author: Sandy Ryza <[email protected]>

Closes #29 from sryza/sandy-spark-1051 and squashes the following commits:

708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/748f002b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/748f002b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/748f002b

Branch: refs/heads/branch-0.9
Commit: 748f002b3a58af118f7de0b6ea9170895b571c78
Parents: 1e36690
Author: Sandy Ryza <[email protected]>
Authored: Fri Feb 28 12:43:01 2014 -0600
Committer: Thomas Graves <[email protected]>
Committed: Thu Mar 20 14:48:05 2014 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkHadoopUtil.scala | 18 ++++++++++--------
 .../spark/deploy/yarn/ApplicationMaster.scala     |  6 +++++-
 .../org/apache/spark/deploy/yarn/Client.scala     |  1 +
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala   |  4 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala     |  6 +++++-
 .../org/apache/spark/deploy/yarn/Client.scala     |  1 +
 6 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index b479225..ec15647 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkContext, SparkException}
 
+import scala.collection.JavaConversions._
+
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    // if we are already running as the user intended there is no reason to do 
the doAs. It
-    // will actually break secure HDFS access as it doesn't fill in the 
credentials. Also if
-    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
-    // (this is actually the path spark on yarn takes) since SPARK_USER is 
initialized only
-    // in SparkContext.
-    val currentUser = Option(System.getProperty("user.name")).
-      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+    if (user != SparkContext.SPARK_UNKNOWN_USER) {
       val ugi = UserGroupInformation.createRemoteUser(user)
+      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
     }
   }
 
+  def transferCredentials(source: UserGroupInformation, dest: 
UserGroupInformation) {
+    for (token <- source.getTokens()) {
+      dest.addToken(token)
+    }
+  }
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can 
initializes some Hadoop
    * subsystems.

http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2e46d75..7aa1894 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -65,6 +66,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
   private val maxNumWorkerFailures = 
sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
+
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -173,7 +177,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", 
classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () 
=>
         var successed = false
         try {
           // Copy

http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e56bc02..6956cd6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -314,6 +314,7 @@ class Client(args: ClientArguments, conf: Configuration, 
sparkConf: SparkConf)
     Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, 
env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_USER") = 
UserGroupInformation.getCurrentUser().getShortUserName()
 
     // Set the environment variables to be passed on to the Workers.
     distCacheMgr.setDistFilesEnv(env)

http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2ba2366..d6c12a9 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
  */
 class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
+  override def transferCredentials(source: UserGroupInformation, dest: 
UserGroupInformation) {
+    dest.addCredentials(source.getCredentials())
+  }
+
   // Note that all params which start with SPARK are propagated all the way 
through, so if in yarn mode, this MUST be set to true.
   override def isYarnMode(): Boolean = { true }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b777d5..45a7f38 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 
@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
   private val maxNumWorkerFailures = 
sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
+
   def run() {
     // Setup the directories so things go to YARN approved directories rather
     // than user specified and /tmp.
@@ -145,7 +149,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", 
classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () 
=>
         var successed = false
         try {
           // Copy

http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 51d9adb..8ccdea6 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -333,6 +333,7 @@ class Client(args: ClientArguments, conf: Configuration, 
sparkConf: SparkConf)
     Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, 
env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_USER") = 
UserGroupInformation.getCurrentUser().getShortUserName()
 
     // Set the environment variables to be passed on to the Workers.
     distCacheMgr.setDistFilesEnv(env)

Reply via email to