Repository: spark
Updated Branches:
  refs/heads/master bf5cb8af4 -> bd2c12fb4


[SPARK-12920][CORE] Honor "spark.ui.retainedStages" to reduce mem-pressure

When a large number of jobs are run concurrently with the Spark Thrift server, the 
server starts running at high CPU utilization due to GC pressure. Job UI retention 
causes memory pressure with large jobs. The profiler snapshot at 
https://issues.apache.org/jira/secure/attachment/12783302/SPARK-12920.profiler_job_progress_listner.png
 illustrates this. This PR honors `spark.ui.retainedStages` strictly 
to reduce memory pressure.

Manual and unit tests

Author: Rajesh Balamohan <[email protected]>

Closes #10846 from rajeshbalamohan/SPARK-12920.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2c12fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2c12fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2c12fb

Branch: refs/heads/master
Commit: bd2c12fb4994785d5becce541aee9ba73fef1c4c
Parents: bf5cb8a
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Aug 10 15:30:22 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Aug 10 15:30:52 2016 -0700

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  4 +-
 .../ui/jobs/JobProgressListenerSuite.scala      | 50 +++++++++++++-------
 2 files changed, 36 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c12fb/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index c882740..491f716 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -140,7 +140,7 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = 
synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = math.max(retainedStages / 10, 1)
+      val toRemove = (stages.size - retainedStages)
       stages.take(toRemove).foreach { s =>
         stageIdToData.remove((s.stageId, s.attemptId))
         stageIdToInfo.remove(s.stageId)
@@ -152,7 +152,7 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   /** If jobs is too large, remove and garbage collect old jobs */
   private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
     if (jobs.size > retainedJobs) {
-      val toRemove = math.max(retainedJobs / 10, 1)
+      val toRemove = (jobs.size - retainedJobs)
       jobs.take(toRemove).foreach { job =>
         // Remove the job's UI data, if it exists
         jobIdToData.remove(job.jobId).foreach { removedJob =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c12fb/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index edab727..8418fa7 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
   }
 
   test("test LRU eviction of stages") {
+    def runWithListener(listener: JobProgressListener) : Unit = {
+      for (i <- 1 to 50) {
+        listener.onStageSubmitted(createStageStartEvent(i))
+        listener.onStageCompleted(createStageEndEvent(i))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-
-    for (i <- 1 to 50) {
-      listener.onStageSubmitted(createStageStartEvent(i))
-      listener.onStageCompleted(createStageEndEvent(i))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
 
+    // Test with 5 retainedStages
+    runWithListener(listener)
     listener.completedStages.size should be (5)
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 
47, 46))
+
+    // Test with 0 retainedStages
+    conf.set("spark.ui.retainedStages", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.completedStages.size should be (0)
   }
 
   test("test clearing of stageIdToActiveJobs") {
@@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
   }
 
   test("test clearing of jobGroupToJobIds") {
+    def runWithListener(listener: JobProgressListener): Unit = {
+      // Run 50 jobs, each with one stage
+      for (jobId <- 0 to 50) {
+        listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = 
Some(jobId.toString)))
+        listener.onStageSubmitted(createStageStartEvent(0))
+        listener.onStageCompleted(createStageEndEvent(0, failed = false))
+        listener.onJobEnd(createJobEndEvent(jobId, false))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedJobs", 5.toString)
-    val listener = new JobProgressListener(conf)
 
-    // Run 50 jobs, each with one stage
-    for (jobId <- 0 to 50) {
-      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = 
Some(jobId.toString)))
-      listener.onStageSubmitted(createStageStartEvent(0))
-      listener.onStageCompleted(createStageEndEvent(0, failed = false))
-      listener.onJobEnd(createJobEndEvent(jobId, false))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+    runWithListener(listener)
     // This collection won't become empty, but it should be bounded by 
spark.ui.retainedJobs
     listener.jobGroupToJobIds.size should be (5)
+
+    // Test with 0 jobs
+    conf.set("spark.ui.retainedJobs", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.jobGroupToJobIds.size should be (0)
   }
 
   test("test LRU eviction of jobs") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to