Repository: spark
Updated Branches:
  refs/heads/branch-2.4 99698279d -> dc6047613
[SPARK-25318] Add exception handling when wrapping the input stream during the fetch or stage retry in response to a corrupted block

SPARK-4105 provided a solution to the block corruption issue by retrying the fetch or the stage. In that solution there is a step that wraps the input stream with compression and/or encryption. This step is prone to exceptions, but the current code has no exception handling for it, which has caused confusion for users: after SPARK-4105 the user expects to see either a fetchFailed exception or a warning about a corrupted block, yet an exception during wrapping can fail the job without either. This change adds exception handling for the wrapping step and also retries the fetch if corruption is detected during wrapping. The reason for adding the retry is that the user usually won't experience the same failure after rerunning the job, so it seems reasonable to try to fetch and wrap one more time instead of failing.

Closes #22325 from rezasafi/localcorruption.

Authored-by: Reza Safi <rezas...@cloudera.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit bd2ae857d1c5f251056de38a7a40540986756b94)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc604761
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc604761
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc604761

Branch: refs/heads/branch-2.4
Commit: dc60476133b76d8e80ac4d925d6e146cf65e9d2b
Parents: 9969827
Author: Reza Safi <rezas...@cloudera.com>
Authored: Wed Sep 26 09:29:58 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Sep 26 09:30:13 2018 -0700

----------------------------------------------------------------------
 .../storage/ShuffleBlockFetcherIterator.scala   | 50 ++++++++++----------
 1 file changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc604761/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e534c74..aecc228 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -448,35 +448,35 @@ final class ShuffleBlockFetcherIterator(
             buf.release()
             throwFetchFailedException(blockId, address, e)
         }
-
-        input = streamWrapper(blockId, in)
-        // Only copy the stream if it's wrapped by compression or encryption, also the size of
-        // block is small (the decompressed block is smaller than maxBytesInFlight)
-        if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
-          val originalInput = input
-          val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
-          try {
+        var isStreamCopied: Boolean = false
+        try {
+          input = streamWrapper(blockId, in)
+          // Only copy the stream if it's wrapped by compression or encryption, also the size of
+          // block is small (the decompressed block is smaller than maxBytesInFlight)
+          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
+            isStreamCopied = true
+            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
             // Decompress the whole block at once to detect any corruption, which could increase
             // the memory usage and potentially increase the chance of OOM.
             // TODO: manage the memory used here, and spill it into disk in case of OOM.
-            Utils.copyStream(input, out)
-            out.close()
+            Utils.copyStream(input, out, closeStreams = true)
             input = out.toChunkedByteBuffer.toInputStream(dispose = true)
-          } catch {
-            case e: IOException =>
-              buf.release()
-              if (buf.isInstanceOf[FileSegmentManagedBuffer]
-                || corruptedBlocks.contains(blockId)) {
-                throwFetchFailedException(blockId, address, e)
-              } else {
-                logWarning(s"got a corrupted block $blockId from $address, fetch again", e)
-                corruptedBlocks += blockId
-                fetchRequests += FetchRequest(address, Array((blockId, size)))
-                result = null
-              }
-          } finally {
-            // TODO: release the buf here to free memory earlier
-            originalInput.close()
+          }
+        } catch {
+          case e: IOException =>
+            buf.release()
+            if (buf.isInstanceOf[FileSegmentManagedBuffer]
+              || corruptedBlocks.contains(blockId)) {
+              throwFetchFailedException(blockId, address, e)
+            } else {
+              logWarning(s"got a corrupted block $blockId from $address, fetch again", e)
+              corruptedBlocks += blockId
+              fetchRequests += FetchRequest(address, Array((blockId, size)))
+              result = null
+            }
+        } finally {
+          // TODO: release the buf here to free memory earlier
+          if (isStreamCopied) {
             in.close()
           }
         }
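To see the retry-on-corruption pattern from this patch in isolation, here is a minimal, self-contained Scala sketch. It is an illustration under assumptions, not Spark's actual code: GZIP stands in for the compression/encryption applied by streamWrapper, and readBlockWithRetry, fetch, and maxAttempts are hypothetical names. The real patch re-queues a FetchRequest on the iterator rather than looping in place.

// Minimal sketch of the pattern above: wrap a fetched stream, eagerly drain
// it to surface corruption, and retry the fetch once on IOException.
// GZIP stands in for Spark's compression/encryption wrapper; names such as
// readBlockWithRetry and maxAttempts are illustrative, not Spark's API.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object WrapWithRetrySketch {

  // Stand-in for ShuffleBlockFetcherIterator's streamWrapper.
  def streamWrapper(in: InputStream): InputStream = new GZIPInputStream(in)

  // Wrap and fully drain the block so corruption shows up at fetch time, not
  // later in task code; on the first IOException, fetch again before failing.
  def readBlockWithRetry(fetch: () => InputStream, maxAttempts: Int = 2): Array[Byte] = {
    var attempt = 0
    var result: Array[Byte] = null
    while (result == null) {
      attempt += 1
      try {
        // Wrapping itself may throw on a corrupt header, which is exactly
        // the case the patch adds handling for.
        val wrapped = streamWrapper(fetch())
        val out = new ByteArrayOutputStream()
        try {
          val buffer = new Array[Byte](64 * 1024)
          var n = wrapped.read(buffer)
          while (n != -1) {
            out.write(buffer, 0, n)
            n = wrapped.read(buffer)
          }
        } finally {
          wrapped.close()
        }
        result = out.toByteArray  // fully decompressed: no corruption detected
      } catch {
        case e: IOException if attempt < maxAttempts =>
          // Mirrors the patch's "got a corrupted block ..., fetch again" path.
          println(s"corrupted block on attempt $attempt, fetching again: ${e.getMessage}")
        // A later failure propagates, as Spark then throws a fetch-failed error.
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // Build one good gzip payload and one with a flipped byte in the middle.
    val bytesOut = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bytesOut)
    gz.write("hello shuffle block".getBytes("UTF-8"))
    gz.close()
    val good = bytesOut.toByteArray
    val corrupt = good.clone()
    corrupt(good.length / 2) = (corrupt(good.length / 2) ^ 0xFF).toByte

    // The first fetch serves the corrupt copy; the retry serves the good one.
    var first = true
    val data = readBlockWithRetry { () =>
      val bytes = if (first) { first = false; corrupt } else good
      new ByteArrayInputStream(bytes)
    }
    println(new String(data, "UTF-8"))  // prints: hello shuffle block
  }
}

Draining the wrapped stream to EOF is what forces corruption to surface at fetch time (GZIP verifies its CRC32 trailer only at end of stream); that mirrors why the patch eagerly decompresses small blocks into a ChunkedByteBufferOutputStream instead of handing the wrapped stream straight to the consumer.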
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org