Repository: spark Updated Branches: refs/heads/master c7c7ac833 -> 7446f5ff9
discarded exceeded completedDrivers When completedDrivers number exceeds the threshold, the first Max(spark.deploy.retainedDrivers, 1) will be discarded. Author: lianhuiwang <[email protected]> Closes #1114 from lianhuiwang/retained-drivers and squashes the following commits: 8789418 [lianhuiwang] discarded exceeded completedDrivers Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7446f5ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7446f5ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7446f5ff Branch: refs/heads/master Commit: 7446f5ff93142d2dd5c79c63fa947f47a1d4db8b Parents: c7c7ac8 Author: lianhuiwang <[email protected]> Authored: Tue Jul 15 00:22:06 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Jul 15 00:22:06 2014 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 5 +++++ 1 file changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7446f5ff/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d9f8105..9fa556d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -57,6 +57,7 @@ private[spark] class Master( def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000 val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) + val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200) val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "") val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE") @@ -741,6 +742,10 @@ private[spark] class Master( case Some(driver) => logInfo(s"Removing driver: $driverId") drivers -= driver + if (completedDrivers.size >= RETAINED_DRIVERS) { + val toRemove = math.max(RETAINED_DRIVERS / 10, 1) + completedDrivers.trimStart(toRemove) + } completedDrivers += driver persistenceEngine.removeDriver(driver) driver.state = finalState
