[
https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041039#comment-16041039
]
Yuming Wang commented on SPARK-21009:
-------------------------------------
This could be a duplicate of
[SPARK-20342|https://issues.apache.org/jira/browse/SPARK-20342].
> SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
> ----------------------------------------------------------------
>
> Key: SPARK-21009
> URL: https://issues.apache.org/jira/browse/SPARK-21009
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Bogdan Raducanu
>
> The following code reproduces it:
> {code}
> test("test") {
>   val foundMetrics = mutable.Set.empty[String]
>   spark.sparkContext.addSparkListener(new SparkListener {
>     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>       taskEnd.taskInfo.accumulables.foreach { a =>
>         if (a.name.isDefined) {
>           foundMetrics.add(a.name.get)
>         }
>       }
>     }
>   })
>   for (iter <- 0 until 100) {
>     foundMetrics.clear()
>     println(s"iter = $iter")
>     spark.range(10).groupBy().agg("id" -> "sum").collect
>     spark.sparkContext.listenerBus.waitUntilEmpty(3000)
>     assert(foundMetrics.size > 0)
>   }
> }
> {code}
> The problem comes from DAGScheduler.handleTaskCompletion: the
> SparkListenerTaskEnd event is posted before updateAccumulators is called,
> so the accumulables in taskInfo may not yet reflect the task's final values.
> The code there looks like it needs refactoring.
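The ordering race described above can be illustrated without Spark. The sketch below is not Spark source code, just a minimal self-contained analogy: an event carrying a snapshot of the accumulables is posted *before* the accumulators are updated, so a listener observes stale (here, empty) values. All names (OrderingSketch, buggyCompletion, "records.read") are hypothetical.

{code}
// Minimal sketch (NOT Spark code) of the race in handleTaskCompletion:
// the task-end event snapshots accumulables before they are updated.
object OrderingSketch {
  case class TaskEndEvent(accumulables: Map[String, Long])

  var accumulables: Map[String, Long] = Map.empty  // driver-side state
  var observed: Map[String, Long] = Map.empty      // what the listener saw

  // Stand-in for posting SparkListenerTaskEnd to the listener bus.
  def postEvent(e: TaskEndEvent): Unit = observed = e.accumulables

  // Stand-in for DAGScheduler.updateAccumulators.
  def updateAccumulators(): Unit = accumulables += ("records.read" -> 10L)

  def buggyCompletion(): Unit = {
    postEvent(TaskEndEvent(accumulables)) // event posted first...
    updateAccumulators()                  // ...accumulators updated after
  }

  def main(args: Array[String]): Unit = {
    buggyCompletion()
    // The listener saw an empty snapshot even though the driver's
    // accumulators were updated moments later.
    assert(observed.isEmpty)
    assert(accumulables.nonEmpty)
  }
}
{code}

Swapping the two calls in buggyCompletion (update first, then post) makes the listener observe the final values, which is the ordering the reproduction in the issue description expects.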
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]