Since you are using the latest Spark code and not Spark 0.9.1 (guessed from
the log messages), you can actually do graceful shutdown of a streaming
context. This ensures that the receivers are properly stopped and all
received data is processed before the system terminates (stop() stays
blocked until then). See the other variations of streamingContext.stop().
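
As a rough sketch of what I mean, assuming a Spark build where
StreamingContext.stop takes both a stopSparkContext and a stopGracefully
flag: the app name, the local[2] master, the 1-second batch interval, the
count().print() output and the value of runForSeconds are placeholders I
made up for illustration; only the localhost:9999 source comes from your
message.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StopAfterNSeconds {
  def main(args: Array[String]): Unit = {
    // Hypothetical value for N; replace with your own duration.
    val runForSeconds = 30L

    // Placeholder configuration, not taken from the original message.
    val conf = new SparkConf()
      .setAppName("StopAfterNSeconds")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Same source as in the quoted message; the processing is illustrative.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()

    // Let the stream run for N seconds. Thread.sleep expects milliseconds.
    Thread.sleep(runForSeconds * 1000)

    // Graceful stop: receivers are stopped first, already received data is
    // processed, and only then does this call return.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```

Since the graceful stop() blocks until processing has finished, the
separate future plus awaitTermination() is not needed in this variant.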

TD

On Mon, May 12, 2014 at 2:49 AM, Tobias Pfeiffer <t...@preferred.jp> wrote:

> Hello,
>
> I am trying to implement something like "process a stream for N
> seconds, then return a result" with Spark Streaming (built from git
> head). My approach (which is probably not very elegant) is
>
>     val ssc = new StreamingContext(...)
>     ssc.start()
>     future {
>       Thread.sleep(N * 1000L)  // Thread.sleep takes milliseconds
>       ssc.stop(true)
>     }
>     ssc.awaitTermination()
>
> and in fact, this stops the stream processing. However, I get the
> following error messages:
>
> 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered
> receiver for stream 0: Stopped by driver
> 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered
> receiver for stream 0: Restarting receiver with delay 2000ms: Retrying
> connecting to localhost:9999
> 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding
> SendingConnectionManagerId not found
> 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding
> SendingConnectionManagerId not found
>
> (where localhost:9999 is the source I am reading the stream from).
> This doesn't actually seem like the proper way to do it. Can anyone
> point me to how to implement "stop after N seconds" without these
> error messages?
>
> Thanks
> Tobias
>
