[ https://issues.apache.org/jira/browse/SPARK-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-17397:
---------------------------------
Labels: bulk-closed (was: )
> Show example of what to do when awaitTermination() throws an Exception
> ----------------------------------------------------------------------
>
> Key: SPARK-17397
> URL: https://issues.apache.org/jira/browse/SPARK-17397
> Project: Spark
> Issue Type: Improvement
> Components: Documentation, DStreams
> Affects Versions: 2.0.0
> Environment: Linux, Scala but probably general
> Reporter: Spiro Michaylov
> Priority: Minor
> Labels: bulk-closed
>
> When awaitTermination() propagates an exception that was thrown while
> processing a batch, the StreamingContext keeps running. Perhaps this is by
> design, but I don't see any mention of it in the API docs or the streaming
> programming guide. It's not clear what idiom should be used to block the
> thread until the context HAS been stopped when stream processing is throwing
> lots of exceptions.
> For example, in the following code, streaming takes the full 30 seconds to
> terminate. My hope in asking this is to improve my own understanding and
> perhaps inspire documentation improvements. I'm not filing a bug because it's
> not clear to me whether this is working as intended.
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> // QueueMaker and SomeException are not Spark classes; their definitions
> // are not shown here
> val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // streams will produce data every second
> val ssc = new StreamingContext(sc, Seconds(1))
> val qm = new QueueMaker(sc, ssc)
> // create the stream
> val stream = // create some stream
> // register for data; the map throws for every element
> stream
>   .map(x => { throw new SomeException("something"); x })
>   .foreachRDD(r => println("*** count = " + r.count()))
> // start streaming
> ssc.start()
> // stop the context from a separate thread after 30 seconds
> new Thread("Delayed Termination") {
>   override def run(): Unit = {
>     Thread.sleep(30000)
>     ssc.stop()
>   }
> }.start()
> println("*** producing data")
> // start producing data
> qm.populateQueue()
> try {
>   ssc.awaitTermination()
>   println("*** streaming terminated")
> } catch {
>   case e: Exception =>
>     println("*** streaming exception caught in monitor thread")
> }
> // if the above goes down the exception path, there seems no
> // good way to block here until the streaming context is stopped
> println("*** done")
> {code}
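
One possible way to block after the exception path is to poll for the stopped
state instead of calling awaitTermination() again. The following is only a
minimal sketch under stated assumptions, not an idiom confirmed by the Spark
documentation. The object StopWaiter and its method awaitStopped are
hypothetical names introduced here for illustration; the helper takes a plain
predicate, so it compiles without Spark on the classpath.

```scala
import scala.annotation.tailrec

// Hypothetical helper (not part of Spark): blocks until a caller-supplied
// predicate reports that the context has stopped, or until a timeout elapses.
object StopWaiter {

  /**
   * Polls `isStopped` every `pollMillis` milliseconds.
   * Returns true as soon as `isStopped()` is true, or false if
   * `timeoutMillis` elapses first.
   */
  def awaitStopped(isStopped: () => Boolean,
                   timeoutMillis: Long,
                   pollMillis: Long = 100L): Boolean = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L

    @tailrec
    def loop(): Boolean =
      if (isStopped()) true
      else if (System.nanoTime() - deadline >= 0) false
      else {
        Thread.sleep(pollMillis)
        loop()
      }

    loop()
  }
}
```

In the catch branch of the example above, this could be called as
StopWaiter.awaitStopped(() => ssc.getState() == StreamingContextState.STOPPED, 60000L),
with StreamingContextState imported from org.apache.spark.streaming. Note that
getState() is marked as a developer API, and the 60 second timeout is an
arbitrary choice. If the intent is simply to shut down on the first failure,
calling ssc.stop() in the catch branch may be the simpler alternative.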