Re: spark streaming doubt
For the second question I am comparing two situations of processing the KafkaRDD.

Case 1 - When I used foreachPartition to process the Kafka stream, no stream job timing header like "Time: 142905487 ms" was displayed on the driver console at the start of each stream batch. But it processed each RDD, and on the web UI it showed jobs getting created at each batch interval of 1 sec.

Case 2 - When I called mapPartitions on the Kafka stream RDD and then called an action (say print()) at the end of each stream interval, I do get jobs created on the driver console with the batch interval header "Time: 142905487 ms".

Why does no information come on the driver console in Case 1?

Thanks
Shushant

On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote:

Regarding your first question, having more partitions than you do executors usually means you'll have better utilization, because the workload will be distributed more evenly. There's some degree of per-task overhead, but as long as you don't have a huge imbalance between the number of tasks and the number of executors, that shouldn't be a large problem. I don't really understand your second question.

On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora shushantaror...@gmail.com wrote:

1. Spark streaming 1.3 creates as many RDD partitions as there are Kafka partitions in the topic. Say I have 300 partitions in the topic and 10 executors, each with 3 cores. Does that mean that at a time only 10*3 = 30 partitions are processed, then the next 30, and so on? Since executors launch tasks per RDD partition, I need 300 tasks in total, but since I have 30 cores (10 executors each with 3 cores), these tasks will execute 30 at a time until all 300 are done. So will reducing the number of Kafka partitions to, say, 100 speed up the processing?

2. In a spark streaming job, when I processed the Kafka stream using foreachRDD

directKafkaStream.foreachRDD(new Function() { public void call(v1) { v1.foreachPartition(new Function() { public void call() { // ..process partition } }); } });

since foreachRDD is an output operation it spawns a spark job, but the timings of these jobs are not coming on the driver console like with map and print(), e.g.

--- Time: 142905487 ms ---
--- Time: 1429054871000 ms ---

Why is it so?

Thanks
Shushant
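The reason nothing shows in Case 1 appears to be that the "Time: ... ms" header is written by print() itself: print() is an output operation whose body prints that header (plus the first few records) on the driver, while a bare foreachRDD/foreachPartition runs only the user code, which prints nothing on the driver, so its jobs show up only in the web UI. A minimal sketch, assuming the Spark Streaming 1.3 Java API and a JavaPairInputDStream<String, String> named directKafkaStream, of printing the header manually via the foreachRDD overload that passes the batch Time:

    import java.util.Iterator;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Time;
    import scala.Tuple2;

    directKafkaStream.foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() {
      public Void call(JavaPairRDD<String, String> rdd, Time time) {
        // Mimic the header that print() would emit; this runs on the driver.
        System.out.println("--- Time: " + time.milliseconds() + " ms ---");
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
          public void call(Iterator<Tuple2<String, String>> partition) {
            // ..process partition (this part runs on the executors)
          }
        });
        return null;
      }
    });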
Re: spark streaming doubt
Regarding your first question, having more partitions than you do executors usually means you'll have better utilization, because the workload will be distributed more evenly. There's some degree of per-task overhead, but as long as you don't have a huge imbalance between the number of tasks and the number of executors, that shouldn't be a large problem. I don't really understand your second question.

On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora shushantaror...@gmail.com wrote:

1. Spark streaming 1.3 creates as many RDD partitions as there are Kafka partitions in the topic. Say I have 300 partitions in the topic and 10 executors, each with 3 cores. Does that mean that at a time only 10*3 = 30 partitions are processed, then the next 30, and so on? Since executors launch tasks per RDD partition, I need 300 tasks in total, but since I have 30 cores (10 executors each with 3 cores), these tasks will execute 30 at a time until all 300 are done. So will reducing the number of Kafka partitions to, say, 100 speed up the processing?

2. In a spark streaming job, when I processed the Kafka stream using foreachRDD

directKafkaStream.foreachRDD(new Function() { public void call(v1) { v1.foreachPartition(new Function() { public void call() { // ..process partition } }); } });

since foreachRDD is an output operation it spawns a spark job, but the timings of these jobs are not coming on the driver console like with map and print(), e.g.

--- Time: 142905487 ms ---
--- Time: 1429054871000 ms ---

Why is it so?

Thanks
Shushant
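Rough arithmetic behind that, using the numbers from the question (illustrative only):

    int kafkaPartitions = 300;               // = RDD partitions per batch
    int taskSlots = 10 * 3;                  // 10 executors x 3 cores = 30 concurrent tasks
    int waves = kafkaPartitions / taskSlots; // 300 / 30 = 10 waves of tasks per batch

    // With 100 fatter partitions there would be ~4 waves, but each task does
    // 3x the work, so the total work is unchanged; the usual win from more
    // partitions is more even load balancing, at some per-task overhead.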
Re: spark streaming doubt
Hi Shushant/Cody,

For question 1, the following is my understanding (I am not 100% sure and this is only my understanding; I have asked this question in other words to TD for confirmation, which is not confirmed as of now). Tasks are created in proportion to the partitions of the data, and the main goal is to parallelize the execution of tasks, which depends on the number of simultaneous task threads created in the executor JVM. The number of threads to be created is controlled by the user of the program, who has to set this number in accordance with the number of physical cores. On YARN the number of threads should be numDefaultPartitions * executor-cores (which is user supplied). For example, if you have 3 physical cores in your machine then you might be able to create at least 6 threads by passing executor-cores = 6, assuming your numDefaultPartitions is set to 1. Then each executor should execute 6 concurrent tasks (rather than 3, the number of physical cores), and when these 6 tasks finish it should execute another 6, and so on. (Please note: again, this is my understanding of how Spark works and I may be wrong.)

Thanks and Regards
Aniruddh

On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote:

Regarding your first question, having more partitions than you do executors usually means you'll have better utilization, because the workload will be distributed more evenly. There's some degree of per-task overhead, but as long as you don't have a huge imbalance between the number of tasks and the number of executors, that shouldn't be a large problem. I don't really understand your second question.

On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora shushantaror...@gmail.com wrote:

1. Spark streaming 1.3 creates as many RDD partitions as there are Kafka partitions in the topic. Say I have 300 partitions in the topic and 10 executors, each with 3 cores. Does that mean that at a time only 10*3 = 30 partitions are processed, then the next 30, and so on? Since executors launch tasks per RDD partition, I need 300 tasks in total, but since I have 30 cores (10 executors each with 3 cores), these tasks will execute 30 at a time until all 300 are done. So will reducing the number of Kafka partitions to, say, 100 speed up the processing?

2. In a spark streaming job, when I processed the Kafka stream using foreachRDD

directKafkaStream.foreachRDD(new Function() { public void call(v1) { v1.foreachPartition(new Function() { public void call() { // ..process partition } }); } });

since foreachRDD is an output operation it spawns a spark job, but the timings of these jobs are not coming on the driver console like with map and print(), e.g.

--- Time: 142905487 ms ---
--- Time: 1429054871000 ms ---

Why is it so?

Thanks
Shushant
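A sketch of the setting Aniruddh describes, with hypothetical values (note this reflects his explicitly unconfirmed understanding; on YARN, spark.executor.cores is the number of concurrent task threads per executor JVM, and by default it is typically not checked against the machine's physical cores):

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("streaming-doubt")
        .set("spark.executor.cores", "6")        // 6 concurrent task threads per executor
        .set("spark.executor.instances", "10");  // 10 executors -> 60 task slots in total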
spark streaming doubt
1. Spark streaming 1.3 creates as many RDD partitions as there are Kafka partitions in the topic. Say I have 300 partitions in the topic and 10 executors, each with 3 cores. Does that mean that at a time only 10*3 = 30 partitions are processed, then the next 30, and so on? Since executors launch tasks per RDD partition, I need 300 tasks in total, but since I have 30 cores (10 executors each with 3 cores), these tasks will execute 30 at a time until all 300 are done. So will reducing the number of Kafka partitions to, say, 100 speed up the processing?

2. In a spark streaming job, when I processed the Kafka stream using foreachRDD

directKafkaStream.foreachRDD(new Function() { public void call(v1) { v1.foreachPartition(new Function() { public void call() { // ..process partition } }); } });

since foreachRDD is an output operation it spawns a spark job, but the timings of these jobs are not coming on the driver console like with map and print(), e.g.

--- Time: 142905487 ms ---
--- Time: 1429054871000 ms ---

Why is it so?

Thanks
Shushant
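For reference, a compilable sketch of the pseudocode above, assuming the Spark Streaming 1.3 Java direct API with String keys and values (jssc, kafkaParams and topics set up elsewhere):

    import java.util.Iterator;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;

    JavaPairInputDStream<String, String> directKafkaStream =
        KafkaUtils.createDirectStream(jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    // foreachRDD is an output operation: it triggers one job per batch, but by
    // itself it prints nothing on the driver console.
    directKafkaStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
      public Void call(JavaPairRDD<String, String> rdd) {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
          public void call(Iterator<Tuple2<String, String>> partition) {
            while (partition.hasNext()) {
              Tuple2<String, String> record = partition.next(); // key, value
              // ..process record
            }
          }
        });
        return null;
      }
    });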
Re: spark streaming doubt
One receiver basically runs on 1 core, so if your single node has 4 cores, there are still 3 cores left for the processing (for the executors). And yes, the receiver remains on the same machine unless some failure happens.

Thanks
Best Regards

On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil and Dibyendu. In high level receiver based streaming, do executors run on the receivers themselves to have data localisation? Or is data always transferred to executor nodes, with the executor nodes differing in each run of the job while the receiver nodes remain the same (same machines) throughout the life of the streaming application unless a node failure happens?

On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Just to add, there is a receiver based Kafka consumer which uses the Kafka Low Level Consumer API: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote:

So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

-> Receiver based streaming will always have the receiver running in parallel while your job is running; by default, every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka.

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

-> Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic.

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

-> They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages.

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?

On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks).
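The same core accounting applies when running locally: the master needs more threads than there are receivers. A minimal sketch (hypothetical app name):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // "local[4]": the receiver occupies 1 core, leaving 3 for processing tasks.
    // "local" or "local[1]" would leave no cores to process the received data.
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("receiver-demo");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));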
Re: spark streaming doubt
So I can explicitly specify the number of receivers and executors in receiver based streaming? Can you share a sample program, if any?

Also, in low level non-receiver based streaming, will data be fetched and processed by the same worker executor node?

Also, if I have concurrent jobs set to 1, then in low level streaming fetching and processing will be delayed till the next job starts. Say a situation where I have a 1 sec stream interval but job1 takes 5 sec to complete, so job2 starts at the end of 5 sec: will it now process all data from sec 1 to sec 5 in low level non-receiver streaming, or only for the interval sec1-sec2? And if it processes data for the complete duration sec1-sec5, is there any option to suppress the start of the other queued jobs (for intervals sec2-3, sec3-4, sec4-5) since their work is already done by job2?

On Wed, May 20, 2015 at 12:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

One receiver basically runs on 1 core, so if your single node has 4 cores, there are still 3 cores left for the processing (for the executors). And yes, the receiver remains on the same machine unless some failure happens.

Thanks
Best Regards

On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil and Dibyendu. In high level receiver based streaming, do executors run on the receivers themselves to have data localisation? Or is data always transferred to executor nodes, with the executor nodes differing in each run of the job while the receiver nodes remain the same (same machines) throughout the life of the streaming application unless a node failure happens?

On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Just to add, there is a receiver based Kafka consumer which uses the Kafka Low Level Consumer API: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote:

So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

-> Receiver based streaming will always have the receiver running in parallel while your job is running; by default, every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka.

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

-> Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic.

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

-> They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages.

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?
Re: spark streaming doubt
On Wed, May 20, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote:

So I can explicitly specify the number of receivers and executors in receiver based streaming? Can you share a sample program, if any?

-> You can look at the low level consumer repo https://github.com/dibbhatt/kafka-spark-consumer shared by Dibyendu for sample code.

Also, in low level non-receiver based streaming, will data be fetched and processed by the same worker executor node? Also, if I have concurrent jobs set to 1, then in low level streaming fetching and processing will be delayed till the next job starts. Say a situation where I have a 1 sec stream interval but job1 takes 5 sec to complete, so job2 starts at the end of 5 sec: will it now process all data from sec 1 to sec 5 in low level non-receiver streaming, or only for the interval sec1-sec2? And if it processes data for the complete duration sec1-sec5, is there any option to suppress the start of the other queued jobs (for intervals sec2-3, sec3-4, sec4-5) since their work is already done by job2?

-> I believe all your data from sec2-sec5 will be available in Kafka, and when the second batch starts at 5 sec it will consume it (you can also limit the rate with spark.streaming.kafka.maxRatePerPartition). Read more here: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

On Wed, May 20, 2015 at 12:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

One receiver basically runs on 1 core, so if your single node has 4 cores, there are still 3 cores left for the processing (for the executors). And yes, the receiver remains on the same machine unless some failure happens.

Thanks
Best Regards

On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil and Dibyendu. In high level receiver based streaming, do executors run on the receivers themselves to have data localisation? Or is data always transferred to executor nodes, with the executor nodes differing in each run of the job while the receiver nodes remain the same (same machines) throughout the life of the streaming application unless a node failure happens?

On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Just to add, there is a receiver based Kafka consumer which uses the Kafka Low Level Consumer API: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote:

So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

-> Receiver based streaming will always have the receiver running in parallel while your job is running; by default, every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka.

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

-> Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic.

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

-> They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages.

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."
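A sketch of "specify the number of receivers" with the built-in high level consumer (hypothetical host, group and topic names): each createStream call spawns one receiver, and the resulting streams are unioned before processing. For the direct (non-receiver) stream there is no receiver count; its rate can instead be capped with spark.streaming.kafka.maxRatePerPartition, as mentioned above.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    int numReceivers = 3;
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("mytopic", 2); // consumer threads inside each receiver

    List<JavaPairDStream<String, String>> streams =
        new ArrayList<JavaPairDStream<String, String>>();
    for (int i = 0; i < numReceivers; i++) {
      // One receiver (occupying one core) per createStream call.
      streams.add(KafkaUtils.createStream(jssc, "zkhost:2181", "mygroup", topicMap));
    }
    // Union the receiver streams into a single DStream before processing.
    JavaPairDStream<String, String> unified =
        jssc.union(streams.get(0), streams.subList(1, streams.size()));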
spark streaming doubt
What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?
Re: spark streaming doubt
It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote:

What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?
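A sketch of one way around the MEMORY_ONLY block drops described above: give the receiver a storage level that spills to disk instead (hypothetical connection values, assuming the 1.3 high level consumer API):

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // Blocks that do not fit in memory are serialized, replicated and spilled
    // to disk rather than dropped while jobs are backed up.
    KafkaUtils.createStream(jssc, "zkhost:2181", "mygroup", topicMap,
        StorageLevel.MEMORY_AND_DISK_SER_2());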
Re: spark streaming doubt
spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?

On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote:

What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?
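For completeness, a sketch of setting the (experimental, integer-valued) property described above; the value 2 is purely illustrative:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("streaming-doubt")
        // Allows the jobs of 2 batches to execute concurrently instead of
        // queueing; default is 1, see the caveats in TD's answer above.
        .set("spark.streaming.concurrentJobs", "2");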
Re: spark streaming doubt
So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?

On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote:

What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?
Re: spark streaming doubt
On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote:

So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

-> Receiver based streaming will always have the receiver running in parallel while your job is running; by default, every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka.

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

-> Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic.

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

-> They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages.

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?

On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote:

What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?
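The block interval in answer 1 also fixes how much parallelism a receiver based batch has; a quick back-of-the-envelope with the default values:

    long batchIntervalMs = 1000; // 1 sec batches
    long blockIntervalMs = 200;  // spark.streaming.blockInterval default
    long blocksPerBatch = batchIntervalMs / blockIntervalMs; // = 5
    // Each block becomes one partition (hence one task) of the batch's RDD,
    // so a single receiver yields about 5 tasks per 1 sec batch.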
Re: spark streaming doubt
Just to add, there is a receiver based Kafka consumer which uses the Kafka Low Level Consumer API: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote:

So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

-> Receiver based streaming will always have the receiver running in parallel while your job is running; by default, every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka.

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

-> Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic.

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

-> They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages.

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?

On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote:

What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?
Re: spark streaming doubt
Thanks Akhil and Dibyendu. In high level receiver based streaming, do executors run on the receivers themselves to have data localisation? Or is data always transferred to executor nodes, with the executor nodes differing in each run of the job while the receiver nodes remain the same (same machines) throughout the life of the streaming application unless a node failure happens?

On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Just to add, there is a receiver based Kafka consumer which uses the Kafka Low Level Consumer API: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote:

So for Kafka + spark streaming, receiver based streaming uses the high level API and non-receiver based streaming uses the low level API.

1. In high level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say each second)?

-> Receiver based streaming will always have the receiver running in parallel while your job is running; by default, every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka.

2. Will the number of executors in high level receiver based jobs always equal the number of partitions in the topic?

-> Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic.

3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign work to executors in the high level receiver based approach?

-> They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages.

On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it as 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes.

"Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future."

Copied from TD's answer on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming

Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all the pending jobs start one by one after job 1 completes, or does it not create jobs which could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that?

On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, with your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set the StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as spark drops some blocks which are yet to be processed in order to accumulate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropping blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote:

What happens if, in a streaming application, one job is not yet finished when the stream interval is reached? Does it start the next job, or wait for the first to finish while the remaining jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? At any time will there be only one job running, or multiple?