Imran Rashid created SPARK-10042:
------------------------------------

             Summary: Use consistent behavior for Internal Accumulators across 
stage retries
                 Key: SPARK-10042
                 URL: https://issues.apache.org/jira/browse/SPARK-10042
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, Web UI
    Affects Versions: 1.5.0
            Reporter: Imran Rashid


[~andrewor14]

The internal accumulators introduced in SPARK-8735 aren't counted in a 
consistent manner during stage retries.  Whether an accumulator is counted 
once or multiple times is effectively arbitrary.

First, a little interlude on how stage failure and retry work.  When there is a 
fetch failure, Spark looks at the block manager (BM) that it failed to fetch 
data from, and it assumes none of the data from that BM is available.  It fails 
the stage with the fetch failure, then it goes back to the ShuffleMapStage that 
produced the data.  It looks at which partitions were stored in the failed BM, 
and it reruns just those partitions.  Meanwhile, all currently running tasks 
for the current stage keep running, potentially producing more fetch failures.  
In fact, some of those tasks can even keep running until the stage it depends 
on has been re-run, and this stage has been restarted.  (Yes, this can and does 
happen under real workloads, and is the cause of SPARK-8029, a serious failure 
in real workloads.)
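
As a rough illustration of the bookkeeping described above, here is a toy model in plain Scala.  It is not Spark's scheduler code: {{MapOutputs}}, {{onFetchFailure}} and {{partitionsToCompute}} are hypothetical names, and the BM ids are made up.  It only shows the rule "assume everything on the failed BM is gone, then rerun just those partitions".

```scala
// Toy model of map-output tracking. All names are hypothetical; this is
// not how Spark's DAGScheduler is actually implemented.
object FetchFailureSketch {
  // For each map partition: the BM holding its output, or None if missing.
  type MapOutputs = Map[Int, Option[String]]

  // On a fetch failure, assume every output stored on the failed BM is gone.
  def onFetchFailure(outputs: MapOutputs, failedBm: String): MapOutputs =
    outputs.map { case (partition, location) =>
      partition -> location.filterNot(_ == failedBm)
    }

  // Only partitions without an available output need to be recomputed.
  def partitionsToCompute(outputs: MapOutputs): Seq[Int] =
    outputs.collect { case (partition, None) => partition }.toSeq.sorted

  def main(args: Array[String]): Unit = {
    val outputs: MapOutputs = Map(
      0 -> Some("bm-0"), 1 -> Some("bm-1"), 2 -> Some("bm-0"),
      3 -> Some("bm-1"), 4 -> Some("bm-0"))
    val afterFailure = onFetchFailure(outputs, "bm-0")
    // Partitions 0, 2 and 4 lived on bm-0, so only those are rerun.
    println(partitionsToCompute(afterFailure))
  }
}
```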

If Spark has lost multiple BMs (which might mean it has lost all the shuffle 
map output of an earlier stage), there are a few different ways that shuffle 
map output will get regenerated.  Perhaps there will be enough tasks running to 
trigger fetch failures on all the lost BMs before the earlier stage is 
restarted, so by the time the stage is re-scheduled, the scheduler knows to 
rerun all the tasks.  Or maybe it only gets a failure on one block manager, so 
it re-generates the map output for that one block manager, and then on trying 
the downstream stage, it realizes another block manager is down, and repeats 
the process, one BM at a time, till everything has been regenerated.  Or 
perhaps as it's regenerating the map output from the first failure, the 
"zombie" tasks from the failed stage that are still running trigger fetch 
failures from all the other block managers.  And then as soon as the shuffle 
map stage is done regenerating data for one BM, it'll immediately regenerate 
the data for the other lost BMs before trying the downstream stage.  (And then 
there are assorted combinations as well.)
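
To make the difference between these orderings concrete, here is a small sketch in plain Scala.  It is a toy, not Spark code: the {{location}} table and {{resubmissions}} are hypothetical, and it assumes each lost BM is discovered in exactly one round.  The same two lost BMs lead either to one full rerun or to two partial reruns, depending only on when the fetch failures arrive.

```scala
// Toy model, not Spark code: every name below is hypothetical.
object RegenerationOrderSketch {
  // BM assumed to hold each map partition's output before the failures.
  val location: Map[Int, String] =
    Map(0 -> "bm-0", 1 -> "bm-1", 2 -> "bm-0", 3 -> "bm-1", 4 -> "bm-0")

  // Each element of `rounds` is the set of lost BMs that have produced a
  // fetch failure by the time the map stage is resubmitted. Assumes each
  // lost BM shows up in exactly one round. Returns the partitions rerun
  // in each resubmission.
  def resubmissions(rounds: Seq[Set[String]]): Seq[Seq[Int]] =
    rounds.map { knownLost =>
      location.collect { case (p, bm) if knownLost(bm) => p }.toSeq.sorted
    }

  def main(args: Array[String]): Unit = {
    // Both failures are seen before the map stage is rescheduled:
    // a single resubmission that reruns every partition.
    println(resubmissions(Seq(Set("bm-0", "bm-1"))))
    // Failures are discovered one BM at a time: two partial resubmissions.
    println(resubmissions(Seq(Set("bm-0"), Set("bm-1"))))
  }
}
```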

This means that it is totally unpredictable how many partitions will get rerun 
for the ShuffleMapStage that was previously successful.  E.g., run your example 
program:

{noformat}
import org.apache.spark._
import org.apache.spark.shuffle.FetchFailedException

val data = sc.parallelize(1 to 1e3.toInt, 500).map(identity).groupBy(identity)
val shuffleHandle =
  data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle

// Simulate fetch failures
val mappedData = data.map { case (i, _) =>
  val taskContext = TaskContext.get
  if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 50) {
    // Cause the post-shuffle stage to fail on its first attempt with a single
    // task failure
    val env = SparkEnv.get
    val bmAddress = env.blockManager.blockManagerId
    val shuffleId = shuffleHandle.shuffleId
    val mapId = 0
    val reduceId = taskContext.partitionId()
    val message = "Simulated fetch failure"
    throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
  } else {
    (i, i)
  }
}

mappedData.reduceByKey ({ _ + _ }, 500).count()

{noformat}


with the current condition on resetting the accumulators, that is 
{{(stage.internalAccumulators.isEmpty || allPartitions == 
partitionsToCompute)}}.  In {{local}} mode all partitions will get re-run.  
Then try running it with {{local-cluster[2,1,1024]}} (which will create two 
block managers).  Here's some example debug output from when I ran it:

{noformat}
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 2, 3, 4
  - internal accum values:
=== STAGE 0 IS CREATING NEW ACCUMULATORS ===
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 2, 3, 4
  - internal accum values:
=== STAGE 1 IS CREATING NEW ACCUMULATORS ===
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 6, 
192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), 
shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...)
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 1, 2, 4
  - internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 2, 3, 4
  - internal accum values: 3936
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.1 (TID 11, 
192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), 
shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 2
  - internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 4
  - internal accum values: 7872
15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.2 (TID 15, 
192.168.1.106): FetchFailed(BlockManagerId(1, 192.168.1.106, 61640), 
shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 3, 4
  - internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
  - all partitions: 0, 1, 2, 3, 4
  - partitions to compute: 0, 1, 2, 3, 4
  - internal accum values: 9840
=== STAGE 1 IS CREATING NEW ACCUMULATORS ===
...
{noformat}

As you can see, {{partitionsToCompute != allPartitions}} in most cases.  For 
example, in the second submission of stage 1, we would have **double counted** 
the accumulators for partitions 0,2,3,4.  By the third submission of stage 1, 
we would have **triple counted** partitions 0 & 4.  Or then again, we just 
might reset the values and count singly, as we do in the final iteration you 
see here.
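
The counting above can be replayed with a small sketch in plain Scala.  It is not the scheduler's code: {{replay}} is a hypothetical helper, a per-partition count stands in for {{stage.internalAccumulators}}, and it assumes every submitted task finishes and contributes exactly one update.  It applies the reset condition quoted earlier to the stage 1 submissions from the debug output.

```scala
// Toy replay of the reset condition; names and the "one update per
// submitted task" rule are assumptions made for illustration only.
object ResetConditionSketch {
  // `submissions` lists partitionsToCompute for each submission, in order.
  // Returns, after each submission, how many times each partition has
  // been counted in the (simulated) internal accumulators.
  def replay(allPartitions: Set[Int],
             submissions: Seq[Set[Int]]): Seq[Map[Int, Int]] =
    submissions.scanLeft(Map.empty[Int, Int]) { (counts, toCompute) =>
      // Reset only if nothing was counted yet or all partitions are rerun.
      val base =
        if (counts.isEmpty || allPartitions == toCompute) Map.empty[Int, Int]
        else counts
      toCompute.foldLeft(base) { (acc, p) =>
        acc.updated(p, acc.getOrElse(p, 0) + 1)
      }
    }.tail

  def main(args: Array[String]): Unit = {
    val all = Set(0, 1, 2, 3, 4)
    // Stage 1 submissions as they appear in the debug output.
    val stage1 = Seq(all, Set(0, 2, 3, 4), Set(0, 4), all)
    replay(all, stage1).zipWithIndex.foreach { case (counts, i) =>
      println(s"after submission ${i + 1}: ${counts.toSeq.sorted}")
    }
  }
}
```

Under those assumptions, partitions 0, 2, 3 and 4 reach a count of 2 after the second submission, partitions 0 and 4 reach 3 after the third, and the fourth submission resets everything back to 1.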

I had earlier suggested that we should never reset the value, just initialize 
it once, and have the value keep increasing.  But maybe that isn't what you 
want -- maybe you want to *always* reset the value?  Then the metrics would 
clearly apply to that one stage *attempt* alone.  In any case, we are stuck 
with the fact that skipped stages (which come from a shared narrow dependency) 
do not share the same {{Stage}} object, even though they are conceptually the 
same stage to a user.  So retries from skipped stages also suggest that our 
goal should be for each attempt to have a cleared value for the accumulators, 
since that is the behavior we're stuck with on retries via a skipped stage in 
any case.  We could either always reset internal accumulators, or have them be 
a property of the stage **attempt** which just gets initialized with the 
attempt and is never reset.
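
A minimal sketch of the second option, in plain Scala.  The {{StageAttempt}} class here is hypothetical and is not meant to match any existing Spark class; the values fed in are toy numbers.  The point is only that a counter owned by the attempt needs no reset logic at all.

```scala
// Hypothetical sketch: a metric owned by the stage attempt rather than
// by the stage. Not Spark's actual data model.
object AttemptScopedSketch {
  final class StageAttempt(val stageId: Int, val attemptId: Int) {
    private var total = 0L                 // initialized once, with the attempt
    def add(update: Long): Unit = total += update
    def value: Long = total                // never reset
  }

  def main(args: Array[String]): Unit = {
    val first = new StageAttempt(stageId = 1, attemptId = 0)
    first.add(3936L)                       // toy update
    // A retry gets its own object, so it starts from zero by construction.
    val retry = new StageAttempt(stageId = 1, attemptId = 1)
    retry.add(1968L)                       // toy update
    println(s"attempt ${first.attemptId}: ${first.value}")
    println(s"attempt ${retry.attemptId}: ${retry.value}")
  }
}
```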

Another option would be for the UI to just display the *update* from each task, 
rather than the accumulator value at the end of the task:

https://github.com/apache/spark/blob/cf016075a006034c24c5b758edb279f3e151d25d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L372

That would make the global value of the accumulator entirely irrelevant.  In 
fact, I'm not certain how to interpret the sum of the memory used in each task.  
If I have 10K tasks, running in 20 slots, the sum across all 10K tasks is 
probably over-estimating the memory used by 500x.  It's even stranger to report 
the quartiles of that partial sum as tasks complete.  I highly doubt most users 
will understand what that summary metric means, and even if they did 
understand, it seems to have very little value.

(Only using the *update* from each task would also mean that we wouldn't be 
using the accumulators to "accumulate" anything, it just becomes the place we 
happen to cram our per-task metrics.)
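
To illustrate the difference between the two displays, and the 500x figure, here is a small sketch in plain Scala.  The helper names and the sample numbers are hypothetical, and the over-estimate assumes every task uses about the same amount of memory.

```scala
// Toy comparison of per-task updates vs. the accumulator value seen at
// the end of each task. Names and numbers are illustrative assumptions.
object UpdateVsTotalSketch {
  // Accumulator value at the end of each task, given the per-task updates
  // in completion order: a running partial sum.
  def runningTotals(updates: Seq[Long]): Seq[Long] =
    updates.scanLeft(0L)(_ + _).tail

  // If at most `slots` tasks run at once and tasks use similar memory,
  // summing over all tasks overstates concurrent usage by this factor.
  def overestimateFactor(numTasks: Int, slots: Int): Double =
    numTasks.toDouble / slots

  def main(args: Array[String]): Unit = {
    val updates = Seq(10L, 20L, 30L)
    println(s"per-task updates: $updates")
    println(s"values at end of each task: ${runningTotals(updates)}")
    println(s"over-estimate for 10K tasks in 20 slots: ${overestimateFactor(10000, 20)}x")
  }
}
```

Quartiles computed over the second sequence depend on task completion order, whereas quartiles over the first describe the tasks themselves.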



