[
https://issues.apache.org/jira/browse/SPARK-19276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Imran Rashid updated SPARK-19276:
---------------------------------
Description:
The scheduler handles node failures by looking for a special
{{FetchFailedException}} thrown by the shuffle block fetcher. This is handled
in {{Executor}} and then passed as a special msg back to the driver:
https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403
However, user code exists in between the shuffle block fetcher and that catch
block -- it could intercept the exception, wrap it with something else, and
throw a different exception. If that happens, spark treats it as an ordinary
task failure, and retries the task, rather than regenerating the missing
shuffle data. The task eventually is retried 4 times, its doomed to fail each
time, and the job is failed.
You might think that no user code should do that -- but even sparksql does it:
https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214
Here's an example stack trace. This is from Spark 1.6, so the sql code is not
the same, but the problem is still there:
{noformat}
17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1983.0
(TID 304851, xxx): org.apache.spark.SparkException: Task failed while writing
rows.
at
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to
xxx/yyy:zzz
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
...
17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0 failed
4 times; aborting job
{noformat}
I think the right fix here is to also set a fetch failure status in the
{{TaskContextImpl}}, so the executor can check that instead of just one
exception.
was:
The scheduler handles node failures by looking for a special
{{FetchFailedException}} thrown by the shuffle block fetcher. This is handled
in {{Executor}} and then passed as a special msg back to the driver:
https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403
However, user code exists in between the shuffle block fetcher and that catch
block -- it could intercept the exception, wrap it with something else, and
throw a different exception. If that happens, spark treats it as an ordinary
task failure, and retries the task, rather than regenerating the missing
shuffle data. The task eventually is retried 4 times, its doomed to fail each
time, and the job is failed.
You might think that no user code should do that -- but even sparksql does it:
https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214
I think the right fix here is to also set a fetch failure status in the
{{TaskContextImpl}}, so the executor can check that instead of just one
exception.
> FetchFailures can be hidden be user (or sql) exception handling
> ---------------------------------------------------------------
>
> Key: SPARK-19276
> URL: https://issues.apache.org/jira/browse/SPARK-19276
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, Spark Core, SQL
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
> Priority: Critical
>
> The scheduler handles node failures by looking for a special
> {{FetchFailedException}} thrown by the shuffle block fetcher. This is
> handled in {{Executor}} and then passed as a special msg back to the driver:
> https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403
> However, user code exists in between the shuffle block fetcher and that catch
> block -- it could intercept the exception, wrap it with something else, and
> throw a different exception. If that happens, spark treats it as an ordinary
> task failure, and retries the task, rather than regenerating the missing
> shuffle data. The task eventually is retried 4 times, its doomed to fail
> each time, and the job is failed.
> You might think that no user code should do that -- but even sparksql does it:
> https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214
> Here's an example stack trace. This is from Spark 1.6, so the sql code is
> not the same, but the problem is still there:
> {noformat}
> 17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 1983.0 (TID 304851, xxx): org.apache.spark.SparkException: Task failed while
> writing rows.
> at
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect
> to xxx/yyy:zzz
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
> ...
> 17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0
> failed 4 times; aborting job
> {noformat}
> I think the right fix here is to also set a fetch failure status in the
> {{TaskContextImpl}}, so the executor can check that instead of just one
> exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]