Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c234d788f -> 11648fa01


[SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start 
multiple StreamingContexts in the same JVM

Currently, attempting to start a StreamingContext while another one is already 
started throws a confusing exception saying that the actor name JobScheduler is 
already registered. Instead, it is best to throw a clear exception stating that 
this is not supported.

Author: Tathagata Das <[email protected]>

Closes #5907 from tdas/SPARK-7361 and squashes the following commits:

fb81c4a [Tathagata Das] Fix typo
a9cd5bb [Tathagata Das] Added startSite to StreamingContext
5fdfc0d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into SPARK-7361
5870e2b [Tathagata Das] Added check for multiple streaming contexts

(cherry picked from commit 1b46556999ca126cb593ef052d24afcb75383223)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11648fa0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11648fa0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11648fa0

Branch: refs/heads/branch-1.4
Commit: 11648fa01149931c62862f67cc837f6a172e7d2b
Parents: c234d78
Author: Tathagata Das <[email protected]>
Authored: Mon May 11 10:58:56 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Mon May 11 10:59:10 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 48 ++++++++++++++++----
 .../spark/streaming/StreamingContextSuite.scala | 18 ++++++++
 2 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11648fa0/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index bbdb4e8..5abe136 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.InputStream
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 
 import scala.collection.Map
 import scala.collection.mutable.Queue
@@ -28,8 +28,9 @@ import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
 import org.apache.spark._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.input.FixedLengthBinaryInputFormat
@@ -37,8 +38,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, 
ActorSupervisorStrategy, Receiver}
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, 
StreamingTab}
+import org.apache.spark.util.CallSite
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods 
used to create
@@ -202,6 +204,8 @@ class StreamingContext private[streaming] (
   import StreamingContextState._
   private[streaming] var state = Initialized
 
+  private val startSite = new AtomicReference[CallSite](null)
+
   /**
    * Return the associated Spark context
    */
@@ -518,6 +522,7 @@ class StreamingContext private[streaming] (
    * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
+    import StreamingContext._
     if (state == Started) {
       throw new SparkException("StreamingContext has already been started")
     }
@@ -525,10 +530,15 @@ class StreamingContext private[streaming] (
       throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
-    sparkContext.setCallSite(DStream.getCreationSite())
-    scheduler.start()
-    uiTab.foreach(_.attach())
-    state = Started
+    startSite.set(DStream.getCreationSite())
+    sparkContext.setCallSite(startSite.get)
+    ACTIVATION_LOCK.synchronized {
+      assertNoOtherContextIsActive()
+      scheduler.start()
+      uiTab.foreach(_.attach())
+      state = Started
+      setActiveContext(this)
+    }
   }
 
   /**
@@ -603,6 +613,7 @@ class StreamingContext private[streaming] (
     uiTab.foreach(_.detach())
     // The state should always be Stopped after calling `stop()`, even if we 
haven't started yet:
     state = Stopped
+    StreamingContext.setActiveContext(null)
   }
 }
 
@@ -612,8 +623,29 @@ class StreamingContext private[streaming] (
  */
 
 object StreamingContext extends Logging {
+  /**
+   * Lock that guards access to global variables that track active 
StreamingContext.
+   */
+  private val ACTIVATION_LOCK = new Object()
 
-  private[streaming] val DEFAULT_CLEANER_TTL = 3600
+  private val activeContext = new AtomicReference[StreamingContext](null)
+
+  private def assertNoOtherContextIsActive(): Unit = {
+    ACTIVATION_LOCK.synchronized {
+      if (activeContext.get() != null) {
+        throw new SparkException(
+          "Only one StreamingContext may be started in this JVM. " +
+            "Currently running StreamingContext was started at " +
+            activeContext.get.startSite.get.longForm)
+      }
+    }
+  }
+
+  private def setActiveContext(ssc: StreamingContext): Unit = {
+    ACTIVATION_LOCK.synchronized {
+      activeContext.set(ssc)
+    }
+  }
 
   @deprecated("Replaced by implicit functions in the DStream companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/11648fa0/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a589deb..11c7fd8 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with 
BeforeAndAfter with Timeouts w
     }
   }
 
+  test("multiple streaming contexts") {
+    sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+    ssc = new StreamingContext(sc, Seconds(1))
+    val input = addInputStream(ssc)
+    input.foreachRDD { rdd => rdd.count }
+    ssc.start()
+
+    // Creating another streaming context should not create errors
+    val anotherSsc = new StreamingContext(sc, Seconds(10))
+    val anotherInput = addInputStream(anotherSsc)
+    anotherInput.foreachRDD { rdd => rdd.count }
+
+    val e = intercept[SparkException] {
+      anotherSsc.start()
+    }
+    assert(e.getMessage.contains("Only one StreamingContext"), "Did not get the right exception")
+  }
+
   test("DStream and generated RDD creation sites") {
     testPackage.test()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to