Re: Spark Streaming - graceful shutdown when stream has no more data

2016-03-01 Thread Sachin Aggarwal
Hi,

I used this code for a graceful shutdown of my streaming app. This may not be
the best way; please correct me if there is a better approach.

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(true, true)
  println("Application stopped")
}

class StopContextThread(ssc: StreamingContext) extends Runnable {
  def run(): Unit = {
    ssc.stop(true, true)
  }
}

and I use this whenever I want to start a graceful shutdown:

val thread: Thread = new Thread(new StopContextThread(ssc))

thread.start()
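
For reference, a hedged sketch of one way to decide when to start that thread: a
StreamingListener that watches for consecutive empty batches. The three-batch
threshold is arbitrary and the listener trigger is not from the original mail;
BatchInfo.numRecords is assumed to be available in the Spark release in use.

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  ssc.addStreamingListener(new StreamingListener {
    private var emptyBatches = 0
    override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
      if (batch.batchInfo.numRecords == 0) emptyBatches += 1 else emptyBatches = 0
      // Fire the stop thread once; stopping synchronously from inside the
      // listener callback may block on the listener bus, hence the separate thread.
      if (emptyBatches == 3) new Thread(new StopContextThread(ssc)).start()
    }
  })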


On Tue, Mar 1, 2016 at 2:32 PM, Lars Albertsson <la...@mapflat.com> wrote:

> If you wait for an inactivity period before ending a test case, you
> get the choice of using a long timeout, resulting in slow tests, or a
> short timeout, resulting in brittle tests. Both options will make
> developers waste time, and harm developer productivity.
>
> I suggest that you terminate the test case based on the test predicate
> getting fulfilled, with a long timeout in case of test failure. I
> presume that your application produces output to a channel, e.g.
> database or Kafka topic, which the test oracle can inspect for test
> completeness.
>
> The flow becomes:
>
> 1. Start the stream source component and output component locally,
> e.g. Kafka+Cassandra
> 2. Start the Spark streaming application, typically with a local master.
> 3. Feed the input into the stream source, e.g. a Kafka test topic.
> 4. Let the test oracle loop, polling for a condition to be met on the
> output, e.g. existence of a database entry or a message on the Kafka
> output topic. Sleep for a short period (10 ms) between polls, and fail
> the test if the condition is not met after a long time (30 s).
> 5. Terminate the streaming application.
> 6. Terminate the stream source and output components.
>
> I have used this strategy for testing both financial transaction
> systems and spark streaming applications, and it has resulted in both
> fast and reliable tests, without strong coupling between production
> code and test code.
>
> The base strategy fails if you need to test for the absence of an
> output event, e.g. when my streaming application sees message X, it should
> filter it and not produce output. You then need to send another input
> event (the poison pill pattern), and terminate on the output effects of
> the poison pill event.
>
> If you test with multiple streaming executors, remember that there are
> no order guarantees between executors; you will need to make sure that
> all executors receive poison pills, and test for the pill effects of
> all of them.
>
> Starting the source+output components and the Spark context can be
> slow. I recommend that you provide the option to reuse the test
> fixture between test cases for speed. For example, if you start and
> stop the fixture once for each test class, rather than once per test
> method, you save a lot of time.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
>
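
A minimal Scala sketch of the polling oracle described in step 4 of the flow
above. The outputTopicContains predicate and expectedRecord are hypothetical
placeholders for whatever check the test's output channel supports; they are not
from the original post.

  // Poll a condition every `pollMs` ms; fail the test after `timeoutMs` ms.
  def awaitCondition(timeoutMs: Long = 30000L, pollMs: Long = 10L)
                    (condition: => Boolean): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError(s"condition not met within $timeoutMs ms")
      Thread.sleep(pollMs)
    }
  }

  // Usage inside a test case, after feeding input into the source topic:
  awaitCondition() {
    outputTopicContains(expectedRecord)   // hypothetical oracle predicate
  }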

RE: Spark Streaming - graceful shutdown when stream has no more data

2016-02-25 Thread Mao, Wei
I would argue against making it configurable unless there is a real production
use case. If it's just for testing, there are a number of ways to achieve it. For
example, you can set a global mark when the test stream is finished, and stop the
ssc on another thread when the status of that mark changes.
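
A rough sketch of that global-mark idea (illustrative only; ssc is the
StreamingContext from the code quoted at the end of the thread, and the condition
that flips the flag is application-specific):

  import java.util.concurrent.atomic.AtomicBoolean

  // Global mark, set by the test driver once the test input has been fully fed.
  object TestStreamState {
    val finished = new AtomicBoolean(false)
  }

  // Separate thread that stops the StreamingContext once the mark flips.
  val stopper = new Thread(new Runnable {
    def run(): Unit = {
      while (!TestStreamState.finished.get()) Thread.sleep(100)
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  })
  stopper.setDaemon(true)
  stopper.start()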

Back to the original exception: blindly calling “Option.get” is never good
practice. It is better to pre-validate or use getOption/getOrElse.
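
A hedged illustration of the getOrElse point, applied to the map from the code
quoted at the end of the thread (entry is the Option returned by parseRecord):

  val ipDStream = accessLogDStream.map { entry =>
    // Fall back to a sentinel value instead of calling .get on a possible None.
    (entry.map(_.clientIpAddress).getOrElse("unknown"), 1)
  }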

Thanks,
William

RE: Spark Streaming - graceful shutdown when stream has no more data

2016-02-24 Thread Cheng, Hao
This is very interesting: how to shut down the streaming job gracefully once there
has been no input data for some time.

One doable solution: count the input data using an Accumulator, and have another
thread (on the driver/master node) keep fetching the latest accumulator value; if
the value has not changed for some time, shut down the streaming job.
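
A possible sketch of that idea (hedged: the one-minute idle threshold and
five-second poll interval are arbitrary, and ssc/lines come from the code quoted
at the end of the thread):

  // Count arriving records; the monitor stops the job once the count stops growing.
  val inputCount = ssc.sparkContext.accumulator(0L, "inputRecords")
  lines.foreachRDD(rdd => inputCount += rdd.count())

  // Driver-side monitor thread that polls the accumulator value.
  val monitor = new Thread(new Runnable {
    def run(): Unit = {
      var last = -1L
      var idleMs = 0L
      while (idleMs < 60000) {
        Thread.sleep(5000)
        val now = inputCount.value
        if (now == last) idleMs += 5000 else { idleMs = 0; last = now }
      }
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  })
  monitor.setDaemon(true)
  monitor.start()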

Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Daniel Siegmann
During testing you will typically be using some finite data. You want the
stream to shut down automatically when that data has been consumed, so your
test shuts down gracefully.

Of course, once the code is running in production, you'll want it to keep
waiting for new records. So whether the stream shuts down when there's no
more data should be configurable.
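
A hedged illustration of what such a switch might look like. The property name
below is made up for illustration (it is not an existing Spark setting), and
noMoreTestData() stands in for whatever idle check the test uses; conf and ssc
refer to the code quoted at the end of the thread.

  // Illustrative only: choose between "run forever" and "stop when idle" via config.
  val stopWhenIdle = conf.getBoolean("myapp.stopWhenIdle", false)

  ssc.start()
  if (stopWhenIdle) {
    // Test mode: poll an application-defined "input exhausted" condition, then stop gracefully.
    while (!noMoreTestData()) Thread.sleep(1000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  } else {
    // Production mode: wait indefinitely for new records.
    ssc.awaitTermination()
  }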



Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Ashutosh Kumar
Just out of curiosity, I would like to know why a streaming program should
shut down when no new data is arriving. I think it should keep waiting for
the arrival of new records.

Thanks
Ashutosh


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Hemant Bhanawat
A guess: parseRecord is returning None in some cases (probably empty lines),
and then entry.get is throwing the exception.

You may want to filter the None values from accessLogDStream before you run
the map function over it.
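
A short sketch of that suggestion, reusing the names from the code quoted at the
end of the thread (illustrative only):

  val accessLogDStream = lines.map(line => parser.parseRecord(line))
  // Keep only records that parsed successfully, then unwrap them safely.
  val validLogDStream = accessLogDStream.filter(_.isDefined).map(_.get)
  val ipDStream = validLogDStream.map(entry => (entry.clientIpAddress, 1))
  ipDStream.reduceByKey((x, y) => x + y)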

Hemant

Hemant Bhanawat 
www.snappydata.io


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Ted Yu
Which line is line 42 in your code?

When the variable lines becomes empty, you can stop your program.

Cheers

> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
> 
> I am working with the Spark Streaming API and I wish to stream a set of
> pre-downloaded web log files continuously to simulate a real-time stream. I
> wrote a script that gunzips the compressed logs and pipes the output to nc on
> port .
> 
> The script looks like this:
> 
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
> 
> for zfile in $zipped_files
>  do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile  | nc -l -p  -q 20
> 
>  done
> I have streaming code written in Scala that processes the streams. It works
> well for the most part, but when it runs out of files to stream I get the
> following error in Spark:
> 
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl: 
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> 
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> How do I implement a graceful shutdown so that the program exits gracefully
> when it no longer detects any data in the stream?
> 
> My Spark Streaming code looks like this:
> 
> 
> object StreamingLogEnhanced {
>  def main(args: Array[String]) {
>   val master = args(0)
>   val conf = new
>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>  // Create a StreamingContext with a n second batch size
>   val ssc = new StreamingContext(conf, Seconds(10))
>  // Create a DStream from all the input on port 
>   val log = Logger.getLogger(getClass.getName)
> 
>   sys.ShutdownHookThread {
>   log.info("Gracefully stopping Spark Streaming Application")
>   ssc.stop(true, true)
>   log.info("Application stopped")
>   }
>   val lines = ssc.socketTextStream("localhost", )
>   // Create a count of log hits by ip
>   var ipCounts=countByIp(lines)
>   ipCounts.print()
> 
>   // start our streaming context and wait for it to "finish"
>   ssc.start()
>   // Wait for 600 seconds then exit
>   ssc.awaitTermination(1*600)
>   ssc.stop()
>   }
> 
>  def countByIp(lines: DStream[String]) = {
>val parser = new AccessLogParser
>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>val ipDStream = accessLogDStream.map(entry =>   
> (entry.get.clientIpAddress, 1))
>ipDStream.reduceByKey((x, y) => x + y)
>  }
> 
> }
> Thanks for any suggestions in advance.
> 
>