Repository: spark
Updated Branches:
  refs/heads/master eb45b52e8 -> 40b983c3b


[SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber

## What changes were proposed in this pull request?
1. Deprecate `attemptId` in `StageInfo` and add `def attemptNumber() = attemptId` (see the sketch below).
2. Replace usages of `stageAttemptId` with `stageAttemptNumber`.
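
As a rough, self-contained sketch of the deprecation pattern (the class and object names below are hypothetical stand-ins, not the real `StageInfo`; the actual change is in the diff further down):

```scala
// Hypothetical, simplified illustration of deprecating a field while adding a
// preferred accessor, mirroring the StageInfo change in this patch.
class StageInfoSketch(
    val stageId: Int,
    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int) {

  // New, preferred accessor; it simply forwards to the old field.
  def attemptNumber(): Int = attemptId
}

object StageInfoSketchExample {
  def main(args: Array[String]): Unit = {
    val info = new StageInfoSketch(stageId = 3, attemptId = 1)
    // Call sites should migrate to attemptNumber(); reading info.attemptId
    // directly is flagged as deprecated by the compiler (with -deprecation).
    println(s"stage ${info.stageId}, attempt ${info.attemptNumber()}")
  }
}
```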

## How was this patch tested?
Manually verified that the compiler emits the expected deprecation warnings.

Author: Xianjin YE <[email protected]>

Closes #20178 from advancedxy/SPARK-22952.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40b983c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40b983c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40b983c3

Branch: refs/heads/master
Commit: 40b983c3b44b6771f07302ce87987fa4716b5ebf
Parents: eb45b52
Author: Xianjin YE <[email protected]>
Authored: Mon Jan 8 23:49:07 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Jan 8 23:49:07 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 15 +++---
 .../org/apache/spark/scheduler/StageInfo.scala  |  4 +-
 .../spark/scheduler/StatsReportListener.scala   |  2 +-
 .../apache/spark/status/AppStatusListener.scala |  7 +--
 .../org/apache/spark/status/LiveEntity.scala    |  4 +-
 .../spark/ui/scope/RDDOperationGraph.scala      |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 .../spark/status/AppStatusListenerSuite.scala   | 54 +++++++++++---------
 .../sql/execution/ui/SQLAppStatusListener.scala |  2 +-
 9 files changed, 51 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c2498d4..199937b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -815,7 +815,8 @@ class DAGScheduler(
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
     // Note that there is a chance that this task is launched after the stage is cancelled.
     // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+    val stageAttemptId =
+      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
@@ -1050,7 +1051,7 @@ class DAGScheduler(
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             stage.pendingPartitions += id
-            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
+            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
           }
@@ -1060,7 +1061,7 @@ class DAGScheduler(
             val p: Int = stage.partitions(id)
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
-            new ResultTask(stage.id, stage.latestInfo.attemptId,
+            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, id, properties, serializedTaskMetrics,
               Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
           }
@@ -1076,7 +1077,7 @@ class DAGScheduler(
       logInfo(s"Submitting ${tasks.size} missing tasks from $stage 
(${stage.rdd}) (first 15 " +
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run
@@ -1245,7 +1246,7 @@ class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+            if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
               // This task was for the currently running attempt of the stage. Since the task
               // completed successfully from the perspective of the TaskSetManager, mark it as
               // no longer pending (the TaskSetManager may consider the task complete even
@@ -1324,10 +1325,10 @@ class DAGScheduler(
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleIdToMapStage(shuffleId)
 
-        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
+        if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
           logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
             s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
+            s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
         } else {
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index c513ed3..903e25b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
-    val attemptId: Int,
+    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -56,6 +56,8 @@ class StageInfo(
     completionTime = Some(System.currentTimeMillis)
   }
 
+  def attemptNumber(): Int = attemptId
+
   private[spark] def getStatusString: String = {
     if (completionTime.isDefined) {
       if (failureReason.isDefined) {

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
index 3c8cab7..3c7af4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
@@ -79,7 +79,7 @@ class StatsReportListener extends SparkListener with Logging {
       x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
     ).getOrElse("-")
 
-    s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
+    s"Stage(${info.stageId}, ${info.attemptNumber}); Name: '${info.name}'; " +
       s"Status: ${info.getStatusString}$failureReason; numTasks: 
${info.numTasks}; " +
       s"Took: $timeTaken msec"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 487a782..88b75dd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -529,7 +529,8 @@ private[spark] class AppStatusListener(
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
-    val maybeStage = Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)))
+    val maybeStage =
+      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -785,7 +786,7 @@ private[spark] class AppStatusListener(
   }
 
   private def getOrCreateStage(info: StageInfo): LiveStage = {
-    val stage = liveStages.computeIfAbsent((info.stageId, info.attemptId),
+    val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
       new Function[(Int, Int), LiveStage]() {
         override def apply(key: (Int, Int)): LiveStage = new LiveStage()
       })
@@ -912,7 +913,7 @@ private[spark] class AppStatusListener(
   private def cleanupTasks(stage: LiveStage): Unit = {
     val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
     if (countToDelete > 0) {
-      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+      val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
       val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
         .last(stageKey)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 52e83f2..305c2fa 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -412,14 +412,14 @@ private class LiveStage extends LiveEntity {
 
   def executorSummary(executorId: String): LiveExecutorStageSummary = {
     executorSummaries.getOrElseUpdate(executorId,
-      new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))
+      new LiveExecutorStageSummary(info.stageId, info.attemptNumber, executorId))
   }
 
   def toApi(): v1.StageData = {
     new v1.StageData(
       status,
       info.stageId,
-      info.attemptId,
+      info.attemptNumber,
 
       info.numTasks,
       activeTasks,

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 827a863..9488582 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -116,7 +116,7 @@ private[spark] object RDDOperationGraph extends Logging {
     // Use a special prefix here to differentiate this cluster from other operation clusters
     val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
     val stageClusterName = s"Stage ${stage.stageId}" +
-      { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
+      { if (stage.attemptNumber == 0) "" else s" (attempt ${stage.attemptNumber})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
 
     var rootNodeCount = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 5e60218..ff83301 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -263,7 +263,7 @@ private[spark] object JsonProtocol {
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
-    ("Stage Attempt ID" -> stageInfo.attemptId) ~
+    ("Stage Attempt ID" -> stageInfo.attemptNumber) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 997c7de..b8c84e2 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -195,7 +195,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     val s1Tasks = createTasks(4, execIds)
     s1Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId,
+        stages.head.attemptNumber,
+        task))
     }
 
     assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
@@ -213,10 +215,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       check[TaskDataWrapper](task.taskId) { wrapper =>
         assert(wrapper.info.taskId === task.taskId)
         assert(wrapper.stageId === stages.head.stageId)
-        assert(wrapper.stageAttemptId === stages.head.attemptId)
-        assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId)))
+        assert(wrapper.stageAttemptId === stages.head.attemptNumber)
+        assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptNumber)))
 
-        val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger,
+        val runtime = Array[AnyRef](stages.head.stageId: JInteger,
+          stages.head.attemptNumber: JInteger,
           -1L: JLong)
         assert(Arrays.equals(wrapper.runtime, runtime))
 
@@ -237,7 +240,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         Some(1L), None, true, false, None)
       listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
         task.executorId,
-        Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
+        Seq((task.taskId, stages.head.stageId, stages.head.attemptNumber, Seq(accum)))))
     }
 
     check[StageDataWrapper](key(stages.head)) { stage =>
@@ -254,12 +257,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // Fail one of the tasks, re-start it.
     time += 1
     s1Tasks.head.markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
       "taskType", TaskResultLost, s1Tasks.head, null))
 
     time += 1
     val reattempt = newAttempt(s1Tasks.head, nextTaskId())
-    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
       reattempt))
 
     assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
@@ -289,7 +292,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val killed = s1Tasks.drop(1).head
     killed.finishTime = time
     killed.failed = true
-    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
       "taskType", TaskKilled("killed"), killed, null))
 
     check[JobDataWrapper](1) { job =>
@@ -311,13 +314,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     val denied = newAttempt(killed, nextTaskId())
     val denyReason = TaskCommitDenied(1, 1, 1)
-    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
       denied))
 
     time += 1
     denied.finishTime = time
     denied.failed = true
-    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
       "taskType", denyReason, denied, null))
 
     check[JobDataWrapper](1) { job =>
@@ -337,7 +340,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     // Start a new attempt.
     val reattempt2 = newAttempt(denied, nextTaskId())
-    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
       reattempt2))
 
     // Succeed all tasks in stage 1.
@@ -350,7 +353,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     pending.foreach { task =>
       task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
         "taskType", Success, task, s1Metrics))
     }
 
@@ -414,13 +417,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     val s2Tasks = createTasks(4, execIds)
     s2Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId,
+        stages.last.attemptNumber,
+        task))
     }
 
     time += 1
     s2Tasks.foreach { task =>
       task.markFinished(TaskState.FAILED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId,
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber,
         "taskType", TaskResultLost, task, null))
     }
 
@@ -455,7 +460,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     // - Re-submit stage 2, all tasks, and succeed them and the stage.
     val oldS2 = stages.last
-    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks,
+    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks,
       oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
 
     time += 1
@@ -466,14 +471,14 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val newS2Tasks = createTasks(4, execIds)
 
     newS2Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task))
     }
 
     time += 1
     newS2Tasks.foreach { task =>
       task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success,
-        task, null))
+      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType",
+        Success, task, null))
     }
 
     time += 1
@@ -522,14 +527,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val j2s2Tasks = createTasks(4, execIds)
 
     j2s2Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId,
+      listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId,
+        j2Stages.last.attemptNumber,
         task))
     }
 
     time += 1
     j2s2Tasks.foreach { task =>
       task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptId,
+      listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptNumber,
         "taskType", Success, task, null))
     }
 
@@ -919,13 +925,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     val tasks = createTasks(2, Array("1"))
     tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)
 
     // Start a 3rd task. The finished tasks should be deleted.
     createTasks(1, Array("1")).foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)
     intercept[NoSuchElementException] {
@@ -934,7 +940,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     // Start a 4th task. The first task should be deleted, even if it's still running.
     createTasks(1, Array("1")).foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)
     intercept[NoSuchElementException] {
@@ -960,7 +966,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
-  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
+  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
 
   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
     val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index d8adbe7..73a1052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -99,7 +99,7 @@ class SQLAppStatusListener(
     // Reset the metrics tracking object for the new attempt.
     Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
       metrics.taskMetrics.clear()
-      metrics.attemptId = event.stageInfo.attemptId
+      metrics.attemptId = event.stageInfo.attemptNumber
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
