Reza Safi created SPARK-25318:
---------------------------------
Summary: Add exception handling when wrapping the input stream
during the fetch or stage retry in response to a corrupted block
Key: SPARK-25318
URL: https://issues.apache.org/jira/browse/SPARK-25318
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.3.1, 2.2.2, 2.1.3, 2.4.0
Reporter: Reza Safi
SPARK-4105 provided a solution to the block corruption issue by retrying the
fetch or the stage. That solution includes a step that wraps the input stream
with compression and/or encryption. This step is prone to exceptions, but the
current code has no exception handling around it, which has caused confusion
for users. In fact, we have customers who reported an exception like the
following when SPARK-4105 is available to them:
{noformat}
2018-08-28 22:35:54,361 ERROR [Driver] org.apache.spark.deploy.yarn.ApplicationMaster:95 User class threw exception: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 452 in stage 209.0 failed 4 times, most recent failure: Lost task 452.3 in stage y.0 (TID z, xxxxx, executor xx): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:395)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:431)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
    at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:159)
    at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1219)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:48)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:47)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:328)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:55)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    ...
{noformat}
In this customer's version of Spark, line 328 of
ShuffleBlockFetcherIterator.scala is the line where the following occurs:
{noformat}
input = streamWrapper(blockId, in)
{noformat}
It would be nice to add exception handling around this line to avoid confusion.
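As a minimal sketch of the idea (not the actual patch), the wrapping step could be guarded so that a failure inside the compression/encryption wrapper is reported as a likely block corruption rather than an opaque low-level exception. The helper name {{wrapStream}} below is hypothetical; {{streamWrapper}}, {{blockId}}, and {{in}} mirror the names used around line 328 of ShuffleBlockFetcherIterator.scala:
{noformat}
import java.io.{IOException, InputStream}

// Hypothetical helper: wrap the raw fetched stream, and if the wrapper
// (e.g. a Snappy decompression stream) throws during construction, close
// the underlying stream and rethrow with a message that points at the
// likely cause, so the fetch/stage retry logic can react to it.
def wrapStream(blockId: String,
               in: InputStream,
               streamWrapper: (String, InputStream) => InputStream): InputStream = {
  try {
    streamWrapper(blockId, in)
  } catch {
    case e: IOException =>
      in.close()
      throw new IOException(
        s"Failed to wrap input stream for block $blockId; " +
          "the block may be corrupted", e)
  }
}
{noformat}
With a guard like this, the FAILED_TO_UNCOMPRESS(5) trace above would surface with the offending block id attached, instead of leaving the user to guess that the wrapping step was the failure point.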
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]