Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/42#discussion_r10504565
  
    --- Diff: 
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
    @@ -106,121 +114,154 @@ private[spark] class JobProgressListener(val sc: 
SparkContext) extends SparkList
         }
         description.map(d => stageIdToDescription(stage.stageId) = d)
     
    -    val stages = poolToActiveStages.getOrElseUpdate(poolName, new 
HashSet[StageInfo]())
    -    stages += stage
    +    val stages = poolToActiveStages.getOrElseUpdate(poolName, new 
HashMap[Int, StageInfo]())
    +    stages(stage.stageId) = stage
       }
     
       override def onTaskStart(taskStart: SparkListenerTaskStart) = 
synchronized {
    -    val sid = taskStart.task.stageId
    -    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new 
HashSet[TaskInfo]())
    -    tasksActive += taskStart.taskInfo
    -    val taskList = stageIdToTaskInfos.getOrElse(
    -      sid, HashSet[(TaskInfo, Option[TaskMetrics], 
Option[ExceptionFailure])]())
    -    taskList += ((taskStart.taskInfo, None, None))
    -    stageIdToTaskInfos(sid) = taskList
    +    val sid = taskStart.stageId
    +    val taskInfo = taskStart.taskInfo
    +    if (taskInfo != null) {
    +      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new 
HashMap[Long, TaskInfo]())
    +      tasksActive(taskInfo.taskId) = taskInfo
    +      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, 
TaskUIData]())
    +      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
    +      stageIdToTaskData(sid) = taskMap
    +    }
       }
     
    -  override def onTaskGettingResult(taskGettingResult: 
SparkListenerTaskGettingResult)
    -      = synchronized {
    +  override def onTaskGettingResult(taskGettingResult: 
SparkListenerTaskGettingResult) {
         // Do nothing: because we don't do a deep copy of the TaskInfo, the 
TaskInfo in
         // stageToTaskInfos already has the updated status.
       }
     
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
    -    val sid = taskEnd.task.stageId
    -
    -    // create executor summary map if necessary
    -    val executorSummaryMap = 
stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
    -      op = new HashMap[String, ExecutorSummary]())
    -    executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
    -      op = new ExecutorSummary())
    -
    -    val executorSummary = 
executorSummaryMap.get(taskEnd.taskInfo.executorId)
    -    executorSummary match {
    -      case Some(y) => {
    -        // first update failed-task, succeed-task
    +    val sid = taskEnd.stageId
    +    val info = taskEnd.taskInfo
    +
    +    if (info != null) {
    +      // create executor summary map if necessary
    +      val executorSummaryMap = 
stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
    +        op = new HashMap[String, ExecutorSummary]())
    +      executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new 
ExecutorSummary())
    +
    +      val executorSummary = executorSummaryMap.get(info.executorId)
    +      executorSummary match {
    +        case Some(y) => {
    +          // first update failed-task, succeed-task
    +          taskEnd.reason match {
    +            case Success =>
    +              y.succeededTasks += 1
    +            case _ =>
    +              y.failedTasks += 1
    +          }
    +
    +          // update duration
    +          y.taskTime += info.duration
    +
    +          val metrics = taskEnd.taskMetrics
    +          if (metrics != null) {
    +            metrics.shuffleReadMetrics.foreach { y.shuffleRead += 
_.remoteBytesRead }
    +            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += 
_.shuffleBytesWritten }
    +            y.memoryBytesSpilled += metrics.memoryBytesSpilled
    +            y.diskBytesSpilled += metrics.diskBytesSpilled
    +          }
    +        }
    +        case _ => {}
    +      }
    +
    +      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new 
HashMap[Long, TaskInfo]())
    +      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo 
is from storage
    +      tasksActive.remove(info.taskId)
    +
    --- End diff --
    
    Intended because of null check above, otherwise largely the same.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to