[ https://issues.apache.org/jira/browse/SPARK-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466183#comment-15466183 ]
Spiro Michaylov commented on SPARK-17397:
-----------------------------------------

Apologies: I was originally going to file this as a doc bug, and perhaps I should have, although I actually suspect there's a feature enhancement to file as well. I guess the user list would have been a better place to hash that out. I mostly had more mundane exceptions in mind, say throwing a custom exception while processing a stream, as in this fragment from my code above:

{code}
stream
  .map(x => { throw new SomeException("something"); x })
  .foreachRDD(r => println("*** count = " + r.count()))
{code}

I got the impression from the unit tests (StreamingContextSuite) that such processing exceptions were intended to propagate through awaitTermination(), and indeed they do. Insofar as the intended usage is an orderly shutdown of the StreamingContext when the first such exception propagates, I agree it's just a doc issue, and perhaps the intended usage pattern is something like:

{code}
try {
  ssc.awaitTermination()
  println("*** streaming terminated")
} catch {
  case e: Exception => {
    ssc.stop() // ADDED THIS AS A WAY TO GET AN ORDERLY SHUTDOWN
  }
}
{code}

But to get resilient streaming applications, you'd like to have the option of seeing which exceptions have propagated, how severe they were, and how many, so you'd like (at least _I_ would like) to be able to write something like:

{code}
var terminated = false
var nastyExceptionCount = 0
while (!terminated && nastyExceptionCount < 100) {
  try {
    ssc.awaitTermination()
    terminated = true
    println("*** streaming terminated")
  } catch {
    case ne: NastyException => {
      // log it and count it, but don't panic
      nastyExceptionCount += 1
    }
    case e: Exception => {
      // yawn!
    }
  }
}
if (!terminated) ssc.stop() // because we got too many nasty exceptions
{code}

However, that doesn't seem to be supported (it looks like the first exception gets rethrown every time awaitTermination() is called), so I'm inclined to file it as either a bug or a feature request according to your taste, or just drop it if you think what I'm proposing is inappropriate. In any case, I don't want to abuse the process more than I already have: I'm happy to drop this, take it to the user list, or open other tickets if desired and close this one.

> what to do when awaitTermination() throws?
> -------------------------------------------
>
>                 Key: SPARK-17397
>                 URL: https://issues.apache.org/jira/browse/SPARK-17397
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 2.0.0
>        Environment: Linux, Scala but probably general
>            Reporter: Spiro Michaylov
>            Priority: Minor
>
> When awaitTermination propagates an exception that was thrown in processing a
> batch, the StreamingContext keeps running. Perhaps this is by design, but I
> don't see any mention of it in the API docs or the streaming programming
> guide. It's not clear what idiom should be used to block the thread until the
> context HAS been stopped in a situation where stream processing is throwing
> lots of exceptions.
> For example, in the following, streaming takes the full 30 seconds to
> terminate. My hope in asking this is to improve my own understanding and
> perhaps inspire documentation improvements. I'm not filing a bug because it's
> not clear to me whether this is working as intended.
> {code}
> val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // streams will produce data every second
> val ssc = new StreamingContext(sc, Seconds(1))
> val qm = new QueueMaker(sc, ssc)
> // create the stream
> val stream = // create some stream
> // register for data
> stream
>   .map(x => { throw new SomeException("something"); x })
>   .foreachRDD(r => println("*** count = " + r.count()))
> // start streaming
> ssc.start()
> new Thread("Delayed Termination") {
>   override def run() {
>     Thread.sleep(30000)
>     ssc.stop()
>   }
> }.start()
> println("*** producing data")
> // start producing data
> qm.populateQueue()
> try {
>   ssc.awaitTermination()
>   println("*** streaming terminated")
> } catch {
>   case e: Exception => {
>     println("*** streaming exception caught in monitor thread")
>   }
> }
> // if the above goes down the exception path, there seems no
> // good way to block here until the streaming context is stopped
> println("*** done")
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
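The behavior the comment describes, where the catch-and-retry loop keeps seeing the same exception, can be illustrated with a small self-contained Scala mock. This is a hypothetical model (the class name MockContextWaiter and its methods are invented for illustration, not Spark's actual internals): a waiter that remembers only the first reported error and rethrows it on every awaitTermination() call.

```scala
// Hypothetical mock, NOT Spark's actual implementation: it models the
// observed behavior where the first error reported from batch processing
// is cached, and awaitTermination() rethrows that same cached error on
// every subsequent call, so a catch-and-retry loop never sees later errors.
class MockContextWaiter {
  private var firstError: Option[Throwable] = None

  // Called (in this model) when a batch fails; later errors are dropped.
  def notifyError(e: Throwable): Unit = synchronized {
    if (firstError.isEmpty) firstError = Some(e)
  }

  // Rethrows the same cached error on every call.
  def awaitTermination(): Unit = synchronized {
    firstError.foreach(e => throw e)
  }
}

val waiter = new MockContextWaiter
waiter.notifyError(new RuntimeException("nasty #1"))
waiter.notifyError(new RuntimeException("nasty #2")) // never surfaces

// Three retries all observe the first error again, so counting distinct
// failures in the retry loop above cannot work under this model.
val seen = (1 to 3).map { _ =>
  try { waiter.awaitTermination(); "terminated" }
  catch { case e: RuntimeException => e.getMessage }
}
println(seen.mkString(", ")) // nasty #1, nasty #1, nasty #1
```

Under this model, the retry loop in the comment would count the same first NastyException up to its limit of 100 rather than observing subsequent failures, which matches the behavior reported above.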