Repository: spark Updated Branches: refs/heads/master 744d5af65 -> 45cf714ee
[SPARK-23475][WEBUI] Skipped stages should be evicted before completed stages ## What changes were proposed in this pull request? The root cause of missing completed stages is because `cleanupStages` will never remove skipped stages. This PR changes the logic to always remove skipped stage first. This is safe since the job itself contains enough information to render skipped stages in the UI. ## How was this patch tested? The new unit tests. Author: Shixiong Zhu <zsxw...@gmail.com> Closes #20656 from zsxwing/SPARK-23475. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45cf714e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45cf714e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45cf714e Branch: refs/heads/master Commit: 45cf714ee6d4eead2fe00794a0d754fa6d33d4a6 Parents: 744d5af Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Wed Feb 21 19:43:11 2018 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Wed Feb 21 19:43:11 2018 -0800 ---------------------------------------------------------------------- .../apache/spark/status/AppStatusListener.scala | 5 ++- .../spark/status/AppStatusListenerSuite.scala | 36 ++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/45cf714e/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 79a17e2..5ea161c 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -915,7 +915,10 @@ private[spark] class AppStatusListener( return } - val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L) + // As the completion time of a skipped stage is always -1, we will remove skipped stages first. + // This is safe since the job itself contains enough information to render skipped stages in the + // UI. + val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime") val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s => s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING } http://git-wip-us.apache.org/repos/asf/spark/blob/45cf714e/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 673d191..1cd7195 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1089,6 +1089,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("skipped stages should be evicted before completed stages") { + val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2) + val listener = new AppStatusListener(store, testConf, true) + + val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") + val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2") + + // Sart job 1 + time += 1 + listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null)) + + // Start and stop stage 1 + time += 1 + stage1.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties())) + + time += 1 + stage1.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage1)) + + // Stop job 1 and stage 2 will become SKIPPED + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // Submit stage 3 and verify stage 2 is evicted + val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3") + time += 1 + stage3.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties())) + + assert(store.count(classOf[StageDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(2, 0)) + } + } + test("eviction should respect task completion time") { val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2) val listener = new AppStatusListener(store, testConf, true) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org