Re: spark streaming doubt

2015-07-13 Thread Shushant Arora
For second question

I am comparing two situations when processing a KafkaRDD.

Case 1 - When I use foreachPartition to process the Kafka stream, I do not see
any streaming batch timing line like Time: 142905487 ms printed on the driver
console at the start of each batch. The RDDs are still processed, though, and
the web UI shows jobs being created at each 1-second batch interval.

Case 2 - When I call mapPartitions on the Kafka stream and then an output
operation (say print()) at the end of each stream interval, the driver console
does show jobs being created with the batch-interval line Time: 142905487 ms.
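For reference, a minimal self-contained Java sketch of the two cases (a socket
stream stands in for the Kafka direct stream so the sketch runs on its own; the
processing bodies are placeholders):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class BatchHeaderComparison {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("batch-header-comparison");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // Stand-in for the Kafka direct stream, so the sketch is self-contained.
    JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Case 1: foreachRDD + foreachPartition. A job is still submitted every
    // batch (visible in the web UI), but nothing writes the per-batch
    // "Time: ... ms" header to the driver console.
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
          public void call(Iterator<String> partition) {
            while (partition.hasNext()) {
              partition.next(); // ..process each record
            }
          }
        });
        return null;
      }
    });

    // Case 2: mapPartitions followed by print(). print() is the output
    // operation that writes the "Time: ... ms" header plus a few records
    // to the driver console every batch.
    JavaDStream<String> upper =
        lines.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
          public Iterable<String> call(Iterator<String> partition) {
            List<String> out = new ArrayList<String>();
            while (partition.hasNext()) {
              out.add(partition.next().toUpperCase());
            }
            return out;
          }
        });
    upper.print();

    jssc.start();
    jssc.awaitTermination();
  }
}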

Why does no such timing information appear on the driver console in Case 1?

Thanks
Shushant




On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote:

 Regarding your first question, having more partitions than you do
 executors usually means you'll have better utilization, because the
 workload will be distributed more evenly.  There's some degree of per-task
 overhead, but as long as you don't have a huge imbalance between number of
 tasks and number of executors that shouldn't be a large problem.

 I don't really understand your second question.

 On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
 partitions in topic. Say I have 300 partitions in topic and 10 executors
 and each with 3 cores so , is it means at a time only 10*3=30 partitions
 are processed and then 30 like that since executors launch tasks per RDD
 partitions , so I need in total; 300 tasks but since I have 30 cores(10
 executors each with 3 cores) so these tasks will execute 30 after 30 till
 300.

 So reducing no of kafka paartitions to say 100 will speed up the
 processing?

 2.In spark streaming job when I processed the kafka stream using
 foreachRDD

 directKafkaStream.foreachRDD(new function( public void call(  vi)){
 v1.foreachPartition(new function(){public void call(){
 //..process partition
 }})

 });

 since foreachRDD is operation so it spawns spark job but these jobs
 timings are not coming on driver console like in map and print function as


 ---
 Time: 142905487 ms
 ---
 --
 Time: 1429054871000 ms
 ---

 ..

 Why is it so?


 Thanks
 Shushant









Re: spark streaming doubt

2015-07-13 Thread Cody Koeninger
Regarding your first question, having more partitions than you do executors
usually means you'll have better utilization, because the workload will be
distributed more evenly.  There's some degree of per-task overhead, but as
long as you don't have a huge imbalance between number of tasks and number
of executors that shouldn't be a large problem.
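As a worked example: with 300 Kafka partitions and 10 executors x 3 cores = 30
task slots, each batch produces 300 tasks that run in roughly ceil(300 / 30) =
10 waves; with 100 partitions you would get about 4 waves of larger tasks, but
the same total amount of work.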

I don't really understand your second question.

On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
 partitions in topic. Say I have 300 partitions in topic and 10 executors
 and each with 3 cores so , is it means at a time only 10*3=30 partitions
 are processed and then 30 like that since executors launch tasks per RDD
 partitions , so I need in total; 300 tasks but since I have 30 cores(10
 executors each with 3 cores) so these tasks will execute 30 after 30 till
 300.

 So reducing no of kafka paartitions to say 100 will speed up the
 processing?

 2.In spark streaming job when I processed the kafka stream using foreachRDD

 directKafkaStream.foreachRDD(new function( public void call(  vi)){
 v1.foreachPartition(new function(){public void call(){
 //..process partition
 }})

 });

 since foreachRDD is operation so it spawns spark job but these jobs
 timings are not coming on driver console like in map and print function as


 ---
 Time: 142905487 ms
 ---
 --
 Time: 1429054871000 ms
 ---

 ..

 Why is it so?


 Thanks
 Shushant








Re: spark streaming doubt

2015-07-13 Thread Aniruddh Sharma
Hi Shushant/Cody,

For question 1, the following is my understanding (I am not 100% sure and this
is only my understanding; I have asked TD this question in other words for
confirmation, which is not confirmed as of now).

Tasks are created in proportion to the partitions of the data, and the main
goal is to parallelize their execution. That parallelism depends on the number
of simultaneous threads created in the executor JVM, and the number of threads
to be created is controlled by the user of the program, who has to set it in
accordance with the number of physical cores. On YARN the number of threads
should be numDefaultPartitions * executor-cores (which is user supplied). For
example, if you have 3 physical cores on your machine, you might be able to
create at least 6 threads by passing executor-cores = 6, assuming your
numDefaultPartitions is set to 1. Each executor should then execute 6
concurrent tasks (rather than 3, the number of physical cores), and when those
6 tasks finish it should execute another 6, and so on. (Please note: again,
this is my understanding of how Spark works and I may be wrong.)
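If one wanted to test that hypothesis, the per-executor task-slot count is the
ordinary executor-cores setting; a minimal sketch, with the instance and core
counts purely illustrative (this only changes the slot count, it does not add
CPUs, and whether it helps is exactly the open question above):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class OversubscribedCores {
  public static void main(String[] args) {
    // Ask YARN for 10 executors with 6 task slots each, even on 3-core boxes.
    SparkConf conf = new SparkConf()
        .setAppName("oversubscribe-cores-test")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", "6");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... run the job and compare batch times against executor-cores = 3
    sc.stop();
  }
}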

Thanks and Regards
Aniruddh

On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote:

 Regarding your first question, having more partitions than you do
 executors usually means you'll have better utilization, because the
 workload will be distributed more evenly.  There's some degree of per-task
 overhead, but as long as you don't have a huge imbalance between number of
 tasks and number of executors that shouldn't be a large problem.

 I don't really understand your second question.

 On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
 partitions in topic. Say I have 300 partitions in topic and 10 executors
 and each with 3 cores so , is it means at a time only 10*3=30 partitions
 are processed and then 30 like that since executors launch tasks per RDD
 partitions , so I need in total; 300 tasks but since I have 30 cores(10
 executors each with 3 cores) so these tasks will execute 30 after 30 till
 300.

 So reducing no of kafka paartitions to say 100 will speed up the
 processing?

 2.In spark streaming job when I processed the kafka stream using
 foreachRDD

 directKafkaStream.foreachRDD(new function( public void call(  vi)){
 v1.foreachPartition(new function(){public void call(){
 //..process partition
 }})

 });

 since foreachRDD is operation so it spawns spark job but these jobs
 timings are not coming on driver console like in map and print function as


 ---
 Time: 142905487 ms
 ---
 --
 Time: 1429054871000 ms
 ---

 ..

 Why is it so?


 Thanks
 Shushant









spark streaming doubt

2015-07-11 Thread Shushant Arora
1. Spark Streaming 1.3 creates as many RDD partitions as there are Kafka
partitions in the topic. Say I have 300 partitions in the topic and 10
executors, each with 3 cores. Does that mean only 10*3 = 30 partitions are
processed at a time, then the next 30, and so on? Since executors launch one
task per RDD partition, I need 300 tasks in total, but since I have only 30
cores (10 executors with 3 cores each), these tasks will execute 30 at a time
until all 300 are done.

So will reducing the number of Kafka partitions to, say, 100 speed up the
processing?

2. In my Spark Streaming job I process the Kafka stream using foreachRDD:
directKafkaStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
  public Void call(JavaPairRDD<String, String> rdd) {
    rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
      public void call(Iterator<Tuple2<String, String>> partition) {
        // ..process partition
      }
    });
    return null;
  }
});

Since foreachRDD is an output operation it spawns a Spark job each batch, but
the job timings are not printed on the driver console the way they are with
map and print, i.e. lines like:

---
Time: 142905487 ms
---
--
Time: 1429054871000 ms
---

..

Why is it so?


Thanks
Shushant


Re: spark streaming doubt

2015-05-20 Thread Akhil Das
One receiver basically runs on 1 core, so if your single node has 4 cores,
there are still 3 cores left for the processing (for the executors). And yes,
the receiver remains on the same machine unless some failure happens.

Thanks
Best Regards

On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Thanks Akhil andDibyendu.

 Does in high level receiver based streaming executors run on receivers
 itself to have data localisation ? Or its always data is transferred to
 executor nodes and executor nodes differ in each run of job but receiver
 node remains same(same machines) throughout life of streaming application
 unless node failure happens?



 On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Just to add, there is a Receiver based Kafka consumer which uses Kafka
 Low Level Consumer API.

 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


 Regards,
 Dibyendu

 On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel
 api and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?


 ​- Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2.No of executors in highlevel receiver based jobs will always equal to
 no of partitions in topic ?


 ​- Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean.
 If you set it as 2 then 2 jobs will run parallel. Default value is 1 and
 the next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially
 lead to weird sharing of resources and which can make it hard to debug 
 the
 whether there is sufficient resources in the system to process the 
 ingested
 data fast enough. With only 1 job running at a time, it is easy to see 
 that
 if batch processing time  batch interval, then the system will be 
 stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the
 fileStream, directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the
 all pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which 
 is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which 
 will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up 
 in
 block not found exceptions as spark drops 

Re: spark streaming doubt

2015-05-20 Thread Shushant Arora
So I can explicitly specify the number of receivers and executors in
receiver-based streaming? Can you share a sample program, if any?

Also, in the low-level non-receiver-based approach, will data be fetched and
processed by the same worker executor node? And with concurrent jobs set to 1,
will fetching and processing in the low-level approach be delayed until the
next job starts? Say I have a 1-second stream interval but job1 takes 5 sec to
complete, so job2 starts at the end of 5 sec. Will job2 then process all data
from sec 1 to sec 5 in low-level non-receiver streaming, or only the data for
the interval sec1-sec2?

And if it processes data for the complete duration sec1-sec5, is there any
option to suppress the start of the other queued jobs (for intervals sec2-3,
sec3-4, sec4-5), since their work is already done by job2?


On Wed, May 20, 2015 at 12:36 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 One receiver basically runs on 1 core, so if your single node is having 4
 cores, there are still 3 cores left for the processing (for executors). And
 yes receiver remains on the same machine unless some failure happens.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 10:57 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil andDibyendu.

 Does in high level receiver based streaming executors run on receivers
 itself to have data localisation ? Or its always data is transferred to
 executor nodes and executor nodes differ in each run of job but receiver
 node remains same(same machines) throughout life of streaming application
 unless node failure happens?



 On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Just to add, there is a Receiver based Kafka consumer which uses Kafka
 Low Level Consumer API.

 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


 Regards,
 Dibyendu

 On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel
 api and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers
 at each job start(whenever a new job is launched by streaming application
 say at each second)?


 ​- Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2.No of executors in highlevel receiver based jobs will always equal
 to no of partitions in topic ?


 ​- Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean.
 If you set it as 2 then 2 jobs will run parallel. Default value is 1 and
 the next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 
 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially
 lead to weird sharing of resources and which can make it hard to debug 
 the
 whether there is sufficient resources in the system to process the 
 ingested
 data fast enough. With only 1 job running at a time, it is easy to see 
 that
 if batch processing time  batch interval, then the system will be 
 stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the
 fileStream, directStream ones. You can read a bit of information from 
 here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set 

Re: spark streaming doubt

2015-05-20 Thread Akhil Das
On Wed, May 20, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 So I can explicitly specify no of receivers and executors in receiver
 based streaming? Can you share a sample program if any?


​
​
- You can look at the low-level consumer repo
https://github.com/dibbhatt/kafka-spark-consumer shared by Dibyendu for
sample code.
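For the stock high-level receiver approach (not the linked low-level package),
a minimal sketch of explicitly spawning several receivers and unioning them
could look roughly like this; topic, consumer group and ZooKeeper address are
placeholders:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class MultiReceiverExample {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("multi-receiver-example");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    Map<String, Integer> topicThreads = new HashMap<String, Integer>();
    topicThreads.put("mytopic", 1); // 1 consumer thread inside each receiver

    // Each createStream call spawns one receiver, and each receiver occupies one core.
    int numReceivers = 3;
    List<JavaPairDStream<String, String>> streams =
        new ArrayList<JavaPairDStream<String, String>>();
    for (int i = 0; i < numReceivers; i++) {
      JavaPairReceiverInputDStream<String, String> s =
          KafkaUtils.createStream(jssc, "zkhost:2181", "mygroup", topicThreads);
      streams.add(s);
    }

    // Union the receiver streams into one DStream before processing.
    JavaPairDStream<String, String> unified =
        jssc.union(streams.get(0), streams.subList(1, streams.size()));
    unified.print();

    jssc.start();
    jssc.awaitTermination();
  }
}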

 ​
 ​

Also in Low level non receiver based , will data be fetched by same worker
 executor node and processed ? Also if I have concurrent jobs set to 1- so
 in low level
 fetching and processing will be delayed till next job starts ,say a
 situation where I have 1 sec of stream interval but my job1 takes 5 sec to
 complete , hence job2 starts at end of 5 sec, so now will it process all
 data from sec1 to sec 5 in low level non receiver streaming or only for
 interval sec1-sec2 ?​


 And if it processes data for complete duration sec1-sec5.Is there any
 option to suppress start of other queued jobs(for interval sec2-3,
 sec3-4,sec4-5) since there work is already done by job2 ?


​
​
- I believe all your data from sec2-sec5 will be available in Kafka, and when
the second batch starts at 5 sec it will consume it (you can also limit the
rate with spark.streaming.kafka.maxRatePerPartition).

Read more here
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
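A minimal sketch of wiring that rate limit into a direct stream (broker
address, topic and the 1000 records/sec value are illustrative only):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class DirectStreamRateLimit {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setAppName("direct-stream-rate-limit")
        // Limit each Kafka partition to at most 1000 records per second.
        .set("spark.streaming.kafka.maxRatePerPartition", "1000");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "broker1:9092");
    Set<String> topics = new HashSet<String>();
    topics.add("mytopic");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);

    directKafkaStream.print();
    jssc.start();
    jssc.awaitTermination();
  }
}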




 On Wed, May 20, 2015 at 12:36 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 One receiver basically runs on 1 core, so if your single node is having 4
 cores, there are still 3 cores left for the processing (for executors). And
 yes receiver remains on the same machine unless some failure happens.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 10:57 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil andDibyendu.

 Does in high level receiver based streaming executors run on receivers
 itself to have data localisation ? Or its always data is transferred to
 executor nodes and executor nodes differ in each run of job but receiver
 node remains same(same machines) throughout life of streaming application
 unless node failure happens?



 On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Just to add, there is a Receiver based Kafka consumer which uses Kafka
 Low Level Consumer API.

 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


 Regards,
 Dibyendu

 On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel
 api and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers
 at each job start(whenever a new job is launched by streaming application
 say at each second)?


 ​- Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2.No of executors in highlevel receiver based jobs will always equal
 to no of partitions in topic ?


 ​- Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


 3.Will data from a single topic be consumed by executors in parllel
 or only one receiver consumes in multiple threads and assign to executors
 in high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean.
 If you set it as 2 then 2 jobs will run parallel. Default value is 1 and
 the next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and
 under default configuration, only job is active (i.e. under execution) 
 at
 any point of time. So if one batch's processing takes longer than 10
 seconds, then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially
 lead to weird sharing of resources and which can make it hard to debug 
 the
 whether there is sufficient resources in the system to process the 
 ingested
 data fast enough. With only 1 job running at a time, it is easy to see 
 that
 if batch processing time  batch interval, then the system will be 
 stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to 

spark streaming doubt

2015-05-19 Thread Shushant Arora
What happens if, in a streaming application, one job is not yet finished when
the stream interval is reached? Does it start the next job, or wait for the
first to finish while the remaining jobs keep accumulating in a queue?


Say I have a streaming application with a stream interval of 1 sec, but my job
takes 2 min to process 1 sec of stream. What will happen? At any time will
there be only one job running, or multiple?


Re: spark streaming doubt

2015-05-19 Thread Akhil Das
It will be a single job running at a time by default (you can also configure
spark.streaming.concurrentJobs to run jobs in parallel, which is not
recommended in production).

Now, with your batch duration being 1 sec and processing time being 2 minutes,
if you are using receiver-based streaming then those receivers will keep on
receiving data while the job is running. The data will accumulate in memory if
you set the StorageLevel to MEMORY_ONLY, and you can end up with
block-not-found exceptions as Spark drops blocks that have not yet been
processed in order to make room for new blocks. If you are using a
non-receiver-based approach, you will not have this problem of dropping blocks.

Ideally, if your data is small and you have enough memory to hold it, then it
will run smoothly without any issues.
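As a reference point, a minimal sketch of setting the receiver's storage level
explicitly when creating the high-level Kafka stream; topic, group, ZooKeeper
address and the MEMORY_AND_DISK_SER_2 choice are all illustrative, not a
recommendation:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class ReceiverStorageLevel {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("receiver-storage-level");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    Map<String, Integer> topicThreads = new HashMap<String, Integer>();
    topicThreads.put("mytopic", 1);

    // Spill received blocks to disk (serialized, replicated) instead of keeping
    // them MEMORY_ONLY, so slow batches do not drop unprocessed blocks as easily.
    JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
        jssc, "zkhost:2181", "mygroup", topicThreads,
        StorageLevel.MEMORY_AND_DISK_SER_2());

    stream.print();
    jssc.start();
    jssc.awaitTermination();
  }
}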

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 What happnes if in a streaming application one job is not yet finished and
 stream interval reaches. Does it starts next job or wait for first to
 finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but my
 job takes 2 min to process 1 sec stream , what will happen ?  At any time
 there will be only one job running or multiple ?




Re: spark streaming doubt

2015-05-19 Thread Akhil Das
spark.streaming.concurrentJobs takes an integer value, not a boolean. If you
set it to 2, then 2 jobs will run in parallel. The default value is 1, and the
next job will start only once the current one completes.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only one job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then the next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs, which is by default set to 1. It's not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead to
 weird sharing of resources, which can make it hard to debug whether there
 are sufficient resources in the system to process the ingested data fast
 enough. With only 1 job running at a time, it is easy to see that if
 batch processing time < batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


Copied from TD's answer written in SO
http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
.
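For reference, a minimal sketch of how that experimental property would be set
on the SparkConf; the value 2 is purely illustrative and, as noted above,
raising it beyond 1 is generally discouraged:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ConcurrentJobsConf {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("concurrent-jobs-example")
        // Experimental: allow 2 streaming jobs to be active at the same time.
        .set("spark.streaming.concurrentJobs", "2");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    // ... define the streams, then:
    // jssc.start();
    // jssc.awaitTermination();
  }
}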

Non-receiver-based streaming would be, for example, the fileStream and
directStream ones. You can read a bit more about it here:
https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2 minutes,
 if you are using a receiver based streaming then ideally those receivers
 will keep on receiving data while the job is running (which will accumulate
 in memory if you set StorageLevel as MEMORY_ONLY and end up in block not
 found exceptions as spark drops some blocks which are yet to process to
 accumulate new blocks). If you are using a non-receiver based approach, you
 will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet finished
 and stream interval reaches. Does it starts next job or wait for first to
 finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but my
 job takes 2 min to process 1 sec stream , what will happen ?  At any time
 there will be only one job running or multiple ?






Re: spark streaming doubt

2015-05-19 Thread Shushant Arora
So for Kafka + Spark Streaming, receiver-based streaming uses the high-level
API and non-receiver-based streaming uses the low-level API.

1. In high-level receiver-based streaming, does it register consumers at each
job start (i.e. whenever a new job is launched by the streaming application,
say every second)?
2. Will the number of executors in high-level receiver-based jobs always equal
the number of partitions in the topic?
3. Will data from a single topic be consumed by executors in parallel, or does
only one receiver consume it in multiple threads and then assign it to
executors in the high-level receiver-based approach?




On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead to
 weird sharing of resources and which can make it hard to debug the whether
 there is sufficient resources in the system to process the ingested data
 fast enough. With only 1 job running at a time, it is easy to see that if
 batch processing time  batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet finished
 and stream interval reaches. Does it starts next job or wait for first to
 finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or multiple ?







Re: spark streaming doubt

2015-05-19 Thread Akhil Das
On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel api
 and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?


- Receiver-based streaming will always have the receiver running in parallel
while your job is running. So by default, every 200 ms
(spark.streaming.blockInterval) the receiver will generate a block of data
which is read from Kafka.
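As a worked example: with the default 200 ms block interval and a 1-second
batch, each receiver produces about 1000 / 200 = 5 blocks per batch, and each
block becomes one partition, and hence one task, of that batch's RDD.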
​


 2.No of executors in highlevel receiver based jobs will always equal to no
 of partitions in topic ?


- Not sure where you came up with this. For the non-receiver-based (direct)
one, I think the number of partitions in Spark will be equal to the number of
Kafka partitions for the given topic.
​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

- They will consume the data in parallel. For the receiver-based approach, you
can actually specify the number of receivers that you want to spawn for
consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time  batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet finished
 and stream interval reaches. Does it starts next job or wait for first to
 finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or multiple ?








Re: spark streaming doubt

2015-05-19 Thread Dibyendu Bhattacharya
Just to add, there is a receiver-based Kafka consumer which uses the Kafka Low
Level Consumer API.

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel api
 and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?


 ​- Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2.No of executors in highlevel receiver based jobs will always equal to
 no of partitions in topic ?


 ​- Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time  batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet
 finished and stream interval reaches. Does it starts next job or wait for
 first to finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or 

Re: spark streaming doubt

2015-05-19 Thread Shushant Arora
Thanks Akhil and Dibyendu.

In high-level receiver-based streaming, do the executors run on the receiver
nodes themselves for data locality? Or is the data always transferred to
executor nodes, with the executor nodes differing in each run of the job while
the receiver nodes remain the same machines throughout the life of the
streaming application, unless a node failure happens?



On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
 Level Consumer API.

 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


 Regards,
 Dibyendu

 On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel
 api and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?


 ​- Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2.No of executors in highlevel receiver based jobs will always equal to
 no of partitions in topic ?


 ​- Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the 
 ingested
 data fast enough. With only 1 job running at a time, it is easy to see 
 that
 if batch processing time  batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the
 fileStream, directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the
 all pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which 
 is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which 
 will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold
 your data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at