Repository: spark
Updated Branches:
  refs/heads/branch-1.6 0a13e4c07 -> 4259a2858


[SPARK-15736][CORE][BRANCH-1.6] Gracefully handle loss of DiskStore files

If an RDD partition is cached on disk and the DiskStore file is lost, then 
reads of that cached partition will fail and the missing partition is supposed 
to be recomputed by a new task attempt. In the current BlockManager 
implementation, however, the missing file neither triggers any metadata updates 
nor invalidates the cache, so subsequent task attempts will be scheduled 
on the same executor and the doomed read will be repeatedly retried, leading to 
repeated task failures and eventually a total job failure.

In order to fix this problem, the executor with the missing file needs to 
properly mark the corresponding block as missing so that it stops advertising 
itself as a cache location for that block.

This patch fixes this bug and adds an end-to-end regression test (in 
`FailureSuite`) and a set of unit tests (in `BlockManagerSuite`).

This is a branch-1.6 backport of #13473.

Author: Josh Rosen <joshro...@databricks.com>

Closes #13479 from JoshRosen/handle-missing-cache-files-branch-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4259a285
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4259a285
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4259a285

Branch: refs/heads/branch-1.6
Commit: 4259a28588a4dceb55d7bf1bf9327065dd751863
Parents: 0a13e4c
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Jun 2 17:47:31 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Jun 2 17:47:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 13 ++++---
 .../scala/org/apache/spark/FailureSuite.scala   | 12 ++++++
 .../spark/storage/BlockManagerSuite.scala       | 41 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4259a285/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 288f756..339ee144 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -507,11 +507,14 @@ private[spark] class BlockManager(
         // Look for block on disk, potentially storing it back in memory if 
required
         if (level.useDisk) {
           logDebug(s"Getting block $blockId from disk")
-          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-            case Some(b) => b
-            case None =>
-              throw new BlockException(
-                blockId, s"Block $blockId not found on disk, though it should 
be")
+          val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
+            // DiskStore.getBytes() always returns Some, so this .get() is 
guaranteed to be safe
+            diskStore.getBytes(blockId).get
+          } else {
+            // Remove the missing block so that its unavailability is reported 
to the driver
+            removeBlock(blockId)
+            throw new BlockException(
+              blockId, s"Block $blockId not found on disk, though it should 
be")
           }
           assert(0 == bytes.position())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4259a285/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala 
b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 203dab9..85983b2 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark
 
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
 
 import java.io.{IOException, NotSerializableException, ObjectInputStream}
@@ -238,6 +239,17 @@ class FailureSuite extends SparkFunSuite with 
LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("failure because cached RDD files are missing") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
+    rdd.count()
+    // Directly delete all files from the disk store, triggering failures when 
reading cached data:
+    
SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
+    // Each task should fail once due to missing cached data, but then should 
succeed on its second
+    // attempt because the missing cache locations will be purged and the 
blocks will be recomputed.
+    rdd.count()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4259a285/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 4e66714..47e8545 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1361,4 +1361,45 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(result.data === Right(bytes))
     assert(result.droppedBlocks === Nil)
   }
+
+  private def testReadWithLossOfOnDiskFiles(
+      storageLevel: StorageLevel,
+      readMethod: BlockManager => Option[_]): Unit = {
+    store = makeBlockManager(12000)
+    assert(store.putSingle("blockId", new Array[Byte](4000), 
storageLevel).nonEmpty)
+    assert(store.getStatus("blockId").isDefined)
+    // Directly delete all files from the disk store, triggering failures when 
reading blocks:
+    store.diskBlockManager.getAllFiles().foreach(_.delete())
+    // The BlockManager still thinks that these blocks exist:
+    assert(store.getStatus("blockId").isDefined)
+    // Because the BlockManager's metadata claims that the block exists (i.e. 
that it's present
+    // in at least one store), the read attempts to read it and fails when the 
on-disk file is
+    // missing.
+    intercept[BlockException] {
+      readMethod(store)
+    }
+    // Subsequent read attempts will succeed; the block isn't present but we 
return an expected
+    // "block not found" response rather than a fatal error:
+    assert(readMethod(store).isEmpty)
+    // The reason why this second read succeeded is because the metadata entry 
for the missing
+    // block was removed as a result of the read failure:
+    assert(store.getStatus("blockId").isEmpty)
+  }
+
+  test("remove cached block if a read fails due to missing on-disk files") {
+    val storageLevels = Seq(
+      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
+    val readMethods = Map[String, BlockManager => Option[_]](
+      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
+      "getLocal" -> ((m: BlockManager) => m.getLocal("blockId"))
+    )
+    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, 
_.getLocalBytes("blockId"))
+    for ((readMethodName, readMethod) <- readMethods; storageLevel <- 
storageLevels) {
+      withClue(s"$readMethodName $storageLevel") {
+        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to