[ https://issues.apache.org/jira/browse/SPARK-13343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536409#comment-16536409 ]
Hieu Tri Huynh edited comment on SPARK-13343 at 7/8/18 9:02 PM:
----------------------------------------------------------------

While working on this issue, I noticed another problem caused by a race between two finished attempts of a task when speculation is enabled: if both attempts of a ShuffleMapTask finish before the driver kills either of them, the accumulators are updated multiple times. As a result, accumulators that users update in a shuffle map stage can end up with incorrect values whenever this race happens. The code below reproduces one case where this occurs.

{code:java}
def testAccum(spark: SparkSession): Unit = {
  val accum = spark.sparkContext.longAccumulator("My Accumulator")
  // The accumulator is updated inside a shuffle map stage (the map before groupByKey).
  val my_rdd = spark.sparkContext.parallelize(0 to 1000, 100).map(x => {
    accum.add(x)
    (x, x)
  }).groupByKey()
  println(my_rdd.count)
  println(accum.value)
}
{code}

The attached screenshot shows the accumulator value observed:

!Screen Shot 2018-07-08 at 3.49.52 PM.png|width=1220,height=94!

The correct result should be 500500 (the sum of 0 to 1000). This issue only happens for ShuffleMapTask: inside DAGScheduler.handleTaskCompletion there is a check that ensures accumulators are updated only once per partition for a ResultTask, but there is no such check for a ShuffleMapTask. A sketch of a driver setup that enables speculation, and a sketch of the kind of once-per-partition guard that is missing for ShuffleMapTask, are included at the end of this message (both are illustrative, not actual Spark code).

> speculative tasks that didn't commit shouldn't be marked as success
> -------------------------------------------------------------------
>
>                 Key: SPARK-13343
>                 URL: https://issues.apache.org/jira/browse/SPARK-13343
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Thomas Graves
>            Priority: Major
>         Attachments: Screen Shot 2018-07-08 at 3.49.52 PM.png, image.png, image.png
>
> Currently speculative tasks that didn't commit can show up as success (depending on the timing of the commit). This is a bit confusing because such a task didn't really succeed, in the sense that it didn't write anything.
> I think these tasks should be marked as KILLED, or as something that makes it more obvious to the user exactly what happened. If a task happens to hit the timing where it gets a commit-denied exception, it then shows up as failed and counts against your task failures. It shouldn't count against task failures, since that failure really doesn't matter.
> MapReduce handles these situations, so perhaps we can look there for a model.
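For completeness, here is a minimal driver sketch for running {{testAccum}} with speculative execution turned on. The object name, the master URL, and the specific speculation settings are illustrative assumptions and not part of the original report; whether the duplicate accumulator updates actually occur still depends on the timing of the two attempts.

{code:java}
import org.apache.spark.sql.SparkSession

// Hypothetical driver for the repro above; names and settings are assumptions.
object AccumRaceRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-13343 accumulator repro")
      .master("local-cluster[2,1,1024]")             // two executors so a speculative copy can run elsewhere
      .config("spark.speculation", "true")            // enable speculative execution
      .config("spark.speculation.multiplier", "1.0")  // speculate aggressively to make the race more likely
      .config("spark.speculation.quantile", "0.1")
      .getOrCreate()
    try {
      testAccum(spark) // if both attempts of a shuffle map task finish, accum.value exceeds 500500
    } finally {
      spark.stop()
    }
  }
}
{code}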
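And here is a simplified, hypothetical sketch of the once-per-partition guard described above. It is not the actual DAGScheduler code; it only illustrates the idea that accumulator updates from a second (speculative) completion of the same partition should be dropped, which is what already happens for ResultTask but not for ShuffleMapTask.

{code:java}
import scala.collection.mutable

// Hypothetical helper, not Spark source: apply a task's accumulator updates
// only the first time a given partition reports a successful completion.
class OncePerPartitionAccumUpdater {
  private val finished = mutable.BitSet.empty

  /** Returns true if the updates were applied, false if this partition already reported. */
  def maybeApply(partitionId: Int)(applyUpdates: => Unit): Boolean = {
    if (finished.contains(partitionId)) {
      false                // a speculative duplicate; drop its accumulator updates
    } else {
      finished += partitionId
      applyUpdates         // first completion of this partition wins
      true
    }
  }
}
{code}

A caller would invoke {{maybeApply}} with the completed task's partition id and pass the accumulator-update action as the block; duplicate completions of the same partition then become no-ops.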