Repository: spark
Updated Branches:
  refs/heads/master 5855e0057 -> 229f90225


[SPARK-15736][CORE] Gracefully handle loss of DiskStore files

If an RDD partition is cached on disk and the DiskStore file is lost, then 
reads of that cached partition will fail and the missing partition should be 
recomputed by a new task attempt. In the current BlockManager implementation, 
however, the missing file does not trigger any metadata update or cache 
invalidation, so subsequent task attempts are scheduled on the same executor 
and the doomed read is retried over and over, leading to repeated task 
failures and eventually to failure of the entire job.
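
The failure can be reproduced with a small sketch along the lines of the 
regression test added by this patch. This is only an illustration: it assumes 
it runs inside the org.apache.spark package (as the test does) so that the 
private[spark] BlockManager internals are visible, and `local[1,2]` is just 
one convenient master string that allows a second attempt per task.

    import org.apache.spark.{SparkContext, SparkEnv}
    import org.apache.spark.storage.StorageLevel

    // Cache a small RDD on disk only, then delete the backing DiskStore files
    // to simulate losing them out from under the BlockManager.
    val sc = new SparkContext("local[1,2]", "disk-loss-repro")
    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
    rdd.count()  // materializes the on-disk cache
    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
    // Before this patch the next action keeps retrying the doomed read on the
    // same executor; with the fix, the lost partitions are simply recomputed.
    rdd.count()
    sc.stop()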

In order to fix this problem, the executor with the missing file needs to 
properly mark the corresponding block as missing so that it stops advertising 
itself as a cache location for that block.
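
For readers skimming the diff, the core of the change is a small helper in 
BlockManager that centralizes this cleanup; the excerpt below is lifted from 
the patch itself and is shown here only as a summary of the approach:

    /**
     * Cleanup code run in response to a failed local read.
     * Must be called while holding a read lock on the block.
     */
    private def handleLocalReadFailure(blockId: BlockId): Nothing = {
      releaseLock(blockId)
      // Remove the missing block so that its unavailability is reported to the driver
      removeBlock(blockId)
      throw new SparkException(s"Block $blockId was not found even though it's read-locked")
    }

The three local-read paths (getLocalValues and the two byte-read branches in 
the diff below) now call this helper instead of merely releasing the lock and 
throwing.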

This patch fixes this bug and adds an end-to-end regression test (in 
`FailureSuite`) and a set of unit tests (in `BlockManagerSuite`).

Author: Josh Rosen <[email protected]>

Closes #13473 from JoshRosen/handle-missing-cache-files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/229f9022
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/229f9022
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/229f9022

Branch: refs/heads/master
Commit: 229f90225748343972d7202c5567b45364cd8497
Parents: 5855e00
Author: Josh Rosen <[email protected]>
Authored: Thu Jun 2 17:36:31 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Jun 2 17:36:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 20 +++++++---
 .../scala/org/apache/spark/FailureSuite.scala   | 12 ++++++
 .../spark/storage/BlockManagerSuite.scala       | 40 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/229f9022/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2f9473a..83a9cbd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -403,6 +403,17 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+    releaseLock(blockId)
+    // Remove the missing block so that its unavailability is reported to the driver
+    removeBlock(blockId)
+    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+  }
+
+  /**
    * Get block from local block manager as an iterator of Java objects.
    */
   def getLocalValues(blockId: BlockId): Option[BlockResult] = {
@@ -441,8 +452,7 @@ private[spark] class BlockManager(
           val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
           Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
-          releaseLock(blockId)
-          throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+          handleLocalReadFailure(blockId)
         }
     }
   }
@@ -489,8 +499,7 @@ private[spark] class BlockManager(
         // The block was not found on disk, so serialize an in-memory copy:
         serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     } else {  // storage level is serialized
       if (level.useMemory && memoryStore.contains(blockId)) {
@@ -499,8 +508,7 @@ private[spark] class BlockManager(
         val diskBytes = diskStore.getBytes(blockId)
         maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/229f9022/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 333c23b..132f636 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io.{IOException, NotSerializableException, ObjectInputStream}
 
 import org.apache.spark.memory.TestMemoryConsumer
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
 
 // Common state shared by FailureSuite-launched tasks. We use a global object
@@ -241,6 +242,17 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("failure because cached RDD partitions are missing from DiskStore 
(SPARK-15736)") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
+    rdd.count()
+    // Directly delete all files from the disk store, triggering failures when reading cached data:
+    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
+    // Each task should fail once due to missing cached data, but then should succeed on its second
+    // attempt because the missing cache locations will be purged and the blocks will be recomputed.
+    rdd.count()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/229f9022/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a258030..6821582 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1139,6 +1139,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
   }
 
+  private def testReadWithLossOfOnDiskFiles(
+      storageLevel: StorageLevel,
+      readMethod: BlockManager => Option[_]): Unit = {
+    store = makeBlockManager(12000)
+    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
+    assert(store.getStatus("blockId").isDefined)
+    // Directly delete all files from the disk store, triggering failures when reading blocks:
+    store.diskBlockManager.getAllFiles().foreach(_.delete())
+    // The BlockManager still thinks that these blocks exist:
+    assert(store.getStatus("blockId").isDefined)
+    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
+    // in at least one store), the read attempts to read it and fails when the on-disk file is
+    // missing.
+    intercept[SparkException] {
+      readMethod(store)
+    }
+    // Subsequent read attempts will succeed; the block isn't present but we return an expected
+    // "block not found" response rather than a fatal error:
+    assert(readMethod(store).isEmpty)
+    // The reason why this second read succeeded is because the metadata entry for the missing
+    // block was removed as a result of the read failure:
+    assert(store.getStatus("blockId").isEmpty)
+  }
+
+  test("remove block if a read fails due to missing DiskStore files 
(SPARK-15736)") {
+    val storageLevels = Seq(
+      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
+    val readMethods = Map[String, BlockManager => Option[_]](
+      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
+      "getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
+    )
+    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
+    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
+      withClue(s"$readMethodName $storageLevel") {
+        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
+      }
+    }
+  }
+
   test("SPARK-13328: refresh block locations (fetch should fail after hitting 
a threshold)") {
     val mockBlockTransferService =
       new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))

