[
https://issues.apache.org/jira/browse/SPARK-17610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tao Wang updated SPARK-17610:
-----------------------------
Description:
We have a problem in our environment in which a failed stage is never resubmitted. Because it is caused by a FetchFailed exception, I took a look at the corresponding code segment and found some issues:
In DAGScheduler.handleTaskCompletion, the handler first checks whether `failedStages` is empty and, when it is, does two steps:
1. send `ResubmitFailedStages` to `eventProcessLoop`
2. add the failed stages to `failedStages`
In `eventProcessLoop`, the handler first takes all elements of `failedStages` to resubmit them, then clears the set.
If the events happen in the order below, there is a problem (assume t1 < t2 < t3):
at t1, failed stage 1 is handled and `ResubmitFailedStages` is sent to `eventProcessLoop`
at t2, `eventProcessLoop` handles `ResubmitFailedStages` and clears the still-empty `failedStages`
at t3, failed stage 1 is added to `failedStages`
Now failed stage 1 has not been resubmitted. Worse, from t3 onwards `failedStages` can never be empty again, even when new stages fail with FetchFailed, because it still contains failed stage 1; the isEmpty check therefore never schedules another `ResubmitFailedStages` event.
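The interleaving above can be replayed with a minimal, self-contained sketch (my own simplified stand-in, not Spark code: stages are plain Ints and "resubmitting" just records the stage id):

```scala
import scala.collection.mutable

object RaceSketch {
  // Simplified stand-ins for the DAGScheduler state described above.
  val failedStages = mutable.HashSet[Int]()
  var resubmitted = List.empty[Int]

  // What eventProcessLoop does on ResubmitFailedStages: drain, then clear.
  def handleResubmitFailedStages(): Unit = {
    resubmitted = resubmitted ++ failedStages
    failedStages.clear()
  }

  def replay(): Unit = {
    // t1: failedStages is empty, so a ResubmitFailedStages event is scheduled
    val eventScheduled = failedStages.isEmpty

    // t2: the event fires before the stage is added; the set is still empty
    if (eventScheduled) handleResubmitFailedStages()

    // t3: failed stage 1 is finally added
    failedStages += 1

    // Stage 1 was never resubmitted, and failedStages is now permanently
    // non-empty, so the isEmpty check will never schedule another event.
  }

  def main(args: Array[String]): Unit = {
    replay()
    println(s"resubmitted=$resubmitted failedStages=$failedStages")
  }
}
```

Running the replay leaves `resubmitted` empty while `failedStages` still holds stage 1, which is exactly the stuck state we observe.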
The code is below:
{code}
} else if (failedStages.isEmpty) {
  // Don't schedule an event to resubmit failed stages if failed isn't empty, because
  // in that case the event will already have been scheduled.
  // TODO: Cancel running tasks in the stage
  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    s"$failedStage (${failedStage.name}) due to fetch failure")
  messageScheduler.schedule(new Runnable {
    override def run(): Unit =
      eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
failedStages += failedStage
failedStages += mapStage
{code}
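One way to close this window (a sketch only; I am not claiming this is the right or final fix) is to add the stages to `failedStages` before deciding whether a resubmit event needs to be scheduled, so the event-loop drain can no longer slip in between the check and the insertion:

{code}
// Sketch of a reordered handler: record the failures first, then schedule
// a resubmit only if no ResubmitFailedStages event was already pending.
val noResubmitEnqueued = failedStages.isEmpty
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
  messageScheduler.schedule(new Runnable {
    override def run(): Unit =
      eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
{code}

With this ordering, even if a previously scheduled event drains the set before the add, the newly scheduled event fires after the add and picks the stages up; a redundant event that finds an empty set is harmless.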
> The failed stage caused by FetchFailed may never be resubmitted
> ---------------------------------------------------------------
>
> Key: SPARK-17610
> URL: https://issues.apache.org/jira/browse/SPARK-17610
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 2.0.0
> Reporter: Tao Wang
> Priority: Critical
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]