Repository: spark
Updated Branches:
  refs/heads/master 79f0b371a -> 7dda0844e


[SPARK-2645] [CORE] Allow SparkEnv.stop() to be called multiple times without side effects.

Fix for SparkContext stop behavior: allow sc.stop() to be called multiple times without side effects.
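The heart of the change is a guard on an internal flag, so that only the first call to stop() releases resources and every later call is a no-op. A minimal sketch of that idempotent-stop pattern, using simplified stand-in names rather than the real SparkEnv members:

    class Env {
      @volatile private var isStopped = false

      def stop(): Unit = {
        if (!isStopped) {
          isStopped = true
          // Release resources exactly once; repeated calls skip this block.
          println("stopping components")
        }
      }
    }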

Author: Joshi <[email protected]>
Author: Rekha Joshi <[email protected]>

Closes #6973 from rekhajoshm/SPARK-2645 and squashes the following commits:

277043e [Joshi] Fix for SparkContext stop behavior
446b0a4 [Joshi] Fix for SparkContext stop behavior
2ce5760 [Joshi] Fix for SparkContext stop behavior
c97839a [Joshi] Fix for SparkContext stop behavior
1aff39c [Joshi] Fix for SparkContext stop behavior
12f66b5 [Joshi] Fix for SparkContext stop behavior
72bb484 [Joshi] Fix for SparkContext stop behavior
a5a7d7f [Joshi] Fix for SparkContext stop behavior
9193a0c [Joshi] Fix for SparkContext stop behavior
58dba70 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
380c5b0 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
b566b66 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
0be142d [Rekha Joshi] Merge pull request #3 from apache/master
106fd8e [Rekha Joshi] Merge pull request #2 from apache/master
e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7dda0844
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7dda0844
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7dda0844

Branch: refs/heads/master
Commit: 7dda0844e1eb6df7455af68592751806b3b92251
Parents: 79f0b37
Author: Joshi <[email protected]>
Authored: Tue Jun 30 14:00:35 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Tue Jun 30 14:00:35 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  | 66 ++++++++++----------
 .../org/apache/spark/SparkContextSuite.scala    | 13 ++++
 2 files changed, 47 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
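Two of the SparkEnv.scala changes below are mechanical cleanups: the scala.collection.JavaConversions._ import is dropped (apparently no longer needed), and stop() now iterates pythonWorkers.values directly instead of pattern-matching over (key, worker) pairs. A small standalone comparison of the two iteration styles (illustrative only, using a plain Scala map):

    val workers = Map("w1" -> "workerA", "w2" -> "workerB")

    // Before: destructure each (key, value) pair even though the key is unused.
    workers.foreach { case (_, w) => println(w) }

    // After: iterate the values collection directly.
    workers.values.foreach(println)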


http://git-wip-us.apache.org/repos/asf/spark/blob/7dda0844/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b066557..1b133fb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import akka.actor.ActorSystem
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Properties
 
@@ -90,39 +89,42 @@ class SparkEnv (
   private var driverTmpDirToDelete: Option[String] = None
 
   private[spark] def stop() {
-    isStopped = true
-    pythonWorkers.foreach { case(key, worker) => worker.stop() }
-    Option(httpFileServer).foreach(_.stop())
-    mapOutputTracker.stop()
-    shuffleManager.stop()
-    broadcastManager.stop()
-    blockManager.stop()
-    blockManager.master.stop()
-    metricsSystem.stop()
-    outputCommitCoordinator.stop()
-    rpcEnv.shutdown()
-
-    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-    // down, but let's call it anyway in case it gets fixed in a later release
-    // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    // actorSystem.awaitTermination()
-
-    // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs.
-    // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
-    // current working dir in executor which we do not need to delete.
-    driverTmpDirToDelete match {
-      case Some(path) => {
-        try {
-          Utils.deleteRecursively(new File(path))
-        } catch {
-          case e: Exception =>
-            logWarning(s"Exception while deleting Spark temp dir: $path", e)
+
+    if (!isStopped) {
+      isStopped = true
+      pythonWorkers.values.foreach(_.stop())
+      Option(httpFileServer).foreach(_.stop())
+      mapOutputTracker.stop()
+      shuffleManager.stop()
+      broadcastManager.stop()
+      blockManager.stop()
+      blockManager.master.stop()
+      metricsSystem.stop()
+      outputCommitCoordinator.stop()
+      rpcEnv.shutdown()
+
+      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+      // down, but let's call it anyway in case it gets fixed in a later release
+      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+      // actorSystem.awaitTermination()
+
+      // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+      // If we only stop sc but the driver process still runs as a service, we need to delete
+      // the tmp dir; otherwise it will create too many tmp dirs.
+      // We only need to delete the tmp dir created by the driver, because sparkFilesDir points
+      // to the current working dir on executors, which we do not need to delete.
+      driverTmpDirToDelete match {
+        case Some(path) => {
+          try {
+            Utils.deleteRecursively(new File(path))
+          } catch {
+            case e: Exception =>
+              logWarning(s"Exception while deleting Spark temp dir: $path", e)
+          }
         }
+        case None => // We only need to delete the tmp dir created by the driver, so do nothing on executors
       }
-      case None => // We just need to delete tmp dir created by driver, so do nothing on executor
     }
   }
 

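The boolean guard above is enough when stop() is only reached from a single thread. Where concurrent callers are a concern, the same idempotency is often enforced with an atomic flag instead; a hedged sketch of that variant (an illustration, not what this patch does):

    import java.util.concurrent.atomic.AtomicBoolean

    class StoppableService {
      private val stopped = new AtomicBoolean(false)

      def stop(): Unit = {
        // compareAndSet flips the flag exactly once, so the cleanup body
        // runs at most once even under concurrent stop() calls.
        if (stopped.compareAndSet(false, true)) {
          // ... release resources here ...
        }
      }
    }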
http://git-wip-us.apache.org/repos/asf/spark/blob/7dda0844/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 6838b35..5c57940 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
+import org.scalatest.Matchers._
 
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -272,4 +273,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("calling multiple sc.stop() must not throw any exception") {
+    noException should be thrownBy {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val cnt = sc.parallelize(1 to 4).count()
+      sc.cancelAllJobs()
+      sc.stop()
+      // call stop second time
+      sc.stop()
+    }
+  }
+
 }
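The new test relies on ScalaTest's noException should be thrownBy syntax, pulled in by the org.scalatest.Matchers._ import added above. As a standalone illustration of that assertion style (assuming ScalaTest Matchers on the classpath):

    import org.scalatest.Matchers._

    // Passes: the block completes without throwing.
    noException should be thrownBy {
      Seq(1, 2, 3).sum
    }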

