Repository: spark
Updated Branches:
  refs/heads/branch-1.6 ce0a222f5 -> 94524cef4

[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire 
job (branch-1.6 backport)

This patch is a branch-1.6 backport of #15037:

## What changes were proposed in this pull request?

In Spark's `RDD.getOrCompute` we first try to read a local copy of a cached RDD 
block, then a remote copy, and only fall back to recomputing the block if no 
cached copy (local or remote) can be read. This logic works correctly in the 
case where no remote copies of the block exist, but if there _are_ remote 
copies and reads of those copies fail (due to network issues or internal Spark 
bugs) then the BlockManager will throw a `BlockFetchException` that will fail 
the task (and which could possibly fail the whole job if the read failures keep 
occurring).

In the cases of TorrentBroadcast and task result fetching we really do want to 
fail the entire job in case no remote blocks can be fetched, but this logic is 
inappropriate for reads of cached RDD blocks because those can/should be 
recomputed in case cached blocks are unavailable.
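
As a rough illustration of the read order described above, here is a minimal Scala sketch. It is not Spark's actual code: `CachedBlockFallback`, `readLocal`, `readRemote`, and `recompute` are hypothetical names introduced only for this example. It assumes each read reports a miss as `None` rather than by throwing, which is the behavior this patch proposes for remote reads.

```scala
// Hypothetical stand-in for the cached-block read path; not Spark's API.
object CachedBlockFallback {

  /**
   * Returns the block from the first source that yields it:
   * local cache, then remote cache, then recomputation.
   * `orElse` and `getOrElse` take by-name arguments, so later
   * sources are only consulted when earlier ones return None.
   */
  def getOrCompute[T](
      readLocal: () => Option[T],
      readRemote: () => Option[T],
      recompute: () => T): T = {
    readLocal()
      .orElse(readRemote())
      .getOrElse(recompute())
  }
}
```

If `readRemote` threw instead of returning `None`, the exception would propagate out of `getOrCompute` and `recompute` would never run, which is the failure mode described in the first paragraph.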

Therefore, I think that the `BlockManager.getRemoteBytes()` method should never 
throw on remote fetch errors and, instead, should handle failures by returning 
`None`.

## How was this patch tested?

Block manager changes should be covered by modified tests in 
`BlockManagerSuite`: the old tests expected exceptions to be thrown on failed 
remote reads, while the modified tests now expect `None` to be returned from 
the `getRemote*` method.

I also manually inspected all usages of `BlockManager.getRemoteValues()`, 
`getRemoteBytes()`, and `get()` to verify that they correctly pattern-match on 
the result and handle `None`. Note that these `None` branches are already 
exercised because the old `getRemoteBytes` returned `None` when no remote 
locations for the block could be found (which could occur if an executor died 
and its block manager de-registered with the master).
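
The contract described above can be sketched in a few lines of Scala. This is an illustrative model under stated assumptions, not the `BlockManager` implementation: `RemoteFetchSketch`, the `fetch` callback, the `readBlock` helper, and the use of plain strings for locations are all hypothetical. It shows a fetch that tries each location in turn, logs failures, and returns `None` both when there are no locations and when every location fails, so callers need only one `None` branch.

```scala
import scala.util.{Success, Try}

// Hypothetical model of a remote fetch that never throws on fetch errors.
object RemoteFetchSketch {

  /**
   * Tries each location in order and returns the first successful result.
   * `fetch` is an assumed callback that returns the bytes or throws.
   * Returns None if `locations` is empty or if every fetch fails.
   */
  def getRemoteBytes(
      locations: Seq[String],
      fetch: String => Array[Byte]): Option[Array[Byte]] = {
    locations.iterator
      .map { loc =>
        val attempt = Try(fetch(loc)) // Try captures non-fatal exceptions
        attempt.failed.foreach { e =>
          Console.err.println(s"Failed to fetch block from $loc: ${e.getMessage}")
        }
        attempt
      }
      .collectFirst { case Success(bytes) => bytes } // stops at the first success
  }

  /** Caller side: pattern-match on the result and recompute on None. */
  def readBlock(
      locations: Seq[String],
      fetch: String => Array[Byte],
      recompute: () => Array[Byte]): Array[Byte] = {
    getRemoteBytes(locations, fetch) match {
      case Some(bytes) => bytes
      case None => recompute()
    }
  }
}
```

Because the "no locations" and "all fetches failed" cases collapse into the same `None` result, a caller that already handled the former needs no changes to handle the latter.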

Author: Josh Rosen

Closes #15186 from JoshRosen/SPARK-17485-branch-1.6-backport.


Branch: refs/heads/branch-1.6
Commit: 94524cef4cf367a0e73ebe0e919cc21f25f1043f
Parents: ce0a222
Author: Josh Rosen
Authored: Thu Sep 22 11:05:35 2016 -0700
Committer: Josh Rosen
Committed: Thu Sep 22 11:05:35 2016 -0700

 .../spark/storage/BlockFetchException.scala     | 24 --------------------
 .../org/apache/spark/storage/BlockManager.scala |  3 ++-
 .../spark/storage/BlockManagerSuite.scala       |  7 +++---
 3 files changed, 5 insertions(+), 29 deletions(-)
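diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala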
deleted file mode 100644
index f6e46ae..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.spark.SparkException
-case class BlockFetchException(messages: String, throwable: Throwable)
-  extends SparkException(messages, throwable)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 339ee144..1fc6f39 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -602,8 +602,9 @@ private[spark] class BlockManager(
           numFetchFailures += 1
           if (numFetchFailures == locations.size) {
             // An exception is thrown while fetching this block from all locations
-            throw new BlockFetchException(s"Failed to fetch block from" +
+            logWarning(s"Failed to fetch block from" +
               s" ${locations.size} locations. Most recent failure cause:", e)
+            return None
           } else {
             // This location failed, so we retry fetch from a different one by returning null here
             logWarning(s"Failed to fetch remote block $blockId " +
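diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala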
index 47e8545..fc76b91 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -509,10 +509,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       assert(list1Get.isDefined, "list1Get expected to be fetched")
       store3 = null
-      // exception throw because there is no locations
-      intercept[BlockFetchException] {
-        list1Get = store.getRemoteBytes("list1")
-      }
+      // Fetch should fail because there are no locations, but no exception should be thrown
+      list1Get = store.getRemoteBytes("list1")
+      assert(list1Get.isEmpty, "list1Get expected to fail")
     } finally {
       origTimeoutOpt match {
         case Some(t) => conf.set("", t)
