Repository: spark
Updated Branches:
  refs/heads/master f88f51bbd -> 0c88ce541


[SPARK-6468][Block Manager] Fix the race condition of subDirs in DiskBlockManager

There are two race conditions involving `subDirs` in `DiskBlockManager`:

1. `getAllFiles` does not take the proper locks when reading the contents of 
`subDirs`. Although it is designed for testing, it is still worth adding 
correct locking to eliminate the race condition.
2. The double-checked locking in `getFile(filename: String)` is broken. If a 
thread finds `subDirs(dirId)(subDirId)` to be non-null outside the 
`synchronized` block, it may still not see the correct contents of the `File` 
instance referenced by `subDirs(dirId)(subDirId)`: under the Java memory model 
there is no volatile variable involved, so there is no happens-before edge 
with the writing thread.

This PR fixes both race conditions; a minimal sketch of the second problem and 
its fix follows.
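
For illustration, here is a minimal, self-contained sketch of the broken 
double-check (a hypothetical SubDirSlots class standing in for one row of 
subDirs; this is not Spark's actual code, which appears in the diff below):

import java.io.File

class SubDirSlots(parent: File, n: Int) {
  // Nothing here is volatile, so an unsynchronized read races with writes.
  private val slots = new Array[File](n)

  // Broken double-checked locking, mirroring the code this commit removes.
  def getUnsafe(i: Int): File = {
    var dir = slots(i)  // unsynchronized first read: no happens-before edge
    if (dir == null) {
      dir = slots.synchronized {
        val old = slots(i)
        if (old != null) {
          old
        } else {
          val newDir = new File(parent, "%02x".format(i))
          slots(i) = newDir
          newDir
        }
      }
    }
    // A fast-path caller may observe the reference without any guarantee
    // about the visibility of the object's state.
    dir
  }

  // The fix: every read and write of slots(i) holds the same lock.
  def getSafe(i: Int): File = slots.synchronized {
    val old = slots(i)
    if (old != null) {
      old
    } else {
      val newDir = new File(parent, "%02x".format(i))
      slots(i) = newDir
      newDir
    }
  }
}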

Author: zsxwing <zsxw...@gmail.com>

Closes #5136 from zsxwing/SPARK-6468 and squashes the following commits:

cbb872b [zsxwing] Fix the race condition of subDirs in DiskBlockManager


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88ce54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88ce54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88ce54

Branch: refs/heads/master
Commit: 0c88ce5416d7687bc806a7655e17009ad5823d30
Parents: f88f51b
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Mar 26 12:54:48 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 26 12:54:48 2015 +0000

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockManager.scala | 32 +++++++++++---------
 1 file changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c88ce54/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 12cd8ea..2883137 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
+  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
+  // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
   private val shutdownHook = addShutdownHook()
@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
     // Create the subdirectory if it doesn't already exist
-    var subDir = subDirs(dirId)(subDirId)
-    if (subDir == null) {
-      subDir = subDirs(dirId).synchronized {
-        val old = subDirs(dirId)(subDirId)
-        if (old != null) {
-          old
-        } else {
-          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-          if (!newDir.exists() && !newDir.mkdir()) {
-            throw new IOException(s"Failed to create local dir in $newDir.")
-          }
-          subDirs(dirId)(subDirId) = newDir
-          newDir
+    val subDir = subDirs(dirId).synchronized {
+      val old = subDirs(dirId)(subDirId)
+      if (old != null) {
+        old
+      } else {
+        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        if (!newDir.exists() && !newDir.mkdir()) {
+          throw new IOException(s"Failed to create local dir in $newDir.")
         }
+        subDirs(dirId)(subDirId) = newDir
+        newDir
       }
     }
 
@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of array of directories
-    subDirs.flatten.filter(_ != null).flatMap { dir =>
+    subDirs.flatMap { dir =>
+      dir.synchronized {
+        // Copy the content of dir because it may be modified in other threads
+        dir.clone()
+      }
+    }.filter(_ != null).flatMap { dir =>
       val files = dir.listFiles()
       if (files != null) files else Seq.empty
     }
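
A note on the getAllFiles change above: the sub-array is cloned while its lock 
is held, so the snapshot is consistent, and the comparatively slow listFiles() 
filesystem calls then run outside the critical section. A condensed, 
standalone sketch of this pattern (simplified signature, not the full class):

import java.io.File

object GetAllFilesSketch {
  def allFiles(subDirs: Array[Array[File]]): Seq[File] = {
    subDirs.flatMap { dir =>
      // Snapshot under the lock; other threads may mutate dir concurrently.
      dir.synchronized { dir.clone() }
    }.filter(_ != null).flatMap { dir =>
      // listFiles() can return null (e.g. if the dir vanished); treat as empty.
      val files = dir.listFiles()
      if (files != null) files.toSeq else Seq.empty
    }
  }
}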

