This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 48d04f7 [SPARK-28638][WEBUI] Task summary should only contain successful tasks' metrics 48d04f7 is described below commit 48d04f74ca895497b9d8bab18c7708f76f55c520 Author: Gengliang Wang <gengliang.w...@databricks.com> AuthorDate: Mon Aug 12 11:47:29 2019 -0700 [SPARK-28638][WEBUI] Task summary should only contain successful tasks' metrics ## What changes were proposed in this pull request? Currently, on requesting summary metrics, cached data are returned if the current number of "SUCCESS" tasks is the same as the value in cached data. However, the number of "SUCCESS" tasks is wrong when there are running tasks. In `AppStatusStore`, the KVStore is `ElementTrackingStore`, instead of `InMemoryStore`. The value count is always the number of "SUCCESS" tasks + "RUNNING" tasks. Thus, even when the running tasks are finished, the out-of-date cached data is returned. This PR fixes the code that gets the number of "SUCCESS" tasks. ## How was this patch tested? Test manually, run ``` sc.parallelize(1 to 160, 40).map(i => Thread.sleep(i*100)).collect() ``` and keep refreshing the stage page, we can see that the task summary metrics are wrong. ### Before fix: ![image](https://user-images.githubusercontent.com/1097932/62560343-6a141780-b8af-11e9-8942-d88540659a93.png) ### After fix: ![image](https://user-images.githubusercontent.com/1097932/62560355-7009f880-b8af-11e9-8ba8-10c083a48d7b.png) Closes #25369 from gengliangwang/fixStagePage. 
Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../org/apache/spark/status/AppStatusStore.scala | 11 ++++++-- .../apache/spark/status/AppStatusStoreSuite.scala | 32 ++++++++++++++-------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 6a96778..964ab27 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -136,6 +136,12 @@ private[spark] class AppStatusStore( store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality } + // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary, + // but currently this is very expensive when using a disk store. So we only trigger the slower + // code path when we know we have all data in memory. The following method checks whether all + // the data will be in memory. + private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined + /** * Calculates a summary of the task metrics for the given stage attempt, returning the * requested quantiles for the recorded metrics. @@ -156,7 +162,8 @@ private[spark] class AppStatusStore( // cheaper for disk stores (avoids deserialization). val count = { Utils.tryWithResource( - if (store.isInstanceOf[InMemoryStore]) { + if (isInMemoryStore) { + // For Live UI, we should count the tasks with status "SUCCESS" only. store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(TaskIndexNames.STATUS) @@ -245,7 +252,7 @@ private[spark] class AppStatusStore( // and failed tasks differently (would be tricky). Also would require changing the disk store // version (to invalidate old stores). 
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - if (store.isInstanceOf[InMemoryStore]) { + if (isInMemoryStore) { val quantileTasks = store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(index) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 75a6581..165fdb7 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.status -import org.apache.spark.SparkFunSuite -import org.apache.spark.status.api.v1.TaskMetricDistributions +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.util.Distribution import org.apache.spark.util.kvstore._ @@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } - test("only successfull task have taskSummary") { + private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = { + val conf = new SparkConf() + val store = new ElementTrackingStore(inMemoryStore, conf) + val listener = new AppStatusListener(store, conf, true, None) + new AppStatusStore(store, listener = Some(listener)) + } + + test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") { val store = new InMemoryStore() (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } - val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles) - assert(appStore.size === 0) + Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) + assert(summary.size === 0) + } } - test("summary should contain task metrics of only successfull tasks") { + test("SPARK-28638: summary should contain successful tasks only when with in memory 
kvstore") { val store = new InMemoryStore() for (i <- 0 to 5) { @@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite { } } - val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get + Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get - val values = Array(0.0, 2.0, 4.0) + val values = Array(0.0, 2.0, 4.0) - val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) - dist.zip(summary.executorRunTime).foreach { case (expected, actual) => - assert(expected === actual) + val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) + dist.zip(summary.executorRunTime).foreach { case (expected, actual) => + assert(expected === actual) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org