Repository: spark
Updated Branches:
  refs/heads/master 96b27855c -> 1826372d0


[SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.

cc aarondav kayousterhout pwendell

This should go into 1.2?

Author: Reynold Xin <[email protected]>

Closes #3579 from rxin/SPARK-4085 and squashes the following commits:

255b4fd [Reynold Xin] Updated test.
f9814d9 [Reynold Xin] Code review feedback.
2afaf35 [Reynold Xin] [SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1826372d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1826372d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1826372d

Branch: refs/heads/master
Commit: 1826372d0a1bc80db9015106dd5d2d155ada33f5
Parents: 96b2785
Author: Reynold Xin <[email protected]>
Authored: Wed Dec 3 16:28:24 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Wed Dec 3 16:28:24 2014 -0800

----------------------------------------------------------------------
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++++++++++++--------
 .../spark/ExternalShuffleServiceSuite.scala     |  2 --
 .../scala/org/apache/spark/ShuffleSuite.scala   | 23 ++++++++++++++++
 3 files changed, 40 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1826372d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 83170f7..2499c11 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.storage
 
+import java.io.{InputStream, IOException}
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
@@ -289,17 +290,22 @@ final class ShuffleBlockFetcherIterator(
     }
 
     val iteratorTry: Try[Iterator[Any]] = result match {
-      case FailureFetchResult(_, e) => Failure(e)
-      case SuccessFetchResult(blockId, _, buf) => {
-        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
-        val iter = serializer.newInstance().deserializeStream(is).asIterator
-        Success(CompletionIterator[Any, Iterator[Any]](iter, {
-          // Once the iterator is exhausted, release the buffer and set currentResult to null
-          // so we don't release it again in cleanup.
-          currentResult = null
-          buf.release()
-        }))
-      }
+      case FailureFetchResult(_, e) =>
+        Failure(e)
+      case SuccessFetchResult(blockId, _, buf) =>
+        // There is a chance that createInputStream can fail (e.g. fetching a local file that does
+        // not exist, SPARK-4085). In that case, we should propagate the right exception so
+        // the scheduler gets a FetchFailedException.
+        Try(buf.createInputStream()).map { is0 =>
+          val is = blockManager.wrapForCompression(blockId, is0)
+          val iter = serializer.newInstance().deserializeStream(is).asIterator
+          CompletionIterator[Any, Iterator[Any]](iter, {
+            // Once the iterator is exhausted, release the buffer and set currentResult to null
+            // so we don't release it again in cleanup.
+            currentResult = null
+            buf.release()
+          })
+        }
     }
 
     (result.blockId, iteratorTry)

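Side note on the pattern in the hunk above: scala.util.Try captures any non-fatal exception thrown while evaluating its argument as a Failure, and .map only runs on Success, so the IOException from createInputStream() flows through untouched for the caller to rethrow. A minimal, self-contained sketch of that behavior (openStream is an illustrative stand-in, not Spark code):

    import java.io.{ByteArrayInputStream, InputStream, IOException}
    import scala.util.{Failure, Success, Try}

    object TryPatternSketch {
      // Stand-in for ManagedBuffer.createInputStream(): either yields a
      // stream or throws, e.g. because a local shuffle file is missing.
      def openStream(ok: Boolean): InputStream =
        if (ok) new ByteArrayInputStream("payload".getBytes("UTF-8"))
        else throw new IOException("local shuffle file not found")

      def main(args: Array[String]): Unit = {
        // Success path: .map runs and the result is wrapped in Success.
        println(Try(openStream(ok = true)).map(_.available()))  // Success(7)

        // Failure path: the IOException is captured as a Failure and .map
        // is skipped, so the original exception is preserved for the
        // scheduler to turn into a FetchFailedException.
        Try(openStream(ok = false)).map(_.available()) match {
          case Success(n) => println(s"read $n bytes")
          case Failure(e) => println(s"propagating: ${e.getMessage}")
        }
      }
    }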
http://git-wip-us.apache.org/repos/asf/spark/blob/1826372d/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index cc3592e..bac6fdb 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.network.TransportContext

http://git-wip-us.apache.org/repos/asf/spark/blob/1826372d/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 5a133c0..58a9624 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.Matchers
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId}
 import org.apache.spark.util.MutablePair
 
 abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
@@ -263,6 +264,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
       }
     }
   }
+
+  test("[SPARK-4085] rerun map stage if reduce stage cannot find its local 
shuffle file") {
+    val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
+    rdd.count()
+
+    // Delete one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    assert(hashFile.exists() || sortFile.exists())
+
+    if (hashFile.exists()) {
+      hashFile.delete()
+    }
+    if (sortFile.exists()) {
+      sortFile.delete()
+    }
+
+    // This count should retry the execution of the previous stage and rerun shuffle.
+    rdd.count()
+  }
 }
 
 object ShuffleSuite {

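Side note on the CompletionIterator used in the first hunk: its role is to run a cleanup callback exactly once, when the wrapped iterator is exhausted, so the buffer is released neither too early nor twice. A simplified, hypothetical stand-in (not Spark's actual org.apache.spark.util.CompletionIterator) illustrating that release-on-exhaustion pattern:

    // Hypothetical, simplified analogue of Spark's CompletionIterator,
    // for illustration only; the real class differs in detail.
    class OnExhaustionIterator[A](sub: Iterator[A], completion: () => Unit)
        extends Iterator[A] {
      private var completed = false

      override def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !completed) {
          completed = true
          completion()  // e.g. buf.release(); runs exactly once
        }
        more
      }

      override def next(): A = sub.next()
    }

    object OnExhaustionIteratorDemo {
      def main(args: Array[String]): Unit = {
        val it = new OnExhaustionIterator(Iterator(1, 2, 3),
          () => println("released buffer"))
        it.foreach(println)  // prints 1, 2, 3, then "released buffer"
      }
    }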
