Repository: spark Updated Branches: refs/heads/branch-1.4 56016326c -> 2bbb685f4
[SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception Author: Tathagata Das <[email protected]> Closes #6060 from tdas/SPARK-7532 and squashes the following commits: 6fe2e83 [Tathagata Das] Update docs 7dadfc3 [Tathagata Das] Fixed bug again 99c7678 [Tathagata Das] Added logInfo 65aec20 [Tathagata Das] Fix bug 5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532 1a9a818 [Tathagata Das] Fix scaladoc c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception (cherry picked from commit ec6f2a9774167014566fb9608ee4394d2ce5fd6a) Signed-off-by: Tathagata Das <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bbb685f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bbb685f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bbb685f Branch: refs/heads/branch-1.4 Commit: 2bbb685f4c8a52c069b1c7220f49c7a512d1dd73 Parents: 5601632 Author: Tathagata Das <[email protected]> Authored: Tue May 12 08:48:24 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Tue May 12 08:48:34 2015 -0700 ---------------------------------------------------------------------- .../spark/streaming/StreamingContext.scala | 27 ++++++++++---------- .../spark/streaming/StreamingContextSuite.scala | 4 +-- 2 files changed, 14 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2bbb685f/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 2c5834d..8461e90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -528,28 +528,27 @@ class StreamingContext private[streaming] ( /** * Start the execution of the streams. * - * @throws SparkException if the context has already been started or stopped. + * @throws SparkException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { - import StreamingContext._ state match { case INITIALIZED => - // good to start + validate() + startSite.set(DStream.getCreationSite()) + sparkContext.setCallSite(startSite.get) + StreamingContext.ACTIVATION_LOCK.synchronized { + StreamingContext.assertNoOtherContextIsActive() + scheduler.start() + uiTab.foreach(_.attach()) + state = StreamingContextState.ACTIVE + StreamingContext.setActiveContext(this) + } + logInfo("StreamingContext started") case ACTIVE => - throw new SparkException("StreamingContext has already been started") + logWarning("StreamingContext has already been started") case STOPPED => throw new SparkException("StreamingContext has already been stopped") } - validate() - startSite.set(DStream.getCreationSite()) - sparkContext.setCallSite(startSite.get) - ACTIVATION_LOCK.synchronized { - assertNoOtherContextIsActive() - scheduler.start() - uiTab.foreach(_.attach()) - state = StreamingContextState.ACTIVE - setActiveContext(this) - } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/2bbb685f/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index b8247db..4729951 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() assert(ssc.getState() === StreamingContextState.ACTIVE) - intercept[SparkException] { - ssc.start() - } + ssc.start() assert(ssc.getState() === StreamingContextState.ACTIVE) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
