Repository: spark
Updated Branches:
  refs/heads/master 6a40a7684 -> 7aacb7bfa


[SPARK-2713] Executors of same application in same host should only download 
files & jars once

If Spark launches multiple executors on one host for one application, every 
executor downloads its dependent files and jars (if not using a local: URL) 
independently. This may result in huge latency. In my case, it resulted in 20 
seconds of latency to download dependent jars (about 17 MB) when I launched 32 
executors on every host (4 hosts in total).

This patch caches downloaded files and jars for executors to reduce network 
traffic and download latency. In my case, the latency was reduced from 20 
seconds to less than 1 second.

Author: Li Zhihui <zhihui...@intel.com>
Author: li-zhihui <zhihui...@intel.com>

Closes #1616 from li-zhihui/cachefiles and squashes the following commits:

36940df [Li Zhihui] Close cache for local mode
935fed6 [Li Zhihui] Clean code.
f9330d4 [Li Zhihui] Clean code again
7050d46 [Li Zhihui] Clean code
074a422 [Li Zhihui] Fix: deal with spark.files.overwrite
03ed3a8 [li-zhihui] rename cache file name as XXXXXXXXX_cache
2766055 [li-zhihui] Use url.hashCode + timestamp as cachedFileName
76a7b66 [Li Zhihui] Clean code & use applcation work directory as cache 
directory
3510eb0 [Li Zhihui] Keep fetchFile private
2ffd742 [Li Zhihui] add comment for FileLock
e0ebd48 [Li Zhihui] Try and finally lock.release
7fb7c0b [Li Zhihui] Release lock before copy files
6b997bf [Li Zhihui] Executors of same application in same host should only 
download files & jars once


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7aacb7bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7aacb7bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7aacb7bf

Branch: refs/heads/master
Commit: 7aacb7bfad4ec73fd8f18555c72ef6962c14358f
Parents: 6a40a76
Author: Li Zhihui <zhihui...@intel.com>
Authored: Fri Oct 24 13:01:36 2014 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Oct 24 13:01:36 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  5 +-
 .../org/apache/spark/executor/Executor.scala    | 10 ++-
 .../scala/org/apache/spark/util/Utils.scala     | 87 ++++++++++++++++----
 3 files changed, 82 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7aacb7bf/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ac7935b..55602a9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -837,11 +837,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local"       => "file:" + uri.getPath
       case _             => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using 
DAGScheduler.runLocally().
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, 
env.securityManager,
-      hadoopConfiguration)
+      hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + 
addedFiles(key))
     postEnvironmentUpdate()

http://git-wip-us.apache.org/repos/asf/spark/blob/7aacb7bf/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 616c7e6..0b75b9b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,14 +322,16 @@ private[spark] class Executor(
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) 
< timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, 
env.securityManager,
-          hadoopConf)
+        // Fetch file with useCache mode, close cache for local mode.
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < 
timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, 
env.securityManager,
-          hadoopConf)
+        // Fetch file with useCache mode, close cache for local mode.
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last

http://git-wip-us.apache.org/repos/asf/spark/blob/7aacb7bf/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0aeff64..ccbddd9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -347,15 +347,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in 
a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a 
variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL 
parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file to a local cache 
that's shared 
+   * across executors running the same application. `useCache` is used mainly 
for 
+   * the executors, and not in local mode.
+   *
+   * Throws SparkException if the target file already exists and has different 
contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors 
download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, 
hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo((s"File $targetFile exists and does not match contents of 
$url, " +
+            s"replacing it with $url"))
+        } else {
+          throw new SparkException(s"File $targetFile exists and does not 
match contents of $url")
+        }
+      }
+      Files.copy(cachedFile, targetFile)
+    } else {
+      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+    }
+    
+    // Decompress the file if it's a .tar or .tar.gz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+    }
+    // Make the file executable - That's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
+  }
+
+  /**
+   * Download a file to target directory. Supports fetching the file in a 
variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL 
parameter.
    *
    * Throws SparkException if the target file already exists and has different 
contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: 
SecurityManager,
-    hadoopConf: Configuration) {
-    val filename = url.split("/").last
+  private def doFetchFile(
+      url: String,
+      targetDir: File,
+      filename: String,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration) {
     val tempDir = getLocalDir(conf)
     val tempFile =  File.createTempFile("fetchFileTemp", null, new 
File(tempDir))
     val targetFile = new File(targetDir, filename)
@@ -443,16 +512,6 @@ private[spark] object Utils extends Logging {
         }
         Files.move(tempFile, targetFile)
     }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to