Repository: spark
Updated Branches:
  refs/heads/branch-1.4 065d114c6 -> cb13c98b1


[SPARK-7217] [STREAMING] Add configuration to control the default behavior of 
StreamingContext.stop() implicitly calling SparkContext.stop()

In environments like notebooks, the SparkContext is managed by the underlying 
infrastructure and it is expected that the SparkContext will not be stopped. 
However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive 
side-effect. This PR adds a configuration in SparkConf, 
spark.streaming.stopSparkContextByDefault, that sets the default 
StreamingContext stop behavior. It defaults to true, so the existing behavior 
does not change for existing users.

Author: Tathagata Das <[email protected]>

Closes #5929 from tdas/SPARK-7217 and squashes the following commits:

869a763 [Tathagata Das] Changed implementation.
685fe00 [Tathagata Das] Added configuration

(cherry picked from commit 01187f59b3d118495b6cfea965690829b99a36fa)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb13c98b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb13c98b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb13c98b

Branch: refs/heads/branch-1.4
Commit: cb13c98b1aa7fd35a285dadfecac684630c57ad4
Parents: 065d114
Author: Tathagata Das <[email protected]>
Authored: Thu May 7 00:24:44 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Thu May 7 00:27:02 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala       | 10 +++++++---
 .../spark/streaming/StreamingContextSuite.scala  | 19 +++++++++++++++++--
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb13c98b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b1ad0d4..bbdb4e8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -563,13 +563,17 @@ class StreamingContext private[streaming] (
 
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
-   * to be processed).
+   * to be processed). By default, if `stopSparkContext` is not specified, the underlying
+   * SparkContext will also be stopped. This implicit behavior can be configured using the
+   * SparkConf configuration spark.streaming.stopSparkContextByDefault.
    *
-   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
    *                         will be stopped regardless of whether this StreamingContext has been
    *                         started.
    */
-  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
+  def stop(
+      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
+     ): Unit = synchronized {
     stop(stopSparkContext, false)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cb13c98b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5207b71..a589deb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.state === ssc.StreamingContextState.Started)
     ssc.stop()
     assert(ssc.state === ssc.StreamingContextState.Stopped)
+
+    // Make sure that the SparkContext is also stopped by default
+    intercept[Exception] {
+      ssc.sparkContext.makeRDD(1 to 10)
+    }
   }
 
   test("start multiple times") {
@@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }
 
   test("stop only streaming context") {
-    ssc = new StreamingContext(master, appName, batchDuration)
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+    // Explicitly do not stop SparkContext
+    ssc = new StreamingContext(conf, batchDuration)
     sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
-    ssc = new StreamingContext(sc, batchDuration)
+    sc.stop()
+
+    // Implicitly do not stop SparkContext
+    conf.set("spark.streaming.stopSparkContextByDefault", "false")
+    ssc = new StreamingContext(conf, batchDuration)
+    sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(sc.makeRDD(1 to 100).collect().size === 100)
+    sc.stop()
   }
 
   test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to