[ 
https://issues.apache.org/jira/browse/SPARK-26089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699670#comment-16699670
 ] 

Imran Rashid commented on SPARK-26089:
--------------------------------------

yeah the stacktrace is basically the same as SPARK-4105.  Its on an old 
version, but where we have a lot of backports, so there are differences, but 
not meaningful ones.

I actually think (1) is very simple, I'm not certain why that wasn't the 
initial approach.  Currently to detect corruption, the 
{{ShuffleBlockFetcherIterator}} reads an entire input stream into a 
ChunkedByteBuffer, and then passes on another inputstream from that ByteBuffer 
on:

https://github.com/apache/spark/blob/6f1a1c1248e0341a690aee655af05da9e9cbff90/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L453-L463

The idea is that when you read from that input stream, you convert any 
exception to a FetchFailedException.

But there is no reason you couldn't do that in a stream, rather than by first 
reading everything into memory.  Something like:

{code}
class FetchFailingInputStream(val wrapped: InputStream) implements InputStream {
  def read(): Int = {
    try {
      wrapped.read()
    } catch {
       case ioe: IOException => 
         throw new FetchFailedException("error reading fetched shuffle data", 
ioe)
    }
  }
  
  def read(dest: Array[Byte], offset: Int, length: Int): Int = {
    ...
  }
}
{code}

probably with a little more complicated error handling, as its done in 
ShuffleBlockFetcherIterator now.

Note that if you do that, then the FetchFailedException would already trigger 
blacklisting by the usual logic.  (OTOH, maybe it would be too aggressive -- 
would you really want to blacklist the entire node after one of these failures? 
 hard to know it wasn't an error on the receiver side.)

> Handle large corrupt shuffle blocks
> -----------------------------------
>
>                 Key: SPARK-26089
>                 URL: https://issues.apache.org/jira/browse/SPARK-26089
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler, Shuffle, Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Imran Rashid
>            Priority: Major
>
> We've seen a bad disk lead to corruption in a shuffle block, which lead to 
> tasks repeatedly failing after fetching the data with an IOException.  The 
> tasks get retried, but the same corrupt data gets fetched again, and the 
> tasks keep failing.  As there isn't a fetch-failure, the jobs eventually 
> fail, spark never tries to regenerate the shuffle data.
> This is the same as SPARK-4105, but that fix only covered small blocks.  
> There was some discussion during that change about this limitation 
> (https://github.com/apache/spark/pull/15923#discussion_r88756017) and 
> followups to cover larger blocks (which would involve spilling to disk to 
> avoid OOM), but it looks like that never happened.
> I can think of a few approaches to this:
> 1) wrap the shuffle block input stream with another input stream, that 
> converts all exceptions into FetchFailures.  This is similar to the fix of 
> SPARK-4105, but that reads the entire input stream up-front, and instead I'm 
> proposing to do it within the InputStream itself so its streaming and does 
> not have a large memory overhead.
> 2) Add checksums to shuffle blocks.  This was proposed 
> [here|https://github.com/apache/spark/pull/15894] and abandoned as being too 
> complex.
> 3) Try to tackle this with blacklisting instead: when there is any failure in 
> a task that is reading shuffle data, assign some "blame" to the source of the 
> shuffle data, and eventually blacklist the source.  It seems really tricky to 
> get sensible heuristics for this, though.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to