[ https://issues.apache.org/jira/browse/SPARK-13343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536409#comment-16536409 ]

Hieu Tri Huynh edited comment on SPARK-13343 at 7/8/18 9:02 PM:
----------------------------------------------------------------

While working on this issue, I noticed another problem caused by the race 
between two finished attempts of a task when speculation is enabled. If both 
attempts of a ShuffleMapTask finish before the driver kills either of them, the 
accumulators are updated multiple times. As a result, when users update 
accumulators in a shuffle map stage, the accumulator values will be wrong 
whenever this race happens.

The following code shows a case in which this problem can happen.
{code:java}
def testAccum(spark: SparkSession): Unit = {
  val accum = spark.sparkContext.longAccumulator("My Accumulator")
  // The accumulator is updated on the map side of a shuffle, i.e. inside ShuffleMapTasks.
  val my_rdd = spark.sparkContext.parallelize(0 to 1000, 100).map { x =>
    accum.add(x)
    (x, x)
  }.groupByKey()
  println(my_rdd.count)  // triggers the job
  println(accum.value)   // expected 500500; larger when duplicate attempts are counted
}
{code}
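
In case it helps, one possible way to drive the snippet above (for example from spark-shell, pasted together with testAccum). The settings here are illustrative assumptions, and the wrong value only shows up when speculation actually launches duplicate attempts that both finish:
{code:java}
import org.apache.spark.sql.SparkSession

// Illustrative setup only: speculation has to be enabled, and a speculative copy of a
// ShuffleMapTask has to finish before it is killed for the duplicate updates to appear.
val spark = SparkSession.builder()
  .appName("SPARK-13343 accumulator repro")
  .config("spark.speculation", "true")
  .getOrCreate()

testAccum(spark)
spark.stop()
{code}
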
Below is the value of the accumulator from one such run:
!Screen Shot 2018-07-08 at 3.49.52 PM.png|width=1220,height=94!

The correct result should be 500500 (the sum 0 + 1 + ... + 1000 = 1000 * 1001 / 2 = 500500).

This issue only happens for ShuffleMapTask: inside the DAGScheduler method 
handleTaskCompletion, there is a check that ensures the accumulators are updated 
only once for a ResultTask, but there is no such check for a ShuffleMapTask.
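
As a very rough illustration of that missing guard (a self-contained sketch with hypothetical names, not the actual DAGScheduler code or a proposed patch), the idea is to apply a partition's accumulator updates only for its first finished attempt:
{code:java}
import scala.collection.mutable

// Minimal, self-contained sketch (hypothetical types, not Spark's DAGScheduler) of a
// dedup guard: apply accumulator updates only for the first finished attempt of each
// (stageId, partitionId), so a second speculative attempt cannot double-count.
object AccumulatorDedupSketch {

  // Stand-in for a task-completion event carrying an accumulator delta.
  final case class CompletionEvent(stageId: Int, partitionId: Int, accumDelta: Long)

  private val appliedPartitions = mutable.Set.empty[(Int, Int)]
  private var accumValue = 0L

  def handleShuffleMapTaskSuccess(event: CompletionEvent): Unit = {
    val key = (event.stageId, event.partitionId)
    if (appliedPartitions.add(key)) {
      // First finished attempt of this partition: apply its accumulator updates.
      accumValue += event.accumDelta
    }
    // A later finished attempt of the same partition is ignored.
  }

  def main(args: Array[String]): Unit = {
    // Two speculative attempts of the same partition both finish.
    handleShuffleMapTaskSuccess(CompletionEvent(stageId = 0, partitionId = 7, accumDelta = 10))
    handleShuffleMapTaskSuccess(CompletionEvent(stageId = 0, partitionId = 7, accumDelta = 10))
    println(accumValue) // prints 10, not 20
  }
}
{code}
This only sketches the bookkeeping; an actual fix would have to live in the scheduler's completion handling, alongside the existing ResultTask check described above.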

 


> speculative tasks that didn't commit shouldn't be marked as success
> -------------------------------------------------------------------
>
>                 Key: SPARK-13343
>                 URL: https://issues.apache.org/jira/browse/SPARK-13343
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Thomas Graves
>            Priority: Major
>         Attachments: Screen Shot 2018-07-08 at 3.49.52 PM.png, image.png, 
> image.png
>
>
> Currently, speculative tasks that didn't commit can show up as success 
> (depending on the timing of the commit). This is a bit confusing because such a 
> task didn't really succeed, in the sense that it didn't write anything.
> I think these tasks should be marked as KILLED, or something that makes it more 
> obvious to the user exactly what happened. If a task happens to hit the timing 
> where it gets a commit-denied exception, it shows up as failed and counts 
> against your task failures. It shouldn't count against task failures, since 
> that failure really doesn't matter.
> MapReduce handles these situations, so perhaps we can look there for a model.
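
To make the quoted proposal concrete, here is a minimal, self-contained sketch (hypothetical names and types, not Spark's scheduler code) of the classification being asked for: an attempt whose commit is denied is recorded as killed rather than failed, and does not count toward the task-failure limit:
{code:java}
// Minimal sketch (hypothetical names, not Spark's scheduler) of the behaviour requested
// above: a speculative attempt that is denied the commit is treated as killed, not failed,
// and does not count toward the task-failure limit.
object SpeculativeCommitSketch {

  sealed trait TaskState
  case object Succeeded extends TaskState
  case object Killed extends TaskState
  case object Failed extends TaskState

  sealed trait EndReason
  case object CommittedOutput extends EndReason
  case object CommitDenied extends EndReason // lost the commit race to a sibling attempt
  final case class ExceptionThrown(msg: String) extends EndReason

  // Returns the state to record and whether it counts toward the failure limit.
  def classify(reason: EndReason): (TaskState, Boolean) = reason match {
    case CommittedOutput    => (Succeeded, false)
    case CommitDenied       => (Killed, false) // not the task's fault, don't penalize
    case ExceptionThrown(_) => (Failed, true)
  }

  def main(args: Array[String]): Unit = {
    println(classify(CommitDenied))            // (Killed,false)
    println(classify(ExceptionThrown("boom"))) // (Failed,true)
  }
}
{code}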


