Repository: spark
Updated Branches:
  refs/heads/master 7cbe21644 -> c133907c5


[SPARK-17577][SPARKR][CORE] SparkR support for adding files to Spark jobs and 
retrieving them on executors

## What changes were proposed in this pull request?
Scala/Python users can add files to a Spark job with the ```--files``` submit 
option or ```SparkContext.addFile()```, and can then retrieve an added file with 
```SparkFiles.get(filename)```.
We should support the same functionality for SparkR users, since they also need 
to share dependency files across a job. For example, SparkR users can first 
download third-party R packages to the driver, add those files to the Spark job 
as dependencies through this API, and then have each executor install the 
packages with ```install.packages```.
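
As a minimal illustration (the file name and contents below are made up; the 
calls mirror the unit test added in this patch), the driver-side usage looks like:

```r
library(SparkR)
sparkR.session()

# Create a small local file on the driver.
path <- tempfile(pattern = "hello", fileext = ".txt")
writeLines("Hello World!", path)

# Ship the file with the Spark job; every node downloads its own copy.
spark.addFile(path)

# Resolve the downloaded location by file name and read the contents back.
readLines(spark.getSparkFiles(basename(path)))

sparkR.session.stop()
```

The dependency use case described above would, in the same way, point 
```install.packages``` at the path returned by ```spark.getSparkFiles```.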

## How was this patch tested?
Added a unit test.

Author: Yanbo Liang <yblia...@gmail.com>

Closes #15131 from yanboliang/spark-17577.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c133907c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c133907c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c133907c

Branch: refs/heads/master
Commit: c133907c5d9a6e6411b896b5e0cff48b2beff09f
Parents: 7cbe216
Author: Yanbo Liang <yblia...@gmail.com>
Authored: Wed Sep 21 20:08:28 2016 -0700
Committer: Yanbo Liang <yblia...@gmail.com>
Committed: Wed Sep 21 20:08:28 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |  3 ++
 R/pkg/R/context.R                               | 48 ++++++++++++++++++++
 R/pkg/inst/tests/testthat/test_context.R        | 13 ++++++
 .../scala/org/apache/spark/SparkContext.scala   |  6 +--
 4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c133907c/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a5e9cbd..267a38c 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -336,6 +336,9 @@ export("as.DataFrame",
        "read.parquet",
        "read.text",
        "spark.lapply",
+       "spark.addFile",
+       "spark.getSparkFilesRootDirectory",
+       "spark.getSparkFiles",
        "sql",
        "str",
        "tableToDF",

http://git-wip-us.apache.org/repos/asf/spark/blob/c133907c/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 13ade49..4793578 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
 
+#' Add a file or directory to be downloaded with this Spark job on every node.
+#'
+#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
+#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+#' use spark.getSparkFiles(fileName) to find its download location.
+#'
+#' @rdname spark.addFile
+#' @param path The path of the file to be added
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.addFile("~/myfile")
+#'}
+#' @note spark.addFile since 2.1.0
+spark.addFile <- function(path) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
+}
+
+#' Get the root directory that contains files added through spark.addFile.
+#'
+#' @rdname spark.getSparkFilesRootDirectory
+#' @return the root directory that contains files added through spark.addFile
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFilesRootDirectory()
+#'}
+#' @note spark.getSparkFilesRootDirectory since 2.1.0
+spark.getSparkFilesRootDirectory <- function() {
+  callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+}
+
+#' Get the absolute path of a file added through spark.addFile.
+#'
+#' @rdname spark.getSparkFiles
+#' @param fileName The name of the file added through spark.addFile
+#' @return the absolute path of a file added through spark.addFile.
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFiles("myfile")
+#'}
+#' @note spark.getSparkFiles since 2.1.0
+spark.getSparkFiles <- function(fileName) {
+  callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+}
+
 #' Run a function over a list of elements, distributing the computations with Spark
 #'
 #' Run a function over a list of elements, distributing the computations with Spark. Applies a

http://git-wip-us.apache.org/repos/asf/spark/blob/c133907c/R/pkg/inst/tests/testthat/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 1ab7f31..0495418 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", {
   expect_equal(doubled, as.list(2 * 1:10))
   sparkR.session.stop()
 })
+
+test_that("add and get file to be downloaded with Spark job on every node", {
+  sparkR.sparkContext()
+  path <- tempfile(pattern = "hello", fileext = ".txt")
+  filename <- basename(path)
+  words <- "Hello World!"
+  writeLines(words, path)
+  spark.addFile(path)
+  download_path <- spark.getSparkFiles(filename)
+  expect_equal(readLines(download_path), words)
+  unlink(path)
+  sparkR.session.stop()
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/c133907c/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index db84172..1981ad5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1427,7 +1427,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new URI(path)
+    val uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
       case null | "local" => new File(path).getCanonicalFile.toURI.toString
       case _ => path
@@ -1458,8 +1458,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo(s"Added file $path at $key with timestamp $timestamp")
      // Fetch the file locally so that closures which are run on the driver can still use the
       // SparkFiles API to access files.
-      Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-        hadoopConfiguration, timestamp, useCache = false)
+      Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
+        env.securityManager, hadoopConfiguration, timestamp, useCache = false)
       postEnvironmentUpdate()
     }
   }

