Repository: spark Updated Branches: refs/heads/branch-1.4 065d114c6 -> cb13c98b1
[SPARK-7217] [STREAMING] Add configuration to control the default behavior of StreamingContext.stop() implicitly calling SparkContext.stop() In environments like notebooks, the SparkContext is managed by the underlying infrastructure and it is expected that the SparkContext will not be stopped. However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive side-effect. This PR adds a configuration in SparkConf (spark.streaming.stopSparkContextByDefault) that sets the default StreamingContext stop behavior. The configuration defaults to true, so the existing behavior does not change for existing users. Author: Tathagata Das <[email protected]> Closes #5929 from tdas/SPARK-7217 and squashes the following commits: 869a763 [Tathagata Das] Changed implementation. 685fe00 [Tathagata Das] Added configuration (cherry picked from commit 01187f59b3d118495b6cfea965690829b99a36fa) Signed-off-by: Tathagata Das <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb13c98b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb13c98b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb13c98b Branch: refs/heads/branch-1.4 Commit: cb13c98b1aa7fd35a285dadfecac684630c57ad4 Parents: 065d114 Author: Tathagata Das <[email protected]> Authored: Thu May 7 00:24:44 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Thu May 7 00:27:02 2015 -0700 ---------------------------------------------------------------------- .../spark/streaming/StreamingContext.scala | 10 +++++++--- .../spark/streaming/StreamingContextSuite.scala | 19 +++++++++++++++++-- 2 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cb13c98b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b1ad0d4..bbdb4e8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -563,13 +563,17 @@ class StreamingContext private[streaming] ( /** * Stop the execution of the streams immediately (does not wait for all received data - * to be processed). + * to be processed). By default, if `stopSparkContext` is not specified, the underlying + * SparkContext will also be stopped. This implicit behavior can be configured using the + * SparkConf configuration spark.streaming.stopSparkContextByDefault. * - * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext + * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext * will be stopped regardless of whether this StreamingContext has been * started. 
*/ - def stop(stopSparkContext: Boolean = true): Unit = synchronized { + def stop( + stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true) + ): Unit = synchronized { stop(stopSparkContext, false) } http://git-wip-us.apache.org/repos/asf/spark/blob/cb13c98b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5207b71..a589deb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.state === ssc.StreamingContextState.Started) ssc.stop() assert(ssc.state === ssc.StreamingContextState.Stopped) + + // Make sure that the SparkContext is also stopped by default + intercept[Exception] { + ssc.sparkContext.makeRDD(1 to 10) + } } test("start multiple times") { @@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } test("stop only streaming context") { - ssc = new StreamingContext(master, appName, batchDuration) + val conf = new SparkConf().setMaster(master).setAppName(appName) + + // Explicitly do not stop SparkContext + ssc = new StreamingContext(conf, batchDuration) sc = ssc.sparkContext addInputStream(ssc).register() ssc.start() ssc.stop(stopSparkContext = false) assert(sc.makeRDD(1 to 100).collect().size === 100) - ssc = new StreamingContext(sc, batchDuration) + sc.stop() + + // Implicitly do not stop SparkContext + conf.set("spark.streaming.stopSparkContextByDefault", "false") + ssc = new StreamingContext(conf, batchDuration) + sc = ssc.sparkContext 
addInputStream(ssc).register() ssc.start() ssc.stop() + assert(sc.makeRDD(1 to 100).collect().size === 100) + sc.stop() } test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
