Repository: spark
Updated Branches:
  refs/heads/stage-clean-up [created] 64c593ed0


SPARK-1337: Application web UI garbage collects newest stages instead of old ones


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64c593ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64c593ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64c593ed

Branch: refs/heads/stage-clean-up
Commit: 64c593ed0026809d116c99a6965cc6783d42d709
Parents: 33e6361
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Thu Apr 3 17:51:11 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Apr 3 20:58:26 2014 -0700

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  6 ++--
 .../ui/jobs/JobProgressListenerSuite.scala      | 33 ++++++++++++++++++--
 2 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64c593ed/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d10aa12..34cb45e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) 
extends SparkListener {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = retainedStages / 10
-      stages.takeRight(toRemove).foreach( s => {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach( s => {
         stageIdToTaskData.remove(s.stageId)
         stageIdToTime.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
@@ -95,7 +95,7 @@ private[ui] class JobProgressListener(conf: SparkConf) 
extends SparkListener {
         stageIdToPool.remove(s.stageId)
         if (stageIdToDescription.contains(s.stageId)) 
{stageIdToDescription.remove(s.stageId)}
       })
-      stages.trimEnd(toRemove)
+      stages.trimStart(toRemove)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64c593ed/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index d8a3e85..67ceee5 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -18,13 +18,42 @@
 package org.apache.spark.ui.jobs
 
 import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
-class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext with 
ShouldMatchers {
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    def createStageStartEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageSubmitted(stageInfo)
+    }
+
+    def createStageEndEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageCompleted(stageInfo)
+    }
+
+    for (i <- 1 to 50) {
+      listener.onStageSubmitted(createStageStartEvent(i))
+      listener.onStageCompleted(createStageEndEvent(i))
+    }
+
+    listener.completedStages.size should be (5)
+    listener.completedStages.filter(_.stageId == 50).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).size should be (1)
+  }
+
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
     val listener = new JobProgressListener(sc.conf)

Reply via email to