Repository: spark
Updated Branches:
  refs/heads/branch-2.4 99698279d -> dc6047613


[SPARK-25318] Add exception handling when wrapping the input stream during the 
fetch or stage retry in response to a corrupted block

SPARK-4105 provided a solution to the block corruption issue by retrying the 
fetch or the stage. That solution includes a step that wraps the input stream 
with compression and/or encryption. This step is prone to exceptions, but the 
current code has no exception handling for it, which has caused confusion for 
users: after SPARK-4105 the user expects to see either a fetchFailed exception 
or a warning about a corrupted block, yet an exception during wrapping can fail 
the job without either. This change adds exception handling for the wrapping 
step and also retries the fetch when a corruption is detected during wrapping. 
The reason for adding the retry is that the user usually won't hit the same 
failure after rerunning the job, so it seems reasonable to try the fetch and 
wrap one more time instead of failing.
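
For illustration only, the shape of the fix (do the wrapping inside a try/catch, 
retry the fetch on the first IOException seen for a block, give up on the 
second) can be sketched outside of Spark. The snippet below is a minimal, 
self-contained Scala sketch under stated assumptions, not the patch itself: 
BlockId, fetchAndWrap and the gzip-based wrapper are hypothetical stand-ins for 
Spark's streamWrapper and fetch-retry machinery.

import java.io.{ByteArrayInputStream, IOException, InputStream}
import java.util.zip.GZIPInputStream

// Hypothetical stand-in for Spark's shuffle block id.
final case class BlockId(name: String)

object WrapWithRetrySketch {

  // The wrap step runs inside the same try/catch that guards the copy,
  // mirroring the shape of the patch: the first IOException for a block asks
  // for a re-fetch, the second one fails for good.
  def fetchAndWrap(
      blockId: BlockId,
      rawBytes: Array[Byte],
      alreadyRetried: Set[BlockId]): Either[BlockId, InputStream] = {
    val in: InputStream = new ByteArrayInputStream(rawBytes)
    try {
      // The wrap step: decompression (and/or decryption) of the fetched bytes.
      // GZIPInputStream reads the header eagerly, so corruption surfaces here.
      Right(new GZIPInputStream(in))
    } catch {
      case _: IOException if !alreadyRetried.contains(blockId) =>
        in.close()
        Left(blockId) // caller should re-enqueue a fetch for this block
      case e: IOException =>
        in.close()
        throw new IOException(s"Block $blockId is still corrupted after a retry", e)
    }
  }

  def main(args: Array[String]): Unit = {
    val corrupted = Array[Byte](0, 1, 2, 3) // not valid gzip data
    fetchAndWrap(BlockId("shuffle_0_0_0"), corrupted, Set.empty) match {
      case Left(id) => println(s"corrupted block $id, would fetch again")
      case Right(_) => println("wrapped successfully")
    }
  }
}

In the actual patch the retry is expressed by re-enqueuing a FetchRequest and 
clearing result rather than by returning an Either, as the diff below shows.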

Closes #22325 from rezasafi/localcorruption.

Authored-by: Reza Safi <rezas...@cloudera.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit bd2ae857d1c5f251056de38a7a40540986756b94)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc604761
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc604761
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc604761

Branch: refs/heads/branch-2.4
Commit: dc60476133b76d8e80ac4d925d6e146cf65e9d2b
Parents: 9969827
Author: Reza Safi <rezas...@cloudera.com>
Authored: Wed Sep 26 09:29:58 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Sep 26 09:30:13 2018 -0700

----------------------------------------------------------------------
 .../storage/ShuffleBlockFetcherIterator.scala   | 50 ++++++++++----------
 1 file changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc604761/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e534c74..aecc228 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -448,35 +448,35 @@ final class ShuffleBlockFetcherIterator(
               buf.release()
               throwFetchFailedException(blockId, address, e)
           }
-
-          input = streamWrapper(blockId, in)
-          // Only copy the stream if it's wrapped by compression or encryption, also the size of
-          // block is small (the decompressed block is smaller than maxBytesInFlight)
-          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
-            val originalInput = input
-            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
-            try {
+          var isStreamCopied: Boolean = false
+          try {
+            input = streamWrapper(blockId, in)
+            // Only copy the stream if it's wrapped by compression or encryption, also the size of
+            // block is small (the decompressed block is smaller than maxBytesInFlight)
+            if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
+              isStreamCopied = true
+              val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
               // Decompress the whole block at once to detect any corruption, which could increase
               // the memory usage tne potential increase the chance of OOM.
               // TODO: manage the memory used here, and spill it into disk in case of OOM.
-              Utils.copyStream(input, out)
-              out.close()
+              Utils.copyStream(input, out, closeStreams = true)
               input = out.toChunkedByteBuffer.toInputStream(dispose = true)
-            } catch {
-              case e: IOException =>
-                buf.release()
-                if (buf.isInstanceOf[FileSegmentManagedBuffer]
-                  || corruptedBlocks.contains(blockId)) {
-                  throwFetchFailedException(blockId, address, e)
-                } else {
-                  logWarning(s"got an corrupted block $blockId from $address, 
fetch again", e)
-                  corruptedBlocks += blockId
-                  fetchRequests += FetchRequest(address, Array((blockId, 
size)))
-                  result = null
-                }
-            } finally {
-              // TODO: release the buf here to free memory earlier
-              originalInput.close()
+            }
+          } catch {
+            case e: IOException =>
+              buf.release()
+              if (buf.isInstanceOf[FileSegmentManagedBuffer]
+                || corruptedBlocks.contains(blockId)) {
+                throwFetchFailedException(blockId, address, e)
+              } else {
+                logWarning(s"got an corrupted block $blockId from $address, 
fetch again", e)
+                corruptedBlocks += blockId
+                fetchRequests += FetchRequest(address, Array((blockId, size)))
+                result = null
+              }
+          } finally {
+            // TODO: release the buf here to free memory earlier
+            if (isStreamCopied) {
               in.close()
             }
           }

