Repository: spark Updated Branches: refs/heads/branch-2.3 373ac642f -> 23ba4416e
[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist ## What changes were proposed in this pull request? The issue here is `AppStatusStore.lastStageAttempt` will return the next available stage in the store when a stage doesn't exist. This PR adds `last(stageId)` to ensure it returns a correct `StageData` ## How was this patch tested? The new unit test. Author: Shixiong Zhu <zsxw...@gmail.com> Closes #20654 from zsxwing/SPARK-23481. (cherry picked from commit 744d5af652ee8cece361cbca31e5201134e0fb42) Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ba4416 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ba4416 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ba4416 Branch: refs/heads/branch-2.3 Commit: 23ba4416e1bbbaa818876d7a837f7a5e260aa048 Parents: 373ac64 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Wed Feb 21 15:37:28 2018 -0800 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Wed Feb 21 15:37:36 2018 -0800 ---------------------------------------------------------------------- .../apache/spark/status/AppStatusStore.scala | 6 +++- .../spark/status/AppStatusListenerSuite.scala | 33 ++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index efc2853..688f25a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -95,7 +95,11 @@ private[spark] class AppStatusStore( } def lastStageAttempt(stageId: Int): v1.StageData = { - val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId) + val it = store.view(classOf[StageDataWrapper]) + .index("stageId") + .reverse() + .first(stageId) + .last(stageId) .closeableIterator() try { if (it.hasNext()) { http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 01d76a2..f3fa4c9 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1057,6 +1057,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("lastStageAttempt should fail when the stage doesn't exist") { + val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1) + val listener = new AppStatusListener(store, testConf, true) + val appStore = new AppStatusStore(store) + + val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") + val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2") + val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3") + + time += 1 + stage1.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties())) + stage1.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage1)) + + // Make stage 3 complete before stage 2 so that stage 3 will be evicted + time += 1 + stage3.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties())) + stage3.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage3)) + + time += 1 + stage2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties())) + stage2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage2)) + + assert(appStore.asOption(appStore.lastStageAttempt(1)) === None) + assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === Some(2)) + assert(appStore.asOption(appStore.lastStageAttempt(3)) === None) + } + test("driver logs") { val listener = new AppStatusListener(store, conf, true) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org