Updated Branches:
  refs/heads/branch-0.8 57fdb3feb -> e094dafda

Merge pull request #125 from velvia/2013-10/local-jar-uri

Add support for local:// URI scheme for addJars()

This PR adds support for a new URI scheme for SparkContext.addJars():
`local://file/path`.
The *local* scheme indicates that `/file/path` already exists on every worker
node. It is intended for big library JARs, which would be very expensive to
serve through the standard HTTP file server distribution method, especially on
big clusters. Today the only inexpensive way to handle such a JAR (assuming
the file is already on every host, via say NFS, rsync, etc.) is to add it to
SPARK_CLASSPATH, but we want a method where the user does not need to modify
the Spark configuration.
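
For illustration only (not part of the patch), a minimal usage sketch -- the
master URL and JAR path below are hypothetical, and the JAR must already sit
at the same path on every worker node (e.g. via NFS or rsync):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("spark://master:7077", "example")
    // Hypothetical path: /opt/libs/big-analytics.jar is assumed to be
    // pre-installed on every worker. With the local: scheme the executors
    // load the JAR from their own filesystem instead of downloading it
    // from the driver's HTTP file server.
    sc.addJar("local:///opt/libs/big-analytics.jar")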

I would add something to the docs, but it's not obvious where to add it.

Oh, and it would be great if this could be merged in time for 0.8.1.

(cherry picked from commit 618c1f6cf3008caae7a8c0202721a6bd77d29a0f)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a9e7787e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a9e7787e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a9e7787e

Branch: refs/heads/branch-0.8
Commit: a9e7787e17b6749f61c77835a3f111bf01f0fe8e
Parents: 57fdb3f
Author: Matei Zaharia <[email protected]>
Authored: Wed Oct 30 12:03:44 2013 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Nov 3 23:48:26 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkContext.scala  |  6 +++++-
 .../scala/org/apache/spark/FileServerSuite.scala    | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9e7787e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d22795d..2832f31 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -688,7 +688,7 @@ class SparkContext(
   /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.
+   * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
    */
   def addJar(path: String) {
     if (path == null) {
@@ -701,6 +701,7 @@ class SparkContext(
       } else {
         val uri = new URI(path)
         key = uri.getScheme match {
+          // A JAR file which exists only on the driver node
           case null | "file" =>
             if (SparkHadoopUtil.get.isYarnMode()) {
              // In order for this to work on yarn the user must specify the --addjars option to
@@ -718,6 +719,9 @@ class SparkContext(
             } else {
               env.httpFileServer.addJar(new File(uri.getPath))
             }
+          // A JAR file which exists locally on every worker node
+          case "local" =>
+            "file:" + uri.getPath
           case _ =>
             path
         }
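
As a sketch of what the new `case "local"` branch computes (the path here is a
hypothetical example, not from the patch):

    import java.net.URI

    val uri = new URI("local:///opt/libs/big-analytics.jar")  // hypothetical
    uri.getScheme             // "local"
    uri.getPath               // "/opt/libs/big-analytics.jar"
    // The key handed to executors is a plain file: URI, which they resolve
    // against their own local filesystem rather than fetching over HTTP.
    val key = "file:" + uri.getPath   // "file:/opt/libs/big-analytics.jar"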

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9e7787e/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 35d1d41..c210dd5 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     }.collect()
     assert(result.toSet === Set((1,2), (2,7), (3,121)))
   }
+
+  test ("Dynamically adding JARS on a standalone cluster using local: URL") {
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+    sc.addJar(sampleJarFile.replace("file", "local"))
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+    val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+      val fac = Thread.currentThread.getContextClassLoader()
+                                    .loadClass("org.uncommons.maths.Maths")
+                                    .getDeclaredMethod("factorial", classOf[Int])
+      val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      a + b
+    }.collect()
+    assert(result.toSet === Set((1,2), (2,7), (3,121)))
+  }
 }
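
(To run just this suite from the 0.8-era build, something like
`sbt/sbt "test-only org.apache.spark.FileServerSuite"` should work; the exact
invocation depends on your sbt setup.)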
