[SPARK-2996] Implement userClassPathFirst for driver, yarn.

Yarn's config option `spark.yarn.user.classpath.first` does not work the same 
way as
`spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in 
that it
modifies the system classpath, instead of restricting the changes to the user's 
class
loader. So this change implements the behavior of the latter for Yarn, and 
deprecates
the more dangerous choice.

To be able to achieve feature-parity, I also implemented the option for drivers 
(the existing
option only applies to executors). So now there are two options, each 
controlling whether
to apply userClassPathFirst to the driver or executors. The old option was 
deprecated, and
aliased to the new one (`spark.executor.userClassPathFirst`).

The existing "child-first" class loader also had to be fixed. It didn't handle 
resources, and it
was also doing some things that ended up causing JVM errors depending on how 
things
were being called.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #3233 from vanzin/SPARK-2996 and squashes the following commits:

9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation.
fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically.
a8c69f1 [Marcelo Vanzin] Review feedback.
cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test.
0fe7777 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more 
meaninful.
fe970a7 [Marcelo Vanzin] Review feedback.
25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent.
fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks.
2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation.
b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a10f379 [Marcelo Vanzin] Some feedback.
3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
7b57cba [Marcelo Vanzin] Remove now outdated message.
5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
d1273b2 [Marcelo Vanzin] Add test file to rat exclude.
fa1aafa [Marcelo Vanzin] Remove write check on user jars.
89d8072 [Marcelo Vanzin] Cleanups.
a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for 
standalone cluster mode.
50afa5f [Marcelo Vanzin] Fix Yarn executor command line.
7d14397 [Marcelo Vanzin] Register user jars in executor up front.
7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst.
20373f5 [Marcelo Vanzin] Fix ClientBaseSuite.
55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit.
0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option.
4a84d87 [Marcelo Vanzin] Fix the child-first class loader.
d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf.
46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to 
"userClassPathFirst".
a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit.
91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation.
a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line 
arguments.
89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20a60131
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20a60131
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20a60131

Branch: refs/heads/master
Commit: 20a6013106b56a1a1cc3e8cda092330ffbe77cc3
Parents: 36c4e1d
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Mon Feb 9 21:17:06 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Feb 9 21:17:28 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  83 +++++-
 .../main/scala/org/apache/spark/TestUtils.scala |  19 +-
 .../scala/org/apache/spark/deploy/Client.scala  |   5 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |   8 +-
 .../spark/deploy/master/ui/MasterPage.scala     |   2 +-
 .../deploy/rest/StandaloneRestServer.scala      |   2 +-
 .../spark/deploy/worker/DriverRunner.scala      |  15 +-
 .../spark/deploy/worker/DriverWrapper.scala     |  20 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  83 +++++-
 .../org/apache/spark/executor/Executor.scala    |  52 ++--
 .../spark/executor/ExecutorURLClassLoader.scala |  84 ------
 .../cluster/SparkDeploySchedulerBackend.scala   |   9 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  21 +-
 .../spark/util/MutableURLClassLoader.scala      | 103 +++++++
 .../apache/spark/util/ParentClassLoader.scala   |   7 +-
 .../scala/org/apache/spark/SparkConfSuite.scala |  12 +
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  27 ++
 .../executor/ExecutorURLClassLoaderSuite.scala  | 103 -------
 .../spark/util/MutableURLClassLoaderSuite.scala | 103 +++++++
 docs/configuration.md                           |  31 ++-
 pom.xml                                         |  12 +-
 project/SparkBuild.scala                        |   8 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  25 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 133 ++++-----
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  25 +-
 yarn/src/test/resources/log4j.properties        |   4 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  |   6 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 276 +++++++++++++------
 28 files changed, 833 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 13aa996..0dbd261 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with 
Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(key, value)
+    settings.put(translateConfKey(key, warn = true), value)
     this
   }
 
@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(key, value)
+    settings.putIfAbsent(translateConfKey(key, warn = true), value)
     this
   }
 
@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key))
+    Option(settings.get(translateConfKey(key)))
   }
 
   /** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(key)
+  def contains(key: String): Boolean = 
settings.containsKey(translateConfKey(key))
 
   /** Copy this object */
   override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
     // Validate memory fractions
     val memoryKeys = Seq(
       "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction", 
+      "spark.shuffle.memoryFraction",
       "spark.shuffle.safetyFraction",
       "spark.storage.unrollFraction",
       "spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
   def toDebugString: String = {
     getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
 }
 
-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+  private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+    val configs = Seq(
+      DeprecatedConfig("spark.files.userClassPathFirst", 
"spark.executor.userClassPathFirst",
+        "1.3"),
+      DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+        "Use spark.{driver,executor}.userClassPathFirst instead."))
+    configs.map { x => (x.oldName, x) }.toMap
+  }
+
   /**
    * Return whether the given config is an akka config (e.g. 
akka.actor.provider).
    * Note that this does not include spark-specific akka configs (e.g. 
spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
   def isSparkPortConf(name: String): Boolean = {
     (name.startsWith("spark.") && name.endsWith(".port")) || 
name.startsWith("spark.port.")
   }
+
+  /**
+   * Translate the configuration key if it is deprecated and has a 
replacement, otherwise just
+   * returns the provided key.
+   *
+   * @param userKey Configuration key from the user / caller.
+   * @param warn Whether to print a warning if the key is deprecated. Warnings 
will be printed
+   *             only once for each key.
+   */
+  def translateConfKey(userKey: String, warn: Boolean = false): String = {
+    deprecatedConfigs.get(userKey)
+      .map { deprecatedKey =>
+        if (warn) {
+          deprecatedKey.warn()
+        }
+        deprecatedKey.newName.getOrElse(userKey)
+      }.getOrElse(userKey)
+  }
+
+  /**
+   * Holds information about keys that have been deprecated or renamed.
+   *
+   * @param oldName Old configuration key.
+   * @param newName New configuration key, or `null` if key has no 
replacement, in which case the
+   *                deprecated key will be used (but the warning message will 
still be printed).
+   * @param version Version of Spark where key was deprecated.
+   * @param deprecationMessage Message to include in the deprecation warning; 
mandatory when
+   *                           `newName` is not provided.
+   */
+  private case class DeprecatedConfig(
+      oldName: String,
+      _newName: String,
+      version: String,
+      deprecationMessage: String = null) {
+
+    private val warned = new AtomicBoolean(false)
+    val newName = Option(_newName)
+
+    if (newName == null && (deprecationMessage == null || 
deprecationMessage.isEmpty())) {
+      throw new IllegalArgumentException("Need new config name or deprecation 
message.")
+    }
+
+    def warn(): Unit = {
+      if (warned.compareAndSet(false, true)) {
+        if (newName != null) {
+          val message = Option(deprecationMessage).getOrElse(
+            s"Please use the alternative '$newName' instead.")
+          logWarning(
+            s"The configuration option '$oldName' has been replaced as of 
Spark $version and " +
+            s"may be removed in the future. $message")
+        } else {
+          logWarning(
+            s"The configuration option '$oldName' has been deprecated as of 
Spark $version and " +
+            s"may be removed in the future. $deprecationMessage")
+        }
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index be081c3..35b324b 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
@@ -59,6 +60,22 @@ private[spark] object TestUtils {
     createJar(files1 ++ files2, jarFile)
   }
 
+  /**
+   * Create a jar file containing multiple files. The `files` map contains a 
mapping of
+   * file names in the jar file to their contents.
+   */
+  def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
+    val tempDir = Option(dir).getOrElse(Utils.createTempDir())
+    val jarFile = File.createTempFile("testJar", ".jar", tempDir)
+    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
+    files.foreach { case (k, v) =>
+      val entry = new JarEntry(k)
+      jarStream.putNextEntry(entry)
+      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+    }
+    jarStream.close()
+    jarFile.toURI.toURL
+  }
 
   /**
    * Create a jar file that contains this set of files. All files will be 
located at the root

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 38b3da0..237d26f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: 
SparkConf)
           .map(Utils.splitCommandString).getOrElse(Seq.empty)
         val sparkJavaOpts = Utils.sparkJavaOpts(conf)
         val javaOpts = sparkJavaOpts ++ extraJavaOpts
-        val command = new Command(mainClass, Seq("{{WORKER_URL}}", 
driverArgs.mainClass) ++
-          driverArgs.driverOptions, sys.env, classPathEntries, 
libraryPathEntries, javaOpts)
+        val command = new Command(mainClass,
+          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ 
driverArgs.driverOptions,
+          sys.env, classPathEntries, libraryPathEntries, javaOpts)
 
         val driverDescription = new DriverDescription(
           driverArgs.jarUrl,

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d21392..c4bc505 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, 
IBiblioResolver}
 
 import org.apache.spark.deploy.rest._
 import org.apache.spark.executor._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, 
Utils}
 
 /**
  * Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
     }
 
     val loader =
-      if (sysProps.getOrElse("spark.files.userClassPathFirst", 
"false").toBoolean) {
-        new ChildExecutorURLClassLoader(new Array[URL](0),
+      if (sysProps.getOrElse("spark.driver.userClassPathFirst", 
"false").toBoolean) {
+        new ChildFirstURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       } else {
-        new ExecutorURLClassLoader(new Array[URL](0),
+        new MutableURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       }
     Thread.currentThread.setContextClassLoader(loader)

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index b47a081..fd514f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) 
extends WebUIPage("") {
       <td sorttable_customkey={driver.desc.mem.toString}>
         {Utils.megabytesToString(driver.desc.mem.toLong)}
       </td>
-      <td>{driver.desc.command.arguments(1)}</td>
+      <td>{driver.desc.command.arguments(2)}</td>
     </tr>
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 2033d67..6e4486e 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = new Command(
       "org.apache.spark.deploy.worker.DriverWrapper",
-      Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to 
the DriverWrapper
       environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualDriverMemory = 
driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 28cab36..b964a09 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
           val driverDir = createWorkingDirectory()
           val localJarFilename = downloadUserJar(driverDir)
 
-          // Make sure user application jar is on the classpath
+          def substituteVariables(argument: String): String = argument match {
+            case "{{WORKER_URL}}" => workerUrl
+            case "{{USER_JAR}}" => localJarFilename
+            case other => other
+          }
+
           // TODO: If we add ability to submit multiple jars they should also 
be added here
           val builder = CommandUtils.buildProcessBuilder(driverDesc.command, 
driverDesc.mem,
-            sparkHome.getAbsolutePath, substituteVariables, 
Seq(localJarFilename))
+            sparkHome.getAbsolutePath, substituteVariables)
           launchDriver(builder, driverDir, driverDesc.supervise)
         }
         catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
     }
   }
 
-  /** Replace variables in a command argument passed to us */
-  private def substituteVariables(argument: String): String = argument match {
-    case "{{WORKER_URL}}" => workerUrl
-    case other => other
-  }
-
   /**
    * Creates the working directory for this driver.
    * Will throw an exception if there are errors preparing the directory.

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 05e242e..ab467a5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.File
+
 import akka.actor._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, 
MutableURLClassLoader, Utils}
 
 /**
  * Utility object for launching driver programs such that they share fate with 
the Worker process.
@@ -28,21 +30,31 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 object DriverWrapper {
   def main(args: Array[String]) {
     args.toList match {
-      case workerUrl :: mainClass :: extraArgs =>
+      case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
           Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = 
"workerWatcher")
 
+        val currentLoader = Thread.currentThread.getContextClassLoader
+        val userJarUrl = new File(userJar).toURI().toURL()
+        val loader =
+          if (sys.props.getOrElse("spark.driver.userClassPathFirst", 
"false").toBoolean) {
+            new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
+          } else {
+            new MutableURLClassLoader(Array(userJarUrl), currentLoader)
+          }
+        Thread.currentThread.setContextClassLoader(loader)
+
         // Delegate to supplied main class
-        val clazz = Class.forName(args(1))
+        val clazz = Class.forName(mainClass, true, loader)
         val mainMethod = clazz.getMethod("main", classOf[Array[String]])
         mainMethod.invoke(null, extraArgs.toArray[String])
 
         actorSystem.shutdown()
 
       case _ =>
-        System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> 
[options]")
+        System.err.println("Usage: DriverWrapper <workerUrl> <userJar> 
<driverMainClass> [options]")
         System.exit(-1)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3a42f8b..dd19e49 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.executor
 
+import java.net.URL
 import java.nio.ByteBuffer
 
+import scala.collection.mutable
 import scala.concurrent.Await
 
 import akka.actor.{Actor, ActorSelection, Props}
@@ -38,6 +40,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
+    userClassPath: Seq[URL],
     env: SparkEnv)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
@@ -63,7 +66,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, env, isLocal = false)
+      executor = new Executor(executorId, hostname, env, userClassPath, 
isLocal = false)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -117,7 +120,8 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       hostname: String,
       cores: Int,
       appId: String,
-      workerUrl: Option[String]) {
+      workerUrl: Option[String],
+      userClassPath: Seq[URL]) {
 
     SignalLogger.register(log)
 
@@ -162,7 +166,7 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       val sparkHostPort = hostname + ":" + boundPort
       env.actorSystem.actorOf(
         Props(classOf[CoarseGrainedExecutorBackend],
-          driverUrl, executorId, sparkHostPort, cores, env),
+          driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
         name = "Executor")
       workerUrl.foreach { url =>
         env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = 
"WorkerWatcher")
@@ -172,20 +176,69 @@ private[spark] object CoarseGrainedExecutorBackend 
extends Logging {
   }
 
   def main(args: Array[String]) {
-    args.length match {
-      case x if x < 5 =>
-        System.err.println(
+    var driverUrl: String = null
+    var executorId: String = null
+    var hostname: String = null
+    var cores: Int = 0
+    var appId: String = null
+    var workerUrl: Option[String] = None
+    val userClassPath = new mutable.ListBuffer[URL]()
+
+    var argv = args.toList
+    while (!argv.isEmpty) {
+      argv match {
+        case ("--driver-url") :: value :: tail =>
+          driverUrl = value
+          argv = tail
+        case ("--executor-id") :: value :: tail =>
+          executorId = value
+          argv = tail
+        case ("--hostname") :: value :: tail =>
+          hostname = value
+          argv = tail
+        case ("--cores") :: value :: tail =>
+          cores = value.toInt
+          argv = tail
+        case ("--app-id") :: value :: tail =>
+          appId = value
+          argv = tail
+        case ("--worker-url") :: value :: tail =>
           // Worker url is used in spark standalone mode to enforce 
fate-sharing with worker
-          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> 
<hostname> " +
-          "<cores> <appid> [<workerUrl>] ")
-        System.exit(1)
+          workerUrl = Some(value)
+          argv = tail
+        case ("--user-class-path") :: value :: tail =>
+          userClassPath += new URL(value)
+          argv = tail
+        case Nil =>
+        case tail =>
+          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+          printUsageAndExit()
+      }
+    }
 
-      // NB: These arguments are provided by SparkDeploySchedulerBackend (for 
standalone mode)
-      // and CoarseMesosSchedulerBackend (for mesos mode).
-      case 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
-      case x if x > 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+    if (driverUrl == null || executorId == null || hostname == null || cores 
<= 0 ||
+      appId == null) {
+      printUsageAndExit()
     }
+
+    run(driverUrl, executorId, hostname, cores, appId, workerUrl, 
userClassPath)
   }
+
+  private def printUsageAndExit() = {
+    System.err.println(
+      """
+      |"Usage: CoarseGrainedExecutorBackend [options]
+      |
+      | Options are:
+      |   --driver-url <driverUrl>
+      |   --executor-id <executorId>
+      |   --hostname <hostname>
+      |   --cores <cores>
+      |   --app-id <appid>
+      |   --worker-url <workerUrl>
+      |   --user-class-path <url>
+      |""".stripMargin)
+    System.exit(1)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5141483..6b22dcd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import java.io.File
 import java.lang.management.ManagementFactory
+import java.net.URL
 import java.nio.ByteBuffer
 import java.util.concurrent._
 
@@ -33,7 +34,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader,
+  SparkUncaughtExceptionHandler, AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -43,6 +45,7 @@ private[spark] class Executor(
     executorId: String,
     executorHostname: String,
     env: SparkEnv,
+    userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false)
   extends Logging
 {
@@ -288,17 +291,23 @@ private[spark] class Executor(
    * created by the interpreter to the search path
    */
   private def createClassLoader(): MutableURLClassLoader = {
+    // Bootstrap the list of jars with the user class path.
+    val now = System.currentTimeMillis()
+    userClassPath.foreach { url =>
+      currentJars(url.getPath().split("/").last) = now
+    }
+
     val currentLoader = Utils.getContextOrSparkClassLoader
 
     // For each of the jars in the jarSet, add them to the class loader.
     // We assume each of the files has already been fetched.
-    val urls = currentJars.keySet.map { uri =>
+    val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
-    }.toArray
-    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", 
false)
-    userClassPathFirst match {
-      case true => new ChildExecutorURLClassLoader(urls, currentLoader)
-      case false => new ExecutorURLClassLoader(urls, currentLoader)
+    }
+    if (conf.getBoolean("spark.executor.userClassPathFirst", false)) {
+      new ChildFirstURLClassLoader(urls, currentLoader)
+    } else {
+      new MutableURLClassLoader(urls, currentLoader)
     }
   }
 
@@ -311,7 +320,7 @@ private[spark] class Executor(
     if (classUri != null) {
       logInfo("Using REPL class URI: " + classUri)
       val userClassPathFirst: java.lang.Boolean =
-        conf.getBoolean("spark.files.userClassPathFirst", false)
+        conf.getBoolean("spark.executor.userClassPathFirst", false)
       try {
         val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
           .asInstanceOf[Class[_ <: ClassLoader]]
@@ -344,18 +353,23 @@ private[spark] class Executor(
           env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
-      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < 
timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
-        // Fetch file with useCache mode, close cache for local mode.
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
-        currentJars(name) = timestamp
-        // Add it to our class loader
+      for ((name, timestamp) <- newJars) {
         val localName = name.split("/").last
-        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
-        if (!urlClassLoader.getURLs.contains(url)) {
-          logInfo("Adding " + url + " to class loader")
-          urlClassLoader.addURL(url)
+        val currentTimeStamp = currentJars.get(name)
+          .orElse(currentJars.get(localName))
+          .getOrElse(-1L)
+        if (currentTimeStamp < timestamp) {
+          logInfo("Fetching " + name + " with timestamp " + timestamp)
+          // Fetch file with useCache mode, close cache for local mode.
+          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          currentJars(name) = timestamp
+          // Add it to our class loader
+          val url = new File(SparkFiles.getRootDirectory, 
localName).toURI.toURL
+          if (!urlClassLoader.getURLs.contains(url)) {
+            logInfo("Adding " + url + " to class loader")
+            urlClassLoader.addURL(url)
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
deleted file mode 100644
index 8011e75..0000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.net.{URLClassLoader, URL}
-
-import org.apache.spark.util.ParentClassLoader
-
-/**
- * The addURL method in URLClassLoader is protected. We subclass it to make 
this accessible.
- * We also make changes so user classes can come before the default classes.
- */
-
-private[spark] trait MutableURLClassLoader extends ClassLoader {
-  def addURL(url: URL)
-  def getURLs: Array[URL]
-}
-
-private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: 
ClassLoader)
-  extends MutableURLClassLoader {
-
-  private object userClassLoader extends URLClassLoader(urls, null){
-    override def addURL(url: URL) {
-      super.addURL(url)
-    }
-    override def findClass(name: String): Class[_] = {
-      val loaded = super.findLoadedClass(name)
-      if (loaded != null) {
-        return loaded
-      }
-      try {
-        super.findClass(name)
-      } catch {
-        case e: ClassNotFoundException => {
-          parentClassLoader.loadClass(name)
-        }
-      }
-    }
-  }
-
-  private val parentClassLoader = new ParentClassLoader(parent)
-
-  override def findClass(name: String): Class[_] = {
-    try {
-      userClassLoader.findClass(name)
-    } catch {
-      case e: ClassNotFoundException => {
-        parentClassLoader.loadClass(name)
-      }
-    }
-  }
-
-  def addURL(url: URL) {
-    userClassLoader.addURL(url)
-  }
-
-  def getURLs() = {
-    userClassLoader.getURLs()
-  }
-}
-
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: 
ClassLoader)
-  extends URLClassLoader(urls, parent) with MutableURLClassLoader {
-
-  override def addURL(url: URL) {
-    super.addURL(url)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d2e1680..40fc6b5 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -52,8 +52,13 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", 
"{{APP_ID}}",
-      "{{WORKER_URL}}")
+    val args = Seq(
+      "--driver-url", driverUrl,
+      "--executor-id", "{{EXECUTOR_ID}}",
+      "--hostname", "{{HOSTNAME}}",
+      "--cores", "{{CORES}}",
+      "--app-id", "{{APP_ID}}",
+      "--worker-url", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 0d1c2a9..90dfe14 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -154,18 +154,25 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, 
"./bin/spark-class").getCanonicalPath
       command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s 
%s %s %d %s".format(
-          prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
-          offer.getHostname, numCores, appId))
+        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+          .format(prefixEnv, runScript) +
+        s" --driver-url $driverUrl" +
+        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --hostname ${offer.getHostname}" +
+        s" --cores $numCores" +
+        s" --app-id $appId")
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
-        ("cd %s*; %s " +
-          "./bin/spark-class 
org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
-          .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores, appId))
+        s"cd $basename*; $prefixEnv " +
+         "./bin/spark-class 
org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+        s" --driver-url $driverUrl" +
+        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --hostname ${offer.getHostname}" +
+        s" --cores $numCores" +
+        s" --app-id $appId")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala 
b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
new file mode 100644
index 0000000..d9c7103
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{URLClassLoader, URL}
+import java.util.Enumeration
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.ParentClassLoader
+
+/**
+ * URL class loader that exposes the `addURL` and `getURLs` methods in 
URLClassLoader.
+ */
+private[spark] class MutableURLClassLoader(urls: Array[URL], parent: 
ClassLoader)
+  extends URLClassLoader(urls, parent) {
+
+  override def addURL(url: URL): Unit = {
+    super.addURL(url)
+  }
+
+  override def getURLs(): Array[URL] = {
+    super.getURLs()
+  }
+
+}
+
+/**
+ * A mutable class loader that gives preference to its own URLs over the 
parent class loader
+ * when loading classes and resources.
+ */
+private[spark] class ChildFirstURLClassLoader(urls: Array[URL], parent: 
ClassLoader)
+  extends MutableURLClassLoader(urls, null) {
+
+  private val parentClassLoader = new ParentClassLoader(parent)
+
+  /**
+   * Used to implement fine-grained class loading locks similar to what is 
done by Java 7. This
+   * prevents deadlock issues when using non-hierarchical class loaders.
+   *
+   * Note that due to Java 6 compatibility (and some issues with implementing 
class loaders in
+   * Scala), Java 7's `ClassLoader.registerAsParallelCapable` method is not 
called.
+   */
+  private val locks = new ConcurrentHashMap[String, Object]()
+
+  override def loadClass(name: String, resolve: Boolean): Class[_] = {
+    var lock = locks.get(name)
+    if (lock == null) {
+      val newLock = new Object()
+      lock = locks.putIfAbsent(name, newLock)
+      if (lock == null) {
+        lock = newLock
+      }
+    }
+
+    lock.synchronized {
+      try {
+        super.loadClass(name, resolve)
+      } catch {
+        case e: ClassNotFoundException =>
+          parentClassLoader.loadClass(name, resolve)
+      }
+    }
+  }
+
+  override def getResource(name: String): URL = {
+    val url = super.findResource(name)
+    val res = if (url != null) url else parentClassLoader.getResource(name)
+    res
+  }
+
+  override def getResources(name: String): Enumeration[URL] = {
+    val urls = super.findResources(name)
+    val res =
+      if (urls != null && urls.hasMoreElements()) {
+        urls
+      } else {
+        parentClassLoader.getResources(name)
+      }
+    res
+  }
+
+  override def addURL(url: URL) {
+    super.addURL(url)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala 
b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
index 3abc126..6d8d9e8 100644
--- a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.util
 
 /**
- * A class loader which makes findClass accesible to the child
+ * A class loader which makes some protected methods in ClassLoader accesible.
  */
 private[spark] class ParentClassLoader(parent: ClassLoader) extends 
ClassLoader(parent) {
 
@@ -29,4 +29,9 @@ private[spark] class ParentClassLoader(parent: ClassLoader) 
extends ClassLoader(
   override def loadClass(name: String): Class[_] = {
     super.loadClass(name)
   }
+
+  override def loadClass(name: String, resolve: Boolean): Class[_] = {
+    super.loadClass(name, resolve)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210a..ea6b73b 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,18 @@ class SparkConfSuite extends FunSuite with 
LocalSparkContext with ResetSystemPro
     serializer.newInstance().serialize(new StringBuffer())
   }
 
+  test("deprecated config keys") {
+    val conf = new SparkConf()
+      .set("spark.files.userClassPathFirst", "true")
+      .set("spark.yarn.user.classpath.first", "true")
+    assert(conf.contains("spark.files.userClassPathFirst"))
+    assert(conf.contains("spark.executor.userClassPathFirst"))
+    assert(conf.contains("spark.yarn.user.classpath.first"))
+    assert(conf.getBoolean("spark.files.userClassPathFirst", false))
+    assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
+    assert(conf.getBoolean("spark.yarn.user.classpath.first", false))
+  }
+
 }
 
 class Class1 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1ddccae..46d745c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,6 +21,8 @@ import java.io._
 
 import scala.collection.mutable.ArrayBuffer
 
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Timeouts
@@ -450,6 +452,19 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
   }
 
+  test("user classpath first in driver") {
+    val systemJar = TestUtils.createJarWithFiles(Map("test.resource" -> 
"SYSTEM"))
+    val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local",
+      "--conf", "spark.driver.extraClassPath=" + systemJar,
+      "--conf", "spark.driver.userClassPathFirst=true",
+      userJar.toString)
+    runSparkSubmit(args)
+  }
+
   test("SPARK_CONF_DIR overrides spark-defaults.conf") {
     forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
       val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
@@ -541,3 +556,15 @@ object SimpleApplicationTest {
     }
   }
 }
+
+object UserClasspathFirstTest {
+  def main(args: Array[String]) {
+    val ccl = Thread.currentThread().getContextClassLoader()
+    val resource = ccl.getResourceAsStream("test.resource")
+    val bytes = ByteStreams.toByteArray(resource)
+    val contents = new String(bytes, 0, bytes.length, UTF_8)
+    if (contents != "USER") {
+      throw new SparkException("Should have read user resource, but instead 
read: " + contents)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
deleted file mode 100644
index b7912c0..0000000
--- 
a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.net.URLClassLoader
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, 
TestUtils}
-import org.apache.spark.util.Utils
-
-class ExecutorURLClassLoaderSuite extends FunSuite {
-
-  val urls2 = List(TestUtils.createJarWithClasses(
-      classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
-      toStringValue = "2")).toArray
-  val urls = List(TestUtils.createJarWithClasses(
-      classNames = Seq("FakeClass1"),
-      classNamesWithBase = Seq(("FakeClass2", "FakeClass3")), // FakeClass3 is 
in parent
-      toStringValue = "1",
-      classpathUrls = urls2)).toArray
-
-  test("child first") {
-    val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
-    val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
-    val fakeClassVersion = fakeClass.toString
-    assert(fakeClassVersion === "1")
-    val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance()
-    assert(fakeClass.getClass === fakeClass2.getClass)
-  }
-
-  test("parent first") {
-    val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
-    val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
-    val fakeClassVersion = fakeClass.toString
-    assert(fakeClassVersion === "2")
-    val fakeClass2 = classLoader.loadClass("FakeClass1").newInstance()
-    assert(fakeClass.getClass === fakeClass2.getClass)
-  }
-
-  test("child first can fall back") {
-    val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
-    val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
-    val fakeClassVersion = fakeClass.toString
-    assert(fakeClassVersion === "2")
-  }
-
-  test("child first can fail") {
-    val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
-    intercept[java.lang.ClassNotFoundException] {
-      classLoader.loadClass("FakeClassDoesNotExist").newInstance()
-    }
-  }
-
-  test("driver sets context class loader in local mode") {
-    // Test the case where the driver program sets a context classloader and 
then runs a job
-    // in local mode. This is what happens when ./spark-submit is called with 
"local" as the
-    // master.
-    val original = Thread.currentThread().getContextClassLoader
-
-    val className = "ClassForDriverTest"
-    val jar = TestUtils.createJarWithClasses(Seq(className))
-    val contextLoader = new URLClassLoader(Array(jar), 
Utils.getContextOrSparkClassLoader)
-    Thread.currentThread().setContextClassLoader(contextLoader)
-
-    val sc = new SparkContext("local", "driverLoaderTest")
-
-    try {
-      sc.makeRDD(1 to 5, 2).mapPartitions { x =>
-        val loader = Thread.currentThread().getContextClassLoader
-        Class.forName(className, true, loader).newInstance()
-        Seq().iterator
-      }.count()
-    }
-    catch {
-      case e: SparkException if 
e.getMessage.contains("ClassNotFoundException") =>
-        fail("Local executor could not find class", e)
-      case t: Throwable => fail("Unexpected exception ", t)
-    }
-
-    sc.stop()
-    Thread.currentThread().setContextClassLoader(original)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala 
b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
new file mode 100644
index 0000000..31e3b7e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.URLClassLoader
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, 
TestUtils}
+import org.apache.spark.util.Utils
+
+class MutableURLClassLoaderSuite extends FunSuite {
+
+  val urls2 = List(TestUtils.createJarWithClasses(
+      classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
+      toStringValue = "2")).toArray
+  val urls = List(TestUtils.createJarWithClasses(
+      classNames = Seq("FakeClass1"),
+      classNamesWithBase = Seq(("FakeClass2", "FakeClass3")), // FakeClass3 is 
in parent
+      toStringValue = "1",
+      classpathUrls = urls2)).toArray
+
+  test("child first") {
+    val parentLoader = new URLClassLoader(urls2, null)
+    val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
+    val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
+    val fakeClassVersion = fakeClass.toString
+    assert(fakeClassVersion === "1")
+    val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance()
+    assert(fakeClass.getClass === fakeClass2.getClass)
+  }
+
+  test("parent first") {
+    val parentLoader = new URLClassLoader(urls2, null)
+    val classLoader = new MutableURLClassLoader(urls, parentLoader)
+    val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
+    val fakeClassVersion = fakeClass.toString
+    assert(fakeClassVersion === "2")
+    val fakeClass2 = classLoader.loadClass("FakeClass1").newInstance()
+    assert(fakeClass.getClass === fakeClass2.getClass)
+  }
+
+  test("child first can fall back") {
+    val parentLoader = new URLClassLoader(urls2, null)
+    val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
+    val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
+    val fakeClassVersion = fakeClass.toString
+    assert(fakeClassVersion === "2")
+  }
+
+  test("child first can fail") {
+    val parentLoader = new URLClassLoader(urls2, null)
+    val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
+    intercept[java.lang.ClassNotFoundException] {
+      classLoader.loadClass("FakeClassDoesNotExist").newInstance()
+    }
+  }
+
+  test("driver sets context class loader in local mode") {
+    // Test the case where the driver program sets a context classloader and 
then runs a job
+    // in local mode. This is what happens when ./spark-submit is called with 
"local" as the
+    // master.
+    val original = Thread.currentThread().getContextClassLoader
+
+    val className = "ClassForDriverTest"
+    val jar = TestUtils.createJarWithClasses(Seq(className))
+    val contextLoader = new URLClassLoader(Array(jar), 
Utils.getContextOrSparkClassLoader)
+    Thread.currentThread().setContextClassLoader(contextLoader)
+
+    val sc = new SparkContext("local", "driverLoaderTest")
+
+    try {
+      sc.makeRDD(1 to 5, 2).mapPartitions { x =>
+        val loader = Thread.currentThread().getContextClassLoader
+        Class.forName(className, true, loader).newInstance()
+        Seq().iterator
+      }.count()
+    }
+    catch {
+      case e: SparkException if 
e.getMessage.contains("ClassNotFoundException") =>
+        fail("Local executor could not find class", e)
+      case t: Throwable => fail("Unexpected exception ", t)
+    }
+
+    sc.stop()
+    Thread.currentThread().setContextClassLoader(original)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 00e973c..eb0d6d3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -231,6 +231,15 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.driver.userClassPathFirst</code></td>
+  <td>false</td>
+  <td>
+    (Experimental) Whether to give user-added jars precedence over Spark's own 
jars when loading
+    classes in the the driver. This feature can be used to mitigate conflicts 
between Spark's
+    dependencies and user dependencies. It is currently an experimental 
feature.
+  </td>
+</tr>
+<tr>
   <td><code>spark.executor.extraJavaOptions</code></td>
   <td>(none)</td>
   <td>
@@ -297,13 +306,11 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.files.userClassPathFirst</code></td>
+  <td><code>spark.executor.userClassPathFirst</code></td>
   <td>false</td>
   <td>
-    (Experimental) Whether to give user-added jars precedence over Spark's own 
jars when
-    loading classes in Executors. This feature can be used to mitigate 
conflicts between
-    Spark's dependencies and user dependencies. It is currently an 
experimental feature.
-    (Currently, this setting does not work for YARN, see <a 
href="https://issues.apache.org/jira/browse/SPARK-2996";>SPARK-2996</a> for more 
details).
+    (Experimental) Same functionality as 
<code>spark.driver.userClassPathFirst</code>, but
+    applied to executor instances.
   </td>
 </tr>
 <tr>
@@ -865,8 +872,8 @@ Apart from these, the following properties are also 
available, and may be useful
   <td><code>spark.network.timeout</code></td>
   <td>120</td>
   <td>
-    Default timeout for all network interactions, in seconds. This config will 
be used in 
-    place of <code>spark.core.connection.ack.wait.timeout</code>, 
<code>spark.akka.timeout</code>, 
+    Default timeout for all network interactions, in seconds. This config will 
be used in
+    place of <code>spark.core.connection.ack.wait.timeout</code>, 
<code>spark.akka.timeout</code>,
     <code>spark.storage.blockManagerSlaveTimeoutMs</code> or
     <code>spark.shuffle.io.connectionTimeout</code>, if they are not 
configured.
   </td>
@@ -911,8 +918,8 @@ Apart from these, the following properties are also 
available, and may be useful
   <td><code>spark.shuffle.io.preferDirectBufs</code></td>
   <td>true</td>
   <td>
-    (Netty only) Off-heap buffers are used to reduce garbage collection during 
shuffle and cache 
-    block transfer. For environments where off-heap memory is tightly limited, 
users may wish to 
+    (Netty only) Off-heap buffers are used to reduce garbage collection during 
shuffle and cache
+    block transfer. For environments where off-heap memory is tightly limited, 
users may wish to
     turn this off to force all allocations from Netty to be on-heap.
   </td>
 </tr>
@@ -920,7 +927,7 @@ Apart from these, the following properties are also 
available, and may be useful
   <td><code>spark.shuffle.io.numConnectionsPerPeer</code></td>
   <td>1</td>
   <td>
-    (Netty only) Connections between hosts are reused in order to reduce 
connection buildup for 
+    (Netty only) Connections between hosts are reused in order to reduce 
connection buildup for
     large clusters. For clusters with many hard disks and few hosts, this may 
result in insufficient
     concurrency to saturate all disks, and so users may consider increasing 
this value.
   </td>
@@ -930,7 +937,7 @@ Apart from these, the following properties are also 
available, and may be useful
   <td>3</td>
   <td>
     (Netty only) Fetches that fail due to IO-related exceptions are 
automatically retried if this is
-    set to a non-zero value. This retry logic helps stabilize large shuffles 
in the face of long GC 
+    set to a non-zero value. This retry logic helps stabilize large shuffles 
in the face of long GC
     pauses or transient network connectivity issues.
   </td>
 </tr>
@@ -939,7 +946,7 @@ Apart from these, the following properties are also 
available, and may be useful
   <td>5</td>
   <td>
     (Netty only) Seconds to wait between retries of fetches. The maximum delay 
caused by retrying
-    is simply <code>maxRetries * retryWait</code>, by default 15 seconds. 
+    is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
   </td>
 </tr>
 </table>

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f6f176d..a9e968a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -342,7 +342,7 @@
           </exclusion>
         </exclusions>
       </dependency>
- 
+
       <!-- Shaded deps marked as provided. These are promoted to compile scope
            in the modules where we want the shaded classes to appear in the
            associated jar. -->
@@ -395,7 +395,7 @@
         <scope>provided</scope>
       </dependency>
       <!-- End of shaded deps -->
- 
+
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
@@ -1178,13 +1178,19 @@
             </includes>
             
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
             <argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} 
-XX:ReservedCodeCacheSize=512m</argLine>
+            <environmentVariables>
+              <!--
+                Setting SPARK_DIST_CLASSPATH is a simple way to make sure any 
child processes
+                launched by the tests have access to the correct test-time 
classpath.
+              -->
+              <SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH>
+            </environmentVariables>
             <systemProperties>
               <java.awt.headless>true</java.awt.headless>
               
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
               <spark.testing>1</spark.testing>
               <spark.ui.enabled>false</spark.ui.enabled>
               
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
-              
<spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath>
               
<spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
             </systemProperties>
             <failIfNoTests>false</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 95f8dfa..8fb1239 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -411,6 +411,10 @@ object TestSettings {
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
     fork := true,
+    // Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child 
processes
+    // launched by the tests have access to the correct test-time classpath.
+    envVars in Test += ("SPARK_DIST_CLASSPATH" ->
+      (fullClasspath in 
Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":")),
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dspark.port.maxRetries=100",
@@ -423,10 +427,6 @@ object TestSettings {
     javaOptions in Test += "-ea",
     javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m 
-XX:MaxPermSize=1g"
       .split(" ").toSeq,
-    // This places test scope jars on the classpath of executors during tests.
-    javaOptions in Test +=
-      "-Dspark.executor.extraClassPath=" + (fullClasspath in Test).value.files.
-      map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
     javaOptions += "-Xmx3g",
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4cc320c..a9bf861 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.yarn
 
 import scala.util.control.NonFatal
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.lang.reflect.InvocationTargetException
-import java.net.Socket
+import java.net.{Socket, URL}
 import java.util.concurrent.atomic.AtomicReference
 
 import akka.actor._
@@ -38,7 +38,8 @@ import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, 
MutableURLClassLoader,
+  SignalLogger, Utils}
 
 /**
  * Common application master functionality for Spark on Yarn.
@@ -244,7 +245,6 @@ private[spark] class ApplicationMaster(
       host: String,
       port: String,
       isClusterMode: Boolean): Unit = {
-    
     val driverUrl = AkkaUtils.address(
       AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
@@ -453,12 +453,24 @@ private[spark] class ApplicationMaster(
   private def startUserApplication(): Thread = {
     logInfo("Starting the user application in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
+
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }
+    val userClassLoader =
+      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      } else {
+        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      }
+
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
       System.setProperty("spark.submit.pyFiles",
         PythonRunner.formatPaths(args.pyFiles).mkString(","))
     }
-    val mainMethod = Class.forName(args.userClass, false,
-      Thread.currentThread.getContextClassLoader).getMethod("main", 
classOf[Array[String]])
+    val mainMethod = userClassLoader.loadClass(args.userClass)
+      .getMethod("main", classOf[Array[String]])
 
     val userThread = new Thread {
       override def run() {
@@ -483,6 +495,7 @@ private[spark] class ApplicationMaster(
         }
       }
     }
+    userThread.setContextClassLoader(userClassLoader)
     userThread.setName("Driver")
     userThread.start()
     userThread

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afc1cc..46d9df9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -183,8 +183,7 @@ private[spark] class Client(
   private[yarn] def copyFileToRemote(
       destDir: Path,
       srcPath: Path,
-      replication: Short,
-      setPerms: Boolean = false): Path = {
+      replication: Short): Path = {
     val destFs = destDir.getFileSystem(hadoopConf)
     val srcFs = srcPath.getFileSystem(hadoopConf)
     var destPath = srcPath
@@ -193,9 +192,7 @@ private[spark] class Client(
       logInfo(s"Uploading resource $srcPath -> $destPath")
       FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
       destFs.setReplication(destPath, replication)
-      if (setPerms) {
-        destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
-      }
+      destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
     } else {
       logInfo(s"Source and destination file systems are the same. Not copying 
$srcPath")
     }
@@ -239,23 +236,22 @@ private[spark] class Client(
     /**
      * Copy the given main resource to the distributed cache if the scheme is 
not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it 
downstream.
-     * Each resource is represented by a 4-tuple of:
+     * Each resource is represented by a 3-tuple of:
      *   (1) destination resource name,
      *   (2) local path to the resource,
-     *   (3) Spark property key to set if the scheme is not local, and
-     *   (4) whether to set permissions for this resource
+     *   (3) Spark property key to set if the scheme is not local
      */
     List(
-      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
-      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
-      ("log4j.properties", oldLog4jConf.orNull, null, false)
-    ).foreach { case (destName, _localPath, confKey, setPermissions) =>
+      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
+      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+      ("log4j.properties", oldLog4jConf.orNull, null)
+    ).foreach { case (destName, _localPath, confKey) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
         if (localURI.getScheme != LOCAL_SCHEME) {
           val src = getQualifiedLocalPath(localURI, hadoopConf)
-          val destPath = copyFileToRemote(dst, src, replication, 
setPermissions)
+          val destPath = copyFileToRemote(dst, src, replication)
           val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
           distCacheMgr.addResource(destFs, hadoopConf, destPath,
             localResources, LocalResourceType.FILE, destName, statCache)
@@ -707,7 +703,7 @@ object Client extends Logging {
    * Return the path to the given application's staging directory.
    */
   private def getAppStagingDir(appId: ApplicationId): String = {
-    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+    buildPath(SPARK_STAGING, appId.toString())
   }
 
   /**
@@ -783,7 +779,13 @@ object Client extends Logging {
 
   /**
    * Populate the classpath entry in the given environment map.
-   * This includes the user jar, Spark jar, and any extra application jars.
+   *
+   * User jars are generally not added to the JVM's system classpath; those 
are handled by the AM
+   * and executor backend. When the deprecated 
`spark.yarn.user.classpath.first` is used, user jars
+   * are included in the system classpath, though. The extra class path and 
other uploaded files are
+   * always made available through the system class path.
+   *
+   * @param args Client arguments (when starting the AM) or null (when 
starting executors).
    */
   private[yarn] def populateClasspath(
       args: ClientArguments,
@@ -795,48 +797,38 @@ object Client extends Logging {
     addClasspathEntry(
       YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
     )
-
-    // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
-      addUserClasspath(args, sparkConf, env)
-      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      populateHadoopClasspath(conf, env)
-    } else {
-      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      populateHadoopClasspath(conf, env)
-      addUserClasspath(args, sparkConf, env)
+      val userClassPath =
+        if (args != null) {
+          getUserClasspath(Option(args.userJar), Option(args.addJars))
+        } else {
+          getUserClasspath(sparkConf)
+        }
+      userClassPath.foreach { x =>
+        addFileToClasspath(x, null, env)
+      }
     }
-
-    // Append all jar files under the working directory to the classpath.
-    addClasspathEntry(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR 
+ "*", env
-    )
+    addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+    populateHadoopClasspath(conf, env)
+    sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
   }
 
   /**
-   * Adds the user jars which have local: URIs (or alternate names, such as 
APP_JAR) explicitly
-   * to the classpath.
+   * Returns a list of URIs representing the user classpath.
+   *
+   * @param conf Spark configuration.
    */
-  private def addUserClasspath(
-      args: ClientArguments,
-      conf: SparkConf,
-      env: HashMap[String, String]): Unit = {
-
-    // If `args` is not null, we are launching an AM container.
-    // Otherwise, we are launching executor containers.
-    val (mainJar, secondaryJars) =
-      if (args != null) {
-        (args.userJar, args.addJars)
-      } else {
-        (conf.get(CONF_SPARK_USER_JAR, null), 
conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
-      }
+  def getUserClasspath(conf: SparkConf): Array[URI] = {
+    getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR),
+      conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+  }
 
-    addFileToClasspath(mainJar, APP_JAR, env)
-    if (secondaryJars != null) {
-      secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
-        addFileToClasspath(jar, null, env)
-      }
-    }
+  private def getUserClasspath(
+      mainJar: Option[String],
+      secondaryJars: Option[String]): Array[URI] = {
+    val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
+    val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new 
URI(_))
+    (mainUri ++ secondaryUris).toArray
   }
 
   /**
@@ -847,27 +839,19 @@ object Client extends Logging {
    *
    * If not a "local:" file and no alternate name, the environment is not 
modified.
    *
-   * @param path      Path to add to classpath (optional).
+   * @param uri       URI to add to classpath (optional).
    * @param fileName  Alternate name for the file (optional).
    * @param env       Map holding the environment variables.
    */
   private def addFileToClasspath(
-      path: String,
+      uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
-    if (path != null) {
-      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val uri = new URI(path)
-        if (uri.getScheme == LOCAL_SCHEME) {
-          addClasspathEntry(uri.getPath, env)
-          return
-        }
-      }
-    }
-    if (fileName != null) {
-      addClasspathEntry(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + 
Path.SEPARATOR + fileName, env
-      )
+    if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+      addClasspathEntry(uri.getPath, env)
+    } else if (fileName != null) {
+      addClasspathEntry(buildPath(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
     }
   }
 
@@ -963,4 +947,23 @@ object Client extends Logging {
     new Path(qualifiedURI)
   }
 
+  /**
+   * Whether to consider jars provided by the user to have precedence over the 
Spark jars when
+   * loading user classes.
+   */
+  def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
+    if (isDriver) {
+      conf.getBoolean("spark.driver.userClassPathFirst", false)
+    } else {
+      conf.getBoolean("spark.executor.userClassPathFirst", false)
+    }
+  }
+
+  /**
+   * Joins all the path components using Path.SEPARATOR.
+   */
+  def buildPath(components: String*): String = {
+    components.mkString(Path.SEPARATOR)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7cd8c5f..6d5b8fd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.net.URI
 import java.nio.ByteBuffer
 
@@ -57,7 +58,7 @@ class ExecutorRunnable(
   var nmClient: NMClient = _
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   lazy val env = prepareEnvironment(container)
-  
+
   def run = {
     logInfo("Starting Executor Container")
     nmClient = NMClient.createNMClient()
@@ -185,6 +186,16 @@ class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
+    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+      val absPath =
+        if (new File(uri.getPath()).isAbsolute()) {
+          uri.getPath()
+        } else {
+          Client.buildPath(Environment.PWD.$(), uri.getPath())
+        }
+      Seq("--user-class-path", "file:" + absPath)
+    }.toSeq
+
     val commands = prefixEnv ++ Seq(
       YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + 
"/bin/java",
       "-server",
@@ -196,11 +207,13 @@ class ExecutorRunnable(
       "-XX:OnOutOfMemoryError='kill %p'") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-        masterAddress.toString,
-        slaveId.toString,
-        hostname.toString,
-        executorCores.toString,
-        appId,
+        "--driver-url", masterAddress.toString,
+        "--executor-id", slaveId.toString,
+        "--hostname", hostname.toString,
+        "--cores", executorCores.toString,
+        "--app-id", appId) ++
+      userClassPath ++
+      Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
         "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/log4j.properties 
b/yarn/src/test/resources/log4j.properties
index 287c8e3..aab41fa 100644
--- a/yarn/src/test/resources/log4j.properties
+++ b/yarn/src/test/resources/log4j.properties
@@ -16,7 +16,7 @@
 #
 
 # Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=DEBUG, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
@@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd 
HH:mm:ss.SSS} %t %p %c{
 
 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.eclipse.jetty=WARN
-org.eclipse.jetty.LEVEL=WARN
+log4j.logger.org.apache.hadoop=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2bb3dcf..f8f8129 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -82,6 +82,7 @@ class ClientSuite extends FunSuite with Matchers {
   test("Local jar URIs") {
     val conf = new Configuration()
     val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+      .set("spark.yarn.user.classpath.first", "true")
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), 
sparkConf)
 
@@ -98,13 +99,10 @@ class ClientSuite extends FunSuite with Matchers {
     })
     if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
       cp should contain("{{PWD}}")
-      cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
     } else if (Utils.isWindows) {
       cp should contain("%PWD%")
-      cp should contain(s"%PWD%${Path.SEPARATOR}*")
     } else {
       cp should contain(Environment.PWD.$())
-      cp should contain(s"${Environment.PWD.$()}${File.separator}*")
     }
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
@@ -117,7 +115,7 @@ class ClientSuite extends FunSuite with Matchers {
 
     val client = spy(new Client(args, conf, sparkConf))
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort(), anyBoolean())
+      any(classOf[Path]), anyShort())
 
     val tempDir = Utils.createTempDir()
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to