If I have a streaming job (Spark 1.5.1) and attempt to stop the stream after the first batch, the system locks up and never completes. The pseudocode below shows that the stream is stopped once the batch-completed notification fires. I have traced the lockup to the call `listener.stop()` in JobScheduler (line 114), which attempts to join the listener thread in AsynchronousListenerBus. That join never returns because the thread is still receiving `SparkListenerExecutorMetricsUpdate` messages from the DAGScheduler, so it never exits its event loop.
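To make the failure mode concrete, here is a minimal standalone sketch (plain Scala, not Spark internals) of the pattern I believe I am hitting: joining a consumer thread blocks forever when a producer keeps feeding it and nothing ever tells the consumer loop to exit.

```scala
import java.util.concurrent.LinkedBlockingQueue

object JoinHangSketch {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()

    // Stand-in for the AsynchronousListenerBus thread: drains events forever.
    val consumer = new Thread(new Runnable {
      def run(): Unit = while (true) queue.take()
    })
    consumer.setDaemon(true)
    consumer.start()

    // Stand-in for the DAGScheduler: keeps posting metrics updates.
    val producer = new Thread(new Runnable {
      def run(): Unit = while (true) {
        queue.put("SparkListenerExecutorMetricsUpdate")
        Thread.sleep(100)
      }
    })
    producer.setDaemon(true)
    producer.start()

    // Stand-in for listener.stop(): joins a thread that never exits,
    // so this call blocks indefinitely.
    consumer.join()
    println("never reached")
  }
}
```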
Any thoughts/ideas on how I can effectively stop the stream after the first batch would be greatly appreciated. Pseudo-example:

```scala
class SomeJob {
  val ssc = createStreamingContext()
  val listener = new MyListener(ssc)
  ssc.addStreamingListener(listener)

  val stream = getStream
  stream.foreachRDD { rdd =>
    // Do something with the data
  }
}

class MyListener(ctx: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
    // NOTE: I get the same results with ctx.stop(), ctx.stop(true),
    // ctx.stop(true, true), or ctx.stop(false, false)
    ctx.stop(false, false)
  }
}
```
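One variant I plan to try, in case the hang comes from calling stop() on the listener thread while that same thread is being joined: have the listener only signal a latch, and perform the stop() from the driver's main thread. This is an untested sketch; the StopAfterFirstBatch name and the latch wiring are mine, and createStreamingContext/getStream stand in for the pseudocode above.

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Listener that only signals; it never calls ctx.stop() itself.
class StopAfterFirstBatch(latch: CountDownLatch) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    latch.countDown()
}

object SomeJob {
  def createStreamingContext(): StreamingContext = ??? // as in the pseudocode above
  def getStream(ssc: StreamingContext): DStream[String] = ??? // as in the pseudocode above

  def main(args: Array[String]): Unit = {
    val ssc = createStreamingContext()
    val latch = new CountDownLatch(1)
    ssc.addStreamingListener(new StopAfterFirstBatch(latch))

    getStream(ssc).foreachRDD { rdd =>
      // Do something with the data
    }

    ssc.start()
    latch.await() // block the main thread until the first batch completes
    // stop() now runs on the main thread, not the listener-bus thread
    ssc.stop(stopSparkContext = false, stopGracefully = false)
  }
}
```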