Imran Rashid created SPARK-10042:
------------------------------------
Summary: Use consistent behavior for Internal Accumulators across
stage retries
Key: SPARK-10042
URL: https://issues.apache.org/jira/browse/SPARK-10042
Project: Spark
Issue Type: Improvement
Components: Spark Core, Web UI
Affects Versions: 1.5.0
Reporter: Imran Rashid
[~andrewor14]
The internal accumulators introduced in SPARK-8735 aren't counted in a
consistent manner during stage retries. Whether the accumulators are counted
once or multiple times is unpredictable.
First a little interlude on how stage failures & retries work. When there is a
fetch failure, Spark looks at the block manager (BM) that it failed to fetch
data from, and it assumes none of the data from that BM is available. It fails
the stage with the fetch failure, then it goes back to the ShuffleMapStage that
produced the data. It looks at which partitions were stored in the failed BM,
and it reruns just those partitions. Meanwhile, all currently running tasks
for the current stage keep running, potentially producing more fetch failures.
In fact, some of those tasks can even keep running until the dependent stage
has been re-run, and this stage has been restarted. (Yes, this can and does
happen under real workloads, and is the cause of SPARK-8029, a serious failure
in real workloads.)
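To make the bookkeeping above concrete, here is a minimal sketch in plain Scala
with no Spark dependency. The names {{MapOutputs}}, {{onFetchFailure}},
{{missingPartitions}} and the BM ids are hypothetical and exist only for this
illustration; the real logic lives in the scheduler and is more involved.

```scala
// Hypothetical toy model, NOT Spark's actual scheduler code.
object FetchFailureModel {
  // Maps each map-output partition to the block manager (BM) that stores it.
  final case class MapOutputs(locations: Map[Int, String]) {
    // On a fetch failure, assume every output stored on the failed BM is gone.
    def onFetchFailure(failedBm: String): MapOutputs =
      MapOutputs(locations.filter { case (_, bm) => bm != failedBm })

    // Partitions the ShuffleMapStage has to recompute when it is resubmitted.
    def missingPartitions(allPartitions: Range): Seq[Int] =
      allPartitions.filterNot(p => locations.contains(p))
  }

  val allPartitions: Range = 0 until 5

  // Assumed placement of the five map outputs across two BMs.
  val outputs: MapOutputs = MapOutputs(
    Map(0 -> "bm-0", 1 -> "bm-1", 2 -> "bm-0", 3 -> "bm-1", 4 -> "bm-0"))

  // A single fetch failure against bm-0 invalidates all of its outputs.
  val toRerun: Seq[Int] =
    outputs.onFetchFailure("bm-0").missingPartitions(allPartitions)
}
```

In this toy placement, one failure against {{bm-0}} means only the partitions
stored there are recomputed, which is why the resubmitted stage usually covers
a subset of all partitions.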
If Spark has lost multiple BMs (which might mean it has lost all the shuffle
map output of an earlier stage), there are a few different ways that shuffle
map output will get regenerated. Perhaps there will be enough tasks running to
trigger fetch failures on all the lost BMs before the earlier stage is
restarted, so by the time the stage is re-scheduled, the scheduler knows to
rerun all the tasks. Or maybe it only gets a failure on one block manager, so
it re-generates the map output for that one block manager, and then on trying
the downstream stage, it realizes another block manager is down, and repeats
the process, one BM at a time, till everything has been regenerated. Or
perhaps as it's regenerating the map output from the first failure, the
"zombie" tasks from the failed stage that are still running trigger fetch
failures from all the other block managers. And then as soon as the shuffle map
stage is done regenerating data for one BM, it'll immediately regenerate the
data for the other lost BMs before trying the downstream stage. (And then there
are assorted combinations as well.)
This means that it is totally unpredictable how many partitions will get rerun
for the ShuffleMapStage that was previously successful. E.g., run your example
program:
{noformat}
import org.apache.spark._
import org.apache.spark.shuffle.FetchFailedException

val data = sc.parallelize(1 to 1e3.toInt, 500).map(identity).groupBy(identity)
val shuffleHandle =
  data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle

// Simulate fetch failures
val mappedData = data.map { case (i, _) =>
  val taskContext = TaskContext.get
  if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 50) {
    // Cause the post-shuffle stage to fail on its first attempt with a single
    // task failure
    val env = SparkEnv.get
    val bmAddress = env.blockManager.blockManagerId
    val shuffleId = shuffleHandle.shuffleId
    val mapId = 0
    val reduceId = taskContext.partitionId()
    val message = "Simulated fetch failure"
    throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
  } else {
    (i, i)
  }
}
mappedData.reduceByKey({ _ + _ }, 500).count()
{noformat}
with the current condition on resetting the accumulators, that is
{{(stage.internalAccumulators.isEmpty || allPartitions ==
partitionsToCompute)}}. In {{local}} mode all partitions will get re-run.
Then try running it with {{local-cluster[2,1,1024]}} (which will create two
block managers). Here's some example debug output from when I ran it:
{noformat}
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values:
=== STAGE 0 IS CREATING NEW ACCUMULATORS ===
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values:
=== STAGE 1 IS CREATING NEW ACCUMULATORS ===
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 6,
192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639),
shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...)
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 1, 2, 4
- internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 2, 3, 4
- internal accum values: 3936
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.1 (TID 11,
192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639),
shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 2
- internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 4
- internal accum values: 7872
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.2 (TID 15,
192.168.1.106): FetchFailed(BlockManagerId(1, 192.168.1.106, 61640),
shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 3, 4
- internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 9840
=== STAGE 1 IS CREATING NEW ACCUMULATORS ===
...
{noformat}
As you can see, {{partitionsToCompute != allPartitions}} in most cases. For
example, in the second submission of stage 1, we would have **double counted**
the accumulators for partitions 0,2,3,4. By the third submission of stage 1,
we would have **triple counted** partitions 0 & 4. Or then again, we just
might reset the values and count singly, as we do in the final iteration you
see here.
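The counting described above can be replayed with a small sketch in plain
Scala. It applies the reset condition quoted earlier to the "partitions to
compute" lists of the four stage 1 submissions in the debug output. It is a toy
model, not Spark code, and it assumes that every submitted partition completes
and adds exactly one update, which the real run with its failed and zombie
tasks does not guarantee.

```scala
// Toy model of the reset condition; NOT Spark's actual code.
// Assumption: every submitted partition finishes and contributes one update.
object ResetConditionModel {
  val allPartitions: Set[Int] = (0 until 5).toSet

  // "partitions to compute" for the four submissions of stage 1 in the log.
  val submissions: Seq[Set[Int]] =
    Seq(Set(0, 1, 2, 3, 4), Set(0, 2, 3, 4), Set(0, 4), Set(0, 1, 2, 3, 4))

  // history(i)(p) = number of times partition p has been counted since the
  // last reset, measured right after submission i.
  val history: Seq[Map[Int, Int]] =
    submissions.scanLeft(Map.empty[Int, Int]) { (counts, toCompute) =>
      // Mirrors (internalAccumulators.isEmpty || allPartitions == partitionsToCompute)
      val reset = counts.isEmpty || toCompute == allPartitions
      val base = if (reset) Map.empty[Int, Int] else counts
      toCompute.foldLeft(base) { (acc, p) =>
        acc.updated(p, acc.getOrElse(p, 0) + 1)
      }
    }.tail
}
```

Under those assumptions, {{ResetConditionModel.history(1)}} counts partitions
0, 2, 3, 4 twice, {{history(2)}} counts partitions 0 and 4 three times, and
{{history(3)}} is back to one count per partition because the reset fired.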
I had earlier suggested that we should never reset the value, just initialize
it once, and have the value keep increasing. But maybe that isn't what you
want -- maybe you want to *always* reset the value? Then the metrics would
clearly apply to that one stage *attempt* alone. In any case, we are stuck
with the fact that skipped stages (which come from a shared narrow dependency)
do not share the same {{Stage}} object, even though they are conceptually the
same stage to a user. So retries from skipped stages also suggest that our
goal should be for each attempt to have a cleared value for the accumulators,
since that is the behavior we're stuck with on retries via a skipped stage in
any case. We could either always reset internal accumulators, or have them be
a property of the stage **attempt** which just gets initialized with the
attempt and never reset.
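As a rough illustration of the second option, here is a minimal sketch in plain
Scala of metrics that belong to a stage attempt rather than to the stage. The
classes {{StageAttemptMetrics}} and {{StageModel}} are hypothetical names made
up for this example and do not correspond to existing Spark classes.

```scala
// Hypothetical sketch; these classes do not exist in Spark.
object PerAttemptModel {
  // Metrics owned by one stage attempt: created with it, never reset.
  final class StageAttemptMetrics(val stageId: Int, val attemptId: Int) {
    private var total = 0L
    def add(taskUpdate: Long): Unit = total += taskUpdate
    def value: Long = total
  }

  final class StageModel(val stageId: Int) {
    private var attempts = Vector.empty[StageAttemptMetrics]

    // Every (re)submission gets fresh metrics; earlier attempts are untouched.
    def newAttempt(): StageAttemptMetrics = {
      val metrics = new StageAttemptMetrics(stageId, attempts.size)
      attempts = attempts :+ metrics
      metrics
    }

    def allAttempts: Seq[StageAttemptMetrics] = attempts
  }

  // Usage with made-up per-task updates.
  val stage = new StageModel(1)
  val first = stage.newAttempt()
  first.add(100L)
  first.add(100L)
  val retry = stage.newAttempt()
  retry.add(100L)
}
```

With this shape there is no reset condition to get wrong: a retry of only some
partitions simply reports the total for the tasks it ran.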
Another option would be for the UI to just display the *update* from each task,
rather than the accumulator value at the end of the task:
https://github.com/apache/spark/blob/cf016075a006034c24c5b758edb279f3e151d25d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L372
That would make the global value of the accumulator entirely irrelevant. In
fact, I'm not certain how to interpret the sum of the memory used in each task.
If I have 10K tasks, running in 20 slots, the sum across all 10K tasks is
probably over-estimating the memory used by 500x. It's even stranger to report
the quartiles of that partial sum as tasks complete. I highly doubt most users
will understand what that summary metric means, and even if they did
understand, it seems to have very little value.
(Only using the *update* from each task would also mean that we wouldn't be
using the accumulators to "accumulate" anything, it just becomes the place we
happen to cram our per-task metrics.)
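The difference between the two displays, and the 500x figure, can be spelled
out with a short sketch in plain Scala. The per-task values are made up for
illustration, and the overestimate factor assumes that tasks use roughly the
same amount of memory and that only as many tasks as there are slots hold
memory at the same time.

```scala
// Illustration with made-up numbers; not taken from a real run.
object UpdateVsRunningTotal {
  // Hypothetical per-task memory updates, in completion order.
  val updates: Seq[Long] = Seq(100L, 120L, 80L, 110L)

  // The accumulator value seen at the end of each task is a partial sum
  // that depends on completion order, not a property of that task.
  val runningTotals: Seq[Long] = updates.scanLeft(0L)(_ + _).tail

  // 10K tasks in 20 slots: at most 20 tasks hold memory at once, so summing
  // over all tasks overstates concurrent usage by roughly tasks / slots.
  val tasks = 10000
  val slots = 20
  val overestimateFactor: Int = tasks / slots
}
```

Quartiles over {{updates}} describe individual tasks, while quartiles over
{{runningTotals}} mostly describe the order in which tasks happened to finish.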