Repository: spark
Updated Branches:
  refs/heads/master 33e63618d -> ee6e9e7d8


SPARK-1337: Application web UI garbage collects newest stages

Simple fix...

Author: Patrick Wendell <pwend...@gmail.com>

Closes #320 from pwendell/stage-clean-up and squashes the following commits:

29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects 
newest stages instead old ones


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee6e9e7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee6e9e7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee6e9e7d

Branch: refs/heads/master
Commit: ee6e9e7d863022304ac9ced405b353b63accb6ab
Parents: 33e6361
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Thu Apr 3 22:13:56 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Apr 3 22:13:56 2014 -0700

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  8 ++---
 .../ui/jobs/JobProgressListenerSuite.scala      | 33 ++++++++++++++++++--
 2 files changed, 35 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee6e9e7d/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d10aa12..cd4be57 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) 
extends SparkListener {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = retainedStages / 10
-      stages.takeRight(toRemove).foreach( s => {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach { s =>
         stageIdToTaskData.remove(s.stageId)
         stageIdToTime.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
@@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) 
extends SparkListener {
         stageIdToTasksFailed.remove(s.stageId)
         stageIdToPool.remove(s.stageId)
         if (stageIdToDescription.contains(s.stageId)) 
{stageIdToDescription.remove(s.stageId)}
-      })
-      stages.trimEnd(toRemove)
+      }
+      stages.trimStart(toRemove)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee6e9e7d/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index d8a3e85..67ceee5 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -18,13 +18,42 @@
 package org.apache.spark.ui.jobs
 
 import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
-class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext with 
ShouldMatchers {
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    def createStageStartEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageSubmitted(stageInfo)
+    }
+
+    def createStageEndEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageCompleted(stageInfo)
+    }
+
+    for (i <- 1 to 50) {
+      listener.onStageSubmitted(createStageStartEvent(i))
+      listener.onStageCompleted(createStageEndEvent(i))
+    }
+
+    listener.completedStages.size should be (5)
+    listener.completedStages.filter(_.stageId == 50).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).size should be (1)
+  }
+
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
     val listener = new JobProgressListener(sc.conf)

Reply via email to