Repository: spark Updated Branches: refs/heads/master bf5cb8af4 -> bd2c12fb4
[SPARK-12920][CORE] Honor "spark.ui.retainedStages" to reduce mem-pressure When large number of jobs are run concurrently with Spark thrift server, thrift server starts running at high CPU due to GC pressure. Job UI retention causes memory pressure with large jobs. https://issues.apache.org/jira/secure/attachment/12783302/SPARK-12920.profiler_job_progress_listner.png has the profiler snapshot. This PR honors `spark.ui.retainedStages` strictly to reduce memory pressure. Manual and unit tests Author: Rajesh Balamohan <[email protected]> Closes #10846 from rajeshbalamohan/SPARK-12920. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2c12fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2c12fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2c12fb Branch: refs/heads/master Commit: bd2c12fb4994785d5becce541aee9ba73fef1c4c Parents: bf5cb8a Author: Rajesh Balamohan <[email protected]> Authored: Wed Aug 10 15:30:22 2016 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Wed Aug 10 15:30:52 2016 -0700 ---------------------------------------------------------------------- .../spark/ui/jobs/JobProgressListener.scala | 4 +- .../ui/jobs/JobProgressListenerSuite.scala | 50 +++++++++++++------- 2 files changed, 36 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c12fb/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index c882740..491f716 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -140,7 +140,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { /** If stages is too large, remove and garbage collect old stages */ private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { - val toRemove = math.max(retainedStages / 10, 1) + val toRemove = (stages.size - retainedStages) stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) stageIdToInfo.remove(s.stageId) @@ -152,7 +152,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { /** If jobs is too large, remove and garbage collect old jobs */ private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized { if (jobs.size > retainedJobs) { - val toRemove = math.max(retainedJobs / 10, 1) + val toRemove = (jobs.size - retainedJobs) jobs.take(toRemove).foreach { job => // Remove the job's UI data, if it exists jobIdToData.remove(job.jobId).foreach { removedJob => http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c12fb/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index edab727..8418fa7 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with } test("test LRU eviction of stages") { + def runWithListener(listener: JobProgressListener) : Unit = { + for (i <- 1 to 50) { + listener.onStageSubmitted(createStageStartEvent(i)) + listener.onStageCompleted(createStageEndEvent(i)) + } + assertActiveJobsStateIsEmpty(listener) + } val conf = new SparkConf() conf.set("spark.ui.retainedStages", 5.toString) - val listener = new JobProgressListener(conf) - - for (i <- 1 to 50) { - listener.onStageSubmitted(createStageStartEvent(i)) - listener.onStageCompleted(createStageEndEvent(i)) - } - assertActiveJobsStateIsEmpty(listener) + var listener = new JobProgressListener(conf) + // Test with 5 retainedStages + runWithListener(listener) listener.completedStages.size should be (5) listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) + + // Test with 0 retainedStages + conf.set("spark.ui.retainedStages", 0.toString) + listener = new JobProgressListener(conf) + runWithListener(listener) + listener.completedStages.size should be (0) } test("test clearing of stageIdToActiveJobs") { @@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with } test("test clearing of jobGroupToJobIds") { + def runWithListener(listener: JobProgressListener): Unit = { + // Run 50 jobs, each with one stage + for (jobId <- 0 to 50) { + listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString))) + listener.onStageSubmitted(createStageStartEvent(0)) + listener.onStageCompleted(createStageEndEvent(0, failed = false)) + listener.onJobEnd(createJobEndEvent(jobId, false)) + } + assertActiveJobsStateIsEmpty(listener) + } val conf = new SparkConf() conf.set("spark.ui.retainedJobs", 5.toString) - val listener = new JobProgressListener(conf) - // Run 50 jobs, each with one stage - for (jobId <- 0 to 50) { - listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString))) - listener.onStageSubmitted(createStageStartEvent(0)) - listener.onStageCompleted(createStageEndEvent(0, failed = false)) - listener.onJobEnd(createJobEndEvent(jobId, false)) - } - assertActiveJobsStateIsEmpty(listener) + var listener = new JobProgressListener(conf) + runWithListener(listener) // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs listener.jobGroupToJobIds.size should be (5) + + // Test with 0 jobs + conf.set("spark.ui.retainedJobs", 0.toString) + listener = new JobProgressListener(conf) + runWithListener(listener) + listener.jobGroupToJobIds.size should be (0) } test("test LRU eviction of jobs") { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
