Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/42#discussion_r10504565 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala --- @@ -106,121 +114,154 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } description.map(d => stageIdToDescription(stage.stageId) = d) - val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]()) - stages += stage + val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]()) + stages(stage.stageId) = stage } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { - val sid = taskStart.task.stageId - val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) - tasksActive += taskStart.taskInfo - val taskList = stageIdToTaskInfos.getOrElse( - sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) - taskList += ((taskStart.taskInfo, None, None)) - stageIdToTaskInfos(sid) = taskList + val sid = taskStart.stageId + val taskInfo = taskStart.taskInfo + if (taskInfo != null) { + val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]()) + tasksActive(taskInfo.taskId) = taskInfo + val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]()) + taskMap(taskInfo.taskId) = new TaskUIData(taskInfo) + stageIdToTaskData(sid) = taskMap + } } - override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) - = synchronized { + override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { // Do nothing: because we don't do a deep copy of the TaskInfo, the TaskInfo in // stageToTaskInfos already has the updated status. 
} override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { - val sid = taskEnd.task.stageId - - // create executor summary map if necessary - val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, - op = new HashMap[String, ExecutorSummary]()) - executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId, - op = new ExecutorSummary()) - - val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId) - executorSummary match { - case Some(y) => { - // first update failed-task, succeed-task + val sid = taskEnd.stageId + val info = taskEnd.taskInfo + + if (info != null) { + // create executor summary map if necessary + val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, + op = new HashMap[String, ExecutorSummary]()) + executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary()) + + val executorSummary = executorSummaryMap.get(info.executorId) + executorSummary match { + case Some(y) => { + // first update failed-task, succeed-task + taskEnd.reason match { + case Success => + y.succeededTasks += 1 + case _ => + y.failedTasks += 1 + } + + // update duration + y.taskTime += info.duration + + val metrics = taskEnd.taskMetrics + if (metrics != null) { + metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } + metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } + y.memoryBytesSpilled += metrics.memoryBytesSpilled + y.diskBytesSpilled += metrics.diskBytesSpilled + } + } + case _ => {} + } + + val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]()) + // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage + tasksActive.remove(info.taskId) + --- End diff -- Indented because of the null check above; otherwise largely the same.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---