Repository: spark
Updated Branches:
  refs/heads/master e702fb1d5 -> 5ee216618


[SPARK-25533][CORE][WEBUI] AppSummary should hold the information about 
succeeded Jobs and completed stages only

## What changes were proposed in this pull request?
Currently, in the Spark UI, when there are failed jobs or failed stages, the 
messages displayed for completed jobs and completed stages are not consistent 
with previous versions of Spark.
The reason is that AppSummary counts all jobs and stages, including failed 
ones, while the code linked below compares those counts against completedJobs 
and completedStages. So AppSummary should count only successful jobs and stages.

https://github.com/apache/spark/blob/66d29870c09e6050dd846336e596faaa8b0d14ad/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala#L306
 
https://github.com/apache/spark/blob/66d29870c09e6050dd846336e596faaa8b0d14ad/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala#L119
So we should keep only succeeded job and completed stage information in 
AppSummary, to make it consistent with Spark 2.2.
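
The intended counting rule can be illustrated with a minimal, self-contained sketch. This is not the actual Spark code: `AppSummarySketch`, its status types, and the `onJobEnd` / `onStageCompleted` helpers are hypothetical names used only for illustration, and the real `AppStatusListener` additionally writes the summary to the kvstore.

```scala
// Hypothetical, simplified model of the counting rule; not the real Spark classes.
object AppSummarySketch {
  sealed trait JobStatus
  case object JobSucceeded extends JobStatus
  case object JobFailed extends JobStatus

  sealed trait StageStatus
  case object StageComplete extends StageStatus
  case object StageFailed extends StageStatus

  // Mirrors the two counters named in the PR description.
  final case class AppSummary(numCompletedJobs: Int, numCompletedStages: Int)

  // Increment the job counter only when the job succeeded.
  def onJobEnd(summary: AppSummary, status: JobStatus): AppSummary =
    status match {
      case JobSucceeded => summary.copy(numCompletedJobs = summary.numCompletedJobs + 1)
      case JobFailed    => summary
    }

  // Increment the stage counter only when the stage completed successfully.
  def onStageCompleted(summary: AppSummary, status: StageStatus): AppSummary =
    status match {
      case StageComplete => summary.copy(numCompletedStages = summary.numCompletedStages + 1)
      case StageFailed   => summary
    }
}
```

Under this model, one succeeded job followed by one failed job leaves `numCompletedJobs` at 1, whereas counting every finished job would report 2.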

## How was this patch tested?
Test steps:
 bin/spark-shell
```
sc.parallelize(1 to 5, 5).collect()
sc.parallelize(1 to 5, 2).map { x => throw new RuntimeException("Fail") }.collect()
```
**Before fix:**

![screenshot from 2018-09-26 03-24-53](https://user-images.githubusercontent.com/23054875/46045669-f60bcd80-c13b-11e8-9aa6-a2e5a2038dba.png)

![screenshot from 2018-09-26 03-25-08](https://user-images.githubusercontent.com/23054875/46045699-0ae86100-c13c-11e8-94e5-ad35944c7615.png)

**After fix:**
![screenshot from 2018-09-26 03-16-14](https://user-images.githubusercontent.com/23054875/46045636-d83e6880-c13b-11e8-98df-f49d15c18958.png)
![screenshot from 2018-09-26 03-16-28](https://user-images.githubusercontent.com/23054875/46045645-e1c7d080-c13b-11e8-8c9c-d32e1f663356.png)

Closes #22549 from shahidki31/SPARK-25533.

Authored-by: Shahid <shahidk...@gmail.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ee21661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ee21661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ee21661

Branch: refs/heads/master
Commit: 5ee21661834e837d414bc20591982a092c0aece3
Parents: e702fb1
Author: Shahid <shahidk...@gmail.com>
Authored: Wed Sep 26 10:47:49 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Sep 26 10:47:49 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/status/AppStatusListener.scala   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ee21661/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index f21eee1..36aaf67 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -388,10 +388,11 @@ private[spark] class AppStatusListener(
 
       job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
       update(job, now, last = true)
+      if (job.status == JobExecutionStatus.SUCCEEDED) {
+        appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
+        kvstore.write(appSummary)
+      }
     }
-
-    appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
-    kvstore.write(appSummary)
   }
 
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -653,13 +654,14 @@ private[spark] class AppStatusListener(
       if (removeStage) {
         liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
       }
+      if (stage.status == v1.StageStatus.COMPLETE) {
+        appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
+        kvstore.write(appSummary)
+      }
     }
 
     // remove any dead executors that were not running for any currently active stages
     deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
-
-    appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
-    kvstore.write(appSummary)
   }
 
   private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
