Repository: spark Updated Branches: refs/heads/master bb68f4774 -> dd681f502
SPARK-1587 Fix thread leak mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm. Author: Mridul Muralidharan <[email protected]> Closes #504 from mridulm/resource_leak_fixes and squashes the following commits: a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail 7b5e19c [Mridul Muralidharan] Prevent NPE while running tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd681f50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd681f50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd681f50 Branch: refs/heads/master Commit: dd681f502eafe39cfb8a5a62ea2d28016ac6013d Parents: bb68f47 Author: Mridul Muralidharan <[email protected]> Authored: Wed Apr 23 23:20:55 2014 -0700 Committer: Aaron Davidson <[email protected]> Committed: Wed Apr 23 23:20:55 2014 -0700 ---------------------------------------------------------------------- .../apache/spark/metrics/MetricsSystem.scala | 22 ++++++++------- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../org/apache/spark/storage/BlockManager.scala | 2 ++ .../apache/spark/storage/DiskBlockManager.scala | 28 ++++++++++++-------- .../spark/storage/ShuffleBlockManager.scala | 4 +++ .../scala/org/apache/spark/ui/JettyUtils.scala | 1 + .../spark/storage/DiskBlockManagerSuite.scala | 5 ++++ 7 files changed, 42 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index c5bda20..651511d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String, sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") - try { - val sink = Class.forName(classPath) - .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) - if (kv._1 == "servlet") { - metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) - } else { - sinks += sink.asInstanceOf[Sink] + if (null != classPath) { + try { + val sink = Class.forName(classPath) + .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) + .newInstance(kv._2, registry, securityMgr) + if (kv._1 == "servlet") { + metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { + sinks += sink.asInstanceOf[Sink] + } + } catch { + case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e) } - } catch { - case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index be19d9b..5a68f38 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } + starvationTimer.cancel() // sleeping for an arbitrary 1 seconds to ensure that messages are sent out. Thread.sleep(1000L) http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f15fa4d..ccd5c53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1021,6 +1021,8 @@ private[spark] class BlockManager( heartBeatTask.cancel() } connectionManager.stop() + shuffleBlockManager.stop() + diskBlockManager.stop() actorSystem.stop(slaveActor) blockInfo.clear() memoryStore.clear() http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 7a24c8f..054f66a 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { override def run() { logDebug("Shutdown hook called") - localDirs.foreach { localDir => - try { - if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) - } catch { - case t: Throwable => - logError("Exception while deleting local spark dir: " + localDir, t) - } - } + stop() + } + }) + } - if (shuffleSender != null) { - shuffleSender.stop() + private[spark] def stop() { + localDirs.foreach { localDir => + if (localDir.isDirectory() && localDir.exists()) { + try { + if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) + } catch { + case t: Throwable => + logError("Exception while deleting local spark dir: " + localDir, t) } } - }) + } + + if (shuffleSender != null) { + shuffleSender.stop() + } } private[storage] def startShuffleBlockSender(port: Int): Int = { http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 4cd4cdb..35910e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { private def cleanup(cleanupTime: Long) { shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) } + + def stop() { + metadataCleaner.cancel() + } } private[spark] http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 750f5a5..fdeb15b 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging { (server, server.getConnectors.head.getLocalPort) case f: Failure[_] => server.stop() + pool.stop() logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort)) logInfo("Error was: " + f.toString) connect((currentPort + 1) % 65536) http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 9b29e2a..42bfbf1 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { shuffleBlockManager.idToSegmentMap.clear() } + override def afterEach() { + diskBlockManager.stop() + shuffleBlockManager.idToSegmentMap.clear() + } + test("basic block creation") { val blockId = new TestBlockId("test") assertSegmentEquals(blockId, blockId.name, 0, 0)
