If I have a streaming job (Spark 1.5.1) and attempt to stop the stream after
the first batch, the system locks up and never completes. The pseudo code
below stops the stream once the batch-completed notification fires. I have
traced the lockup to the call `listener.stop()` in JobScheduler (line 114),
which attempts to join the thread in AsynchronousListenerBus. That join never
returns because the thread keeps receiving `SparkListenerExecutorMetricsUpdate`
events from the DAGScheduler.

Any thoughts/ideas on how I can effectively stop the stream after the first
batch would be greatly appreciated.

Pseudo example:

class SomeJob {

    val ssc = createStreamingContext()
    val listener = new MyListener(ssc)
    ssc.addStreamingListener(listener)

    val stream = getStream

    stream.foreachRDD { rdd =>
        // Do something with the data
    }

    // Start the streaming job and block until it terminates
    ssc.start()
    ssc.awaitTermination()
}

class MyListener(ctx: StreamingContext) extends StreamingListener {
    override def onBatchCompleted(
        batchCompleted: StreamingListenerBatchCompleted) = synchronized {
        ctx.stop(false, false)
        // NOTE: I get the same results with ctx.stop(), ctx.stop(true),
        // ctx.stop(true, true), or ctx.stop(false, false)
    }
}
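
In case it helps frame an answer, here is a rough, untested sketch of a
workaround I am considering: have the listener only signal a latch, and call
stop() from the main thread instead, so stop() is never invoked on the
listener bus thread itself. StopperListener and the latch are my own names
here, and createStreamingContext/getStream are the same placeholders as above.

import java.util.concurrent.CountDownLatch

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener,
    StreamingListenerBatchCompleted}

class StopperListener(stopSignal: CountDownLatch) extends StreamingListener {
    override def onBatchCompleted(
        batchCompleted: StreamingListenerBatchCompleted) = {
        // Only signal from the listener thread; do not call stop() here
        stopSignal.countDown()
    }
}

object SomeJob {
    def main(args: Array[String]): Unit = {
        val ssc = createStreamingContext()
        val stopSignal = new CountDownLatch(1)
        ssc.addStreamingListener(new StopperListener(stopSignal))

        val stream = getStream
        stream.foreachRDD { rdd =>
            // Do something with the data
        }

        ssc.start()
        stopSignal.await()  // block until the first batch completes
        // Stop from the main thread, not the listener bus thread
        ssc.stop(stopSparkContext = false, stopGracefully = false)
    }
}

I don't know yet whether this avoids the join in JobScheduler, but it at
least keeps stop() off the thread that stop() tries to join.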


