Repository: spark Updated Branches: refs/heads/branch-0.9 1e366904c -> 748f002b3
SPARK-1051. On YARN, executors don't doAs submitting user This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo Author: Sandy Ryza <[email protected]> Closes #29 from sryza/sandy-spark-1051 and squashes the following commits: 708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/748f002b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/748f002b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/748f002b Branch: refs/heads/branch-0.9 Commit: 748f002b3a58af118f7de0b6ea9170895b571c78 Parents: 1e36690 Author: Sandy Ryza <[email protected]> Authored: Fri Feb 28 12:43:01 2014 -0600 Committer: Thomas Graves <[email protected]> Committed: Thu Mar 20 14:48:05 2014 -0500 ---------------------------------------------------------------------- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 18 ++++++++++-------- .../spark/deploy/yarn/ApplicationMaster.scala | 6 +++++- .../org/apache/spark/deploy/yarn/Client.scala | 1 + .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 4 ++++ .../spark/deploy/yarn/ApplicationMaster.scala | 6 +++++- .../org/apache/spark/deploy/yarn/Client.scala | 1 + 6 files changed, 26 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index b479225..ec15647 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkContext, SparkException} +import scala.collection.JavaConversions._ + /** * Contains util methods to interact with Hadoop from Spark. */ @@ -33,15 +35,9 @@ class SparkHadoopUtil { UserGroupInformation.setConfiguration(conf) def runAsUser(user: String)(func: () => Unit) { - // if we are already running as the user intended there is no reason to do the doAs. It - // will actually break secure HDFS access as it doesn't fill in the credentials. Also if - // the user is UNKNOWN then we shouldn't be creating a remote unknown user - // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only - // in SparkContext. - val currentUser = Option(System.getProperty("user.name")). - getOrElse(SparkContext.SPARK_UNKNOWN_USER) - if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) { + if (user != SparkContext.SPARK_UNKNOWN_USER) { val ugi = UserGroupInformation.createRemoteUser(user) + transferCredentials(UserGroupInformation.getCurrentUser(), ugi) ugi.doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = func() }) @@ -50,6 +46,12 @@ class SparkHadoopUtil { } } + def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { + for (token <- source.getTokens()) { + dest.addToken(token) + } + } + /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop * subsystems. http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 2e46d75..7aa1894 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, @@ -65,6 +66,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numWorkers * 2, 3)) + private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( + SparkContext.SPARK_UNKNOWN_USER) + def run() { // Setup the directories so things go to yarn approved directories rather // then user specified and /tmp. @@ -173,7 +177,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { - override def run() { + override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => var successed = false try { // Copy http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e56bc02..6956cd6 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -314,6 +314,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir + env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() // Set the environment variables to be passed on to the Workers. distCacheMgr.setDistFilesEnv(env) http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 2ba2366..d6c12a9 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration */ class YarnSparkHadoopUtil extends SparkHadoopUtil { + override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { + dest.addCredentials(source.getCredentials()) + } + // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. override def isYarnMode(): Boolean = { true } http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4b777d5..45a7f38 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numWorkers * 2, 3)) + private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( + SparkContext.SPARK_UNKNOWN_USER) + def run() { // Setup the directories so things go to YARN approved directories rather // than user specified and /tmp. @@ -145,7 +149,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { - override def run() { + override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => var successed = false try { // Copy http://git-wip-us.apache.org/repos/asf/spark/blob/748f002b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 51d9adb..8ccdea6 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -333,6 +333,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir + env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() // Set the environment variables to be passed on to the Workers. distCacheMgr.setDistFilesEnv(env)
