Repository: spark
Updated Branches:
  refs/heads/branch-0.9 54c3b7e3b -> 45561cd9f


[WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak

Move the doAs in Executor higher up so that we only have one UGI and aren't
leaking FileSystems.
Fix Spark on YARN to work when the cluster is running as user "yarn" but the
clients are launched as the submitting user and want to read/write to HDFS as
that user.
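
For context, a minimal sketch of the pattern this patch moves to (illustrative
only; the actual change is SparkHadoopUtil.runAsSparkUser in the diff below):
the executor entry point wraps everything in a single
UserGroupInformation.doAs, so all FileSystem instances obtained inside it are
cached against that one UGI instead of a fresh one per task.

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Illustrative helper, not part of the patch: run `body` under one remote
    // UGI for the whole process, so Hadoop's FileSystem cache holds a single
    // entry per (scheme, authority, UGI) for the lifetime of the executor.
    def runProcessAs(user: String)(body: () => Unit) {
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run(): Unit = body()
      })
    }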

Note: this hasn't been fully tested yet; it still needs to be tested in
standalone mode.

Putting this up for people to look at and possibly test; I don't have access
to a Mesos cluster.

This is an alternative to https://github.com/apache/spark/pull/607

Author: Thomas Graves <tgra...@apache.org>

Closes #621 from tgravescs/SPARK-1676 and squashes the following commits:

244d55a [Thomas Graves] fix line length
44163d4 [Thomas Graves] Rework
9398853 [Thomas Graves] change to have doAs in executor higher up.

(cherry picked from commit 3d0a02dff3011e8894d98d903cd086bc95e56807)
Signed-off-by: Aaron Davidson <aa...@databricks.com>

Conflicts:
        core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
        core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
        yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
        yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
        yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45561cd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45561cd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45561cd9

Branch: refs/heads/branch-0.9
Commit: 45561cd9fcf0445274d82da495401d32b62aa915
Parents: 54c3b7e
Author: Thomas Graves <tgra...@apache.org>
Authored: Sat May 3 10:59:05 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sat May 3 11:13:06 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 17 +++++++--
 .../executor/CoarseGrainedExecutorBackend.scala | 36 ++++++++++++--------
 .../org/apache/spark/executor/Executor.scala    |  4 +--
 .../spark/executor/MesosExecutorBackend.scala   | 14 ++++----
 .../spark/deploy/yarn/ApplicationMaster.scala   | 10 +++---
 .../spark/deploy/yarn/WorkerLauncher.scala      |  6 +++-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 12 +++----
 .../spark/deploy/yarn/WorkerLauncher.scala      |  5 ++-
 8 files changed, 64 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ec15647..f31dd4e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,25 +23,36 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 
 import scala.collection.JavaConversions._
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
   val conf = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  def runAsUser(user: String)(func: () => Unit) {
+  /**
+   * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+   * (distributed to child threads), used for authenticating HDFS and YARN calls.
+   *
+   * IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
+   * you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+   * do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
+   */
+  def runAsSparkUser(func: () => Unit) {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
+      logDebug("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
+      logDebug("running as SPARK_UNKNOWN_USER")
       func()
     }
   }
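
As the IMPORTANT NOTE above says, repeated calls to runAsSparkUser in one
process would create a new remote UGI each time, and Hadoop's FileSystem cache
keeps one entry per UGI (HDFS-3545). A hedged sketch of the mitigation it
points to, not part of this patch, closes the cached FileSystems once the work
under a UGI is finished:

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.security.UserGroupInformation

    // Illustrative only: run `body` as `user`, then purge that UGI's entries
    // from the FileSystem cache so repeated invocations don't leak them.
    def runAndCleanUp(user: String)(body: () => Unit) {
      val ugi = UserGroupInformation.createRemoteUser(user)
      try {
        ugi.doAs(new PrivilegedExceptionAction[Unit] {
          def run(): Unit = body()
        })
      } finally {
        FileSystem.closeAllForUGI(ugi)
      }
    }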

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 45b43b4..8ef527b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -24,6 +24,7 @@ import akka.remote._
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,22 +95,27 @@ private[spark] class CoarseGrainedExecutorBackend(
 private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
           workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = new SparkConf)
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+
+      // Debug code
+      Utils.checkHost(hostname)
+
+      // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+      // before getting started with all our system properties, etc
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+        indestructible = true, conf = new SparkConf)
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach {
+        url =>
+          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
+
     }
-    actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 661a390..89907e5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -127,8 +127,6 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
     runningTasks.put(taskId, tr)
@@ -173,7 +171,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index b56d8c9..d5ef00c 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -24,10 +24,10 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 private[spark] class MesosExecutorBackend
@@ -96,9 +96,11 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+        MesosNativeLibrary.load()
+        // Create a new Executor and start it running
+        val runner = new MesosExecutorBackend()
+        new MesosExecutorDriver(runner).run()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e045b9f..66bcaf1 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -68,9 +68,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
-
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -184,7 +181,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      override def run() {
+
         var successed = false
         try {
           // Copy
@@ -472,6 +470,8 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 138c279..1d76bfe 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
 
 class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
@@ -265,6 +266,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 object WorkerLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new WorkerLauncher(args).run()
+    }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b312a42..83f8234 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
-
   def run() {
     // Setup the directories so things go to YARN approved directories rather
     // than user specified and /tmp.
@@ -156,8 +153,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-        var successed = false
+      override def run() {
+
+      var successed = false
         try {
           // Copy
           var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -440,6 +438,8 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 40600f3..3ab2ef5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
 
 class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
@@ -245,6 +246,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 object WorkerLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new WorkerLauncher(args).run()
+    }
   }
 }
