This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 48d04f7  [SPARK-28638][WEBUI] Task summary should only contain 
successful tasks' metrics
48d04f7 is described below

commit 48d04f74ca895497b9d8bab18c7708f76f55c520
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Mon Aug 12 11:47:29 2019 -0700

    [SPARK-28638][WEBUI] Task summary should only contain successful tasks' 
metrics
    
    ## What changes were proposed in this pull request?
    
    Currently, on requesting summary metrics, cached data are returned if the 
current number of "SUCCESS" tasks is the same as the value in cached data.
    However, the number of "SUCCESS" tasks is wrong when there are running 
tasks. In `AppStatusStore`, the KVStore is `ElementTrackingStore`, instead of 
`InMemoryStore`. The value count is always the number of "SUCCESS" tasks + 
"RUNNING" tasks.
    Thus, even when the running tasks are finished, the out-of-date cached 
data is returned.
    
    This PR is to fix the code in getting the number of "SUCCESS" tasks.
    
    ## How was this patch tested?
    
    Test manually, run
    ```
    sc.parallelize(1 to 160, 40).map(i => Thread.sleep(i*100)).collect()
    ```
    and keep refreshing the stage page, we can see that the task summary metrics are 
wrong.
    
    ### Before fix:
    
![image](https://user-images.githubusercontent.com/1097932/62560343-6a141780-b8af-11e9-8942-d88540659a93.png)
    
    ### After fix:
    
![image](https://user-images.githubusercontent.com/1097932/62560355-7009f880-b8af-11e9-8ba8-10c083a48d7b.png)
    
    Closes #25369 from gengliangwang/fixStagePage.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/status/AppStatusStore.scala   | 11 ++++++--
 .../apache/spark/status/AppStatusStoreSuite.scala  | 32 ++++++++++++++--------
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 6a96778..964ab27 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,6 +136,12 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, 
stageAttemptId)).locality
   }
 
+  // SPARK-26119: we only want to consider successful tasks when calculating 
the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only 
trigger the slower
+  // code path when we know we have all data in memory. The following method 
checks whether all
+  // the data will be in memory.
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || 
listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, 
returning the
    * requested quantiles for the recorded metrics.
@@ -156,7 +162,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (isInMemoryStore) {
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -245,7 +252,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require 
changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 75a6581..165fdb7 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore._
 
@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary when with in 
memory kvstore") {
     val store = new InMemoryStore()
     (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, 
uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore 
=>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
   }
 
-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only when with in 
memory kvstore") {
     val store = new InMemoryStore()
 
     for (i <- 0 to 5) {
@@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
       }
     }
 
-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, 
uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore 
=>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
 
-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)
 
-    val dist = new Distribution(values, 0, 
values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, 
values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to