Repository: spark Updated Branches: refs/heads/master e08ea7393 -> 4bba10c41
[SPARK-3233] Executor never stop its SparnEnv, BlockManager, ConnectionManager etc. Author: Kousuke Saruta <saru...@oss.nttdata.co.jp> Closes #2138 from sarutak/SPARK-3233 and squashes the following commits: c0205b7 [Kousuke Saruta] Merge branch 'SPARK-3233' of github.com:sarutak/spark into SPARK-3233 064679d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 d3005fd [Kousuke Saruta] Modified Class definition format of BlockManagerMaster 039b747 [Kousuke Saruta] Modified style 889e2d1 [Kousuke Saruta] Modified BlockManagerMaster to be able to be past isDriver flag 4da8535 [Kousuke Saruta] Modified BlockManagerMaster#stop to send StopBlockManagerMaster message when sender is Driver 6518c3a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 d5ab19a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 6bce25c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 6058a58 [Kousuke Saruta] Modified Executor not to invoke SparkEnv#stop in local mode e5ad9d3 [Kousuke Saruta] Modified Executor to stop SparnEnv at the end of itself Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bba10c4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bba10c4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bba10c4 Branch: refs/heads/master Commit: 4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9 Parents: e08ea73 Author: Kousuke Saruta <saru...@oss.nttdata.co.jp> Authored: Wed Sep 3 18:42:01 2014 -0700 Committer: Andrew Or <andrewo...@gmail.com> Committed: Wed Sep 3 18:42:01 2014 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 +++ .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 8 ++++++-- .../main/scala/org/apache/spark/storage/ThreadingTest.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 +- 6 files changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 7271656..2973d00 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -225,7 +225,7 @@ object SparkEnv extends Logging { val blockManagerMaster = new BlockManagerMaster(registerOrLookup( "BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) + new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, securityManager, mapOutputTracker, shuffleManager) http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d7d19f6..dd903dc 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -123,6 +123,9 @@ private[spark] class Executor( env.metricsSystem.report() isStopped = true threadPool.shutdown() + if (!isLocal) { + env.stop() + } } class TaskRunner( http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e67b3dc..2e26259 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -27,7 +27,11 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging { +class BlockManagerMaster( + var driverActor: ActorRef, + conf: SparkConf, + isDriver: Boolean) + extends Logging { private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf) private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf) @@ -196,7 +200,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log /** Stop the driver actor, called only on the Spark driver node */ def stop() { - if (driverActor != null) { + if (driverActor != null && isDriver) { tell(StopBlockManagerMaster) driverActor = null logInfo("BlockManagerMaster stopped") http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index aa83ea9..7540f0d 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -99,7 +99,7 @@ private[spark] object ThreadingTest { val serializer = new KryoSerializer(conf) val blockManagerMaster = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) + conf, true) val blockManager = new BlockManager( "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf, new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf)) http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1a42fc1..0bb91fe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] // stub out BlockManagerMaster.getLocations to use our cacheLocations - val blockManagerMaster = new BlockManagerMaster(null, conf) { + val blockManagerMaster = new BlockManagerMaster(null, conf, true) { override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { blockIds.map { _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 14ffada..c200654 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) + conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org