Repository: spark Updated Branches: refs/heads/master 556d83e0d -> 35f9163ad
[SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics ## What changes were proposed in this pull request? Task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of spark. ## How was this patch tested? Added UT. attached screenshot Before patch:   Closes #23088 from shahidki31/summaryMetrics. Authored-by: Shahid <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35f9163a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35f9163a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35f9163a Branch: refs/heads/master Commit: 35f9163adf5c067229afbe57ed60d5dd5f2422c8 Parents: 556d83e Author: Shahid <[email protected]> Authored: Tue Dec 4 11:00:58 2018 -0800 Committer: Marcelo Vanzin <[email protected]> Committed: Tue Dec 4 11:00:58 2018 -0800 ---------------------------------------------------------------------- .../apache/spark/status/AppStatusStore.scala | 73 ++++++++++++++------ .../spark/status/AppStatusStoreSuite.scala | 33 ++++++++- 2 files changed, 81 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/35f9163a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 5c0ed4d..b35781c 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -148,11 +148,20 @@ private[spark] class AppStatusStore( // cheaper for disk stores (avoids deserialization). val count = { Utils.tryWithResource( - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.EXEC_RUN_TIME) - .first(0L) - .closeableIterator() + if (store.isInstanceOf[InMemoryStore]) { + store.view(classOf[TaskDataWrapper]) + .parent(stageKey) + .index(TaskIndexNames.STATUS) + .first("SUCCESS") + .last("SUCCESS") + .closeableIterator() + } else { + store.view(classOf[TaskDataWrapper]) + .parent(stageKey) + .index(TaskIndexNames.EXEC_RUN_TIME) + .first(0L) + .closeableIterator() + } ) { it => var _count = 0L while (it.hasNext()) { @@ -221,30 +230,50 @@ private[spark] class AppStatusStore( // stabilize once the stage finishes. It's also slow, especially with disk stores. val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } + // TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119). + // For InMemory case, it is efficient to find using the following code. But for diskStore case + // we need an efficient solution to avoid deserialization time overhead. For that, we need to + // rework on the way indexing works, so that we can index by specific metrics for successful + // and failed tasks differently (would be tricky). Also would require changing the disk store + // version (to invalidate old stores). def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( - store.view(classOf[TaskDataWrapper]) + if (store.isInstanceOf[InMemoryStore]) { + val quantileTasks = store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(index) .first(0L) - .closeableIterator() - ) { it => - var last = Double.NaN - var currentIdx = -1L - indices.map { idx => - if (idx == currentIdx) { - last - } else { - val diff = idx - currentIdx - currentIdx = idx - if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble + .asScala + .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks + .toIndexedSeq + + indices.map { index => + fn(quantileTasks(index.toInt)).toDouble + }.toIndexedSeq + } else { + Utils.tryWithResource( + store.view(classOf[TaskDataWrapper]) + .parent(stageKey) + .index(index) + .first(0L) + .closeableIterator() + ) { it => + var last = Double.NaN + var currentIdx = -1L + indices.map { idx => + if (idx == currentIdx) { last } else { - Double.NaN + val diff = idx - currentIdx + currentIdx = idx + if (it.skip(diff - 1)) { + last = fn(it.next()).toDouble + last + } else { + Double.NaN + } } - } - }.toIndexedSeq + }.toIndexedSeq + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/35f9163a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 92f90f3..75a6581 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } + test("only successfull task have taskSummary") { + val store = new InMemoryStore() + (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } + val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles) + assert(appStore.size === 0) + } + + test("summary should contain task metrics of only successfull tasks") { + val store = new InMemoryStore() + + for (i <- 0 to 5) { + if (i % 2 == 1) { + store.write(newTaskData(i, status = "FAILED")) + } else { + store.write(newTaskData(i)) + } + } + + val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get + + val values = Array(0.0, 2.0, 4.0) + + val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) + dist.zip(summary.executorRunTime).foreach { case (expected, actual) => + assert(expected === actual) + } + } + private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = { val store = new InMemoryStore() val values = (0 until count).map { i => @@ -93,12 +121,11 @@ class AppStatusStoreSuite extends SparkFunSuite { } } - private def newTaskData(i: Int): TaskDataWrapper = { + private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = { new TaskDataWrapper( - i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None, + i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, stageId, attemptId) } - } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
