[ https://issues.apache.org/jira/browse/SPARK-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466183#comment-15466183 ]

Spiro Michaylov commented on SPARK-17397:
-----------------------------------------

Apologies: I was originally going to file this as a doc bug, and perhaps I should 
have done that, although I actually suspect there's a feature enhancement to 
file as well. Perhaps the user list would have been a better place to hash that out. 

I mostly had more mundane exceptions in mind, say, throwing a custom exception 
while processing a stream, as in this fragment from my code above. 
{code}
stream
      .map(x => { throw new SomeException("something"); x })
      .foreachRDD(r => println("*** count = " + r.count()))
{code}
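(SomeException is nothing special, by the way; just a trivial custom exception along these lines:)

{code}
// minimal stand-in for the custom exception thrown above
class SomeException(msg: String) extends RuntimeException(msg)
{code}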
I got the impression from the unit tests (StreamingContextSuite) that such 
processing exceptions were intended to propagate through awaitTermination(), 
and indeed they do.  

Insofar as the intended usage is an orderly shutdown of the StreamingContext 
when the first such exception propagates, I agree it's just a doc issue, and 
perhaps the intended usage pattern is something like:

{code}
try {
  ssc.awaitTermination()
  println("*** streaming terminated")
} catch {
  case e: Exception =>
    ssc.stop() // ADDED THIS AS A WAY TO GET AN ORDERLY SHUTDOWN
}
{code}
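An alternative sketch, in case you also want in-flight batches to finish before shutting down, would use the two-argument stop() overload (just a variation on the above, not something I've tested here):

{code}
try {
  ssc.awaitTermination()
  println("*** streaming terminated")
} catch {
  case e: Exception =>
    // stop gracefully: let queued and in-flight batches complete,
    // and also stop the underlying SparkContext
    ssc.stop(stopSparkContext = true, stopGracefully = true)
}
{code}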

But to build resilient streaming applications, it seems you'd want the option 
of seeing which exceptions have propagated, how severe they are, and how many 
there have been, so you'd like (at least _I_ would like) to write something like:

{code}
var terminated = false
var nastyExceptionCount = 0
while (!terminated && nastyExceptionCount < 100) {
  try {
    ssc.awaitTermination()
    terminated = true
    println("*** streaming terminated")
  } catch {
    case ne: NastyException =>
      // log it and count it, but don't panic
      nastyExceptionCount += 1
    case e: Exception =>
      // yawn!
  }
}
if (!terminated) ssc.stop() // because we got too many nasty exceptions
{code}

However, that doesn't seem to be supported (it looks like the first exception 
is simply rethrown every time awaitTermination() is called), so I'm inclined to 
file it as either a bug or a feature request, according to your taste, or just 
drop it if you think what I'm proposing is inappropriate.
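
The closest workaround I can see, assuming the getState() API added in Spark 1.4, is to swallow the repeated exception and poll until the context really has stopped; a rough, untested sketch:

{code}
import org.apache.spark.streaming.StreamingContextState

var stopped = false
while (!stopped) {
  try {
    // returns true once the context has stopped, false if the timeout elapsed
    stopped = ssc.awaitTerminationOrTimeout(1000)
  } catch {
    case e: Exception =>
      // the same stored exception is rethrown on every call,
      // so fall back to checking the context's state directly
      stopped = ssc.getState() == StreamingContextState.STOPPED
      if (!stopped) Thread.sleep(1000)
  }
}
{code}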

In any case, I don't want to abuse the process more than I already have: I'm 
happy to drop this, take it to the user list, or open other tickets if desired 
and close this one.  


> what to do when awaitTermination() throws? 
> -------------------------------------------
>
>                 Key: SPARK-17397
>                 URL: https://issues.apache.org/jira/browse/SPARK-17397
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 2.0.0
>         Environment: Linux, Scala but probably general
>            Reporter: Spiro Michaylov
>            Priority: Minor
>
> When awaitTermination propagates an exception that was thrown in processing a 
> batch, the StreamingContext keeps running. Perhaps this is by design, but I 
> don't see any mention of it in the API docs or the streaming programming 
> guide. It's not clear what idiom should be used to block the thread until the 
> context HAS been stopped in a situation where stream processing is throwing 
> lots of exceptions. 
> For example, in the following, streaming takes the full 30 seconds to 
> terminate. My hope in asking this is to improve my own understanding and 
> perhaps inspire documentation improvements. I'm not filing a bug because it's 
> not clear to me whether this is working as intended. 
> {code}
>     val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
>     val sc = new SparkContext(conf)
>     // streams will produce data every second
>     val ssc = new StreamingContext(sc, Seconds(1))
>     val qm = new QueueMaker(sc, ssc)
>     // create the stream
>     val stream = // create some stream
>     // register for data
>     stream
>       .map(x => { throw new SomeException("something"); x })
>       .foreachRDD(r => println("*** count = " + r.count()))
>     // start streaming
>     ssc.start()
>     new Thread("Delayed Termination") {
>       override def run() {
>         Thread.sleep(30000)
>         ssc.stop()
>       }
>     }.start()
>     println("*** producing data")
>     // start producing data
>     qm.populateQueue()
>     try {
>       ssc.awaitTermination()
>       println("*** streaming terminated")
>     } catch {
>       case e: Exception => {
>         println("*** streaming exception caught in monitor thread")
>       }
>     }
>     // if the above goes down the exception path, there seems no 
>     // good way to block here until the streaming context is stopped 
>     println("*** done")
> {code}


