Hi Tathagata. I actually had a few minor improvements to Spark Streaming in SPARK-4040. Possibly I could weave this in with my PR?
On Wed, Oct 29, 2014 at 1:59 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Good idea, will do for 1.2 release.
>
> On Oct 29, 2014 9:50 AM, "Gerard Maas" <gerard.m...@gmail.com> wrote:
>
>> Hi TD,
>>
>> Thanks a lot for the comprehensive answer.
>>
>> I think this explanation deserves some place in the Spark Streaming
>> tuning guide.
>>
>> -kr, Gerard.
>>
>> On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> Hey Gerard,
>>>
>>> This is a very good question!
>>>
>>> *TL;DR:* The performance should be the same, except in the case of
>>> shuffle-based operations where the number of reducers is not explicitly
>>> specified.
>>>
>>> Let me answer in more detail by dividing the set of DStream operations
>>> into a few categories.
>>>
>>> *1. Map-like operations (map, flatMap, filter, etc.) that do not
>>> involve any shuffling of data:* Performance should be virtually the
>>> same in both cases. Either way, in each batch, the operations on the
>>> batch's RDD are first set up on the driver, and then the actions on the
>>> RDD are executed. There are very minor differences between the two cases
>>> of early foreachRDD and late foreachRDD (e.g., cleaning up of function
>>> closures, etc.), but those should make almost no difference in
>>> performance.
>>>
>>> *2. Operations involving shuffle:* Here there is a subtle difference
>>> between the two cases if the number of partitions is not specified. The
>>> default numbers of partitions used with dstream.reduceByKey() and with
>>> dstream.foreachRDD(_.reduceByKey()) are different, and one needs to
>>> play around with the number of reducers to see what performs better. But
>>> if the number of reducers is explicitly specified and is the same in both
>>> cases, then the performance should be similar. Note that this difference
>>> in the default numbers is not guaranteed to stay like this; it could
>>> change in future implementations.
>>>
>>> *3. Aggregation-like operations (count, reduce):* Here there is another
>>> subtle execution difference between
>>> - dstream.count(), which produces a DStream of single-element RDDs, the
>>>   element being the count, and
>>> - dstream.foreachRDD(_.count()), which returns the count directly.
>>>
>>> In the first case, some random worker node is chosen for the reduce; in
>>> the other, the driver is chosen for the reduce. There should not be a
>>> significant performance difference.
>>>
>>> *4. Other operations*, including window ops and stateful ops
>>> (updateStateByKey), are obviously not part of the discussion, as they
>>> cannot be (easily) done through early foreachRDD.
>>>
>>> Hope this helps!
>>>
>>> TD
>>>
>>> PS: Sorry for not noticing this question earlier.
>>>
>>> On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>
>>>> PS: Just to clarify my statement:
>>>>
>>>> >> Unlike the feared RDD operations on the driver, it's my
>>>> understanding that these DStream ops on the driver are merely creating
>>>> an execution plan for each RDD.
>>>>
>>>> With "feared RDD operations on the driver" I meant to contrast an RDD
>>>> action like rdd.collect, which would pull all the RDD data to the
>>>> driver, with dstream.foreachRDD(rdd => rdd.op), for which the
>>>> documentation says "it runs on the driver"; yet all that appears to run
>>>> on the driver is the scheduling of 'op' on that RDD, just as happens
>>>> for all other RDD operations.
>>>> (Thanks to Sean for the clarification.)
>>>>
>>>> So, not to move focus away from the original question:
>>>>
>>>> In Spark Streaming, would it be better to do foreachRDD early in a
>>>> pipeline, or instead do as many DStream transformations as possible
>>>> before going into the foreachRDD call?
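[To make TD's point 2 above concrete, here is a minimal sketch, untested, of pinning the reducer count so that both styles shuffle the same way. It assumes a pair DStream `pairs: DStream[(String, Int)]` already exists; the partition count of 8 is an arbitrary example value, and the `count()` stands in for whatever output action the job really performs.]

```scala
// Sketch only, untested: assumes a pair DStream `pairs: DStream[(String, Int)]`.
// The reducer count (8 here) is an arbitrary example value.

// Late foreachRDD: reduce on the DStream with an explicit partition count.
val reducedStream = pairs.reduceByKey(_ + _, 8)
reducedStream.print()

// Early foreachRDD: the same reduction on each batch's RDD, same count.
pairs.foreachRDD { rdd =>
  val reduced = rdd.reduceByKey(_ + _, 8)
  reduced.count() // forces evaluation; replace with the real output action
}
```

With the partition count fixed in both places, the default-partition difference TD describes disappears, and per his answer the two should perform similarly.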
>>>>
>>>> Between these two pieces of code, from a performance perspective, what
>>>> would be preferred, and why?
>>>>
>>>> - Early foreachRDD:
>>>>
>>>>     dstream.foreachRDD { rdd =>
>>>>       val records = rdd.map(elem => record(elem))
>>>>       targets.foreach(target => records.filter(record =>
>>>>         isTarget(target, record)).writeToCassandra(target, table))
>>>>     }
>>>>
>>>> - As many DStream transformations as possible before foreachRDD:
>>>>
>>>>     val recordStream = dstream.map(elem => record(elem))
>>>>     targets.foreach { target =>
>>>>       recordStream.filter(record => isTarget(target, record))
>>>>         .foreachRDD(_.writeToCassandra(target, table))
>>>>     }
>>>>
>>>> kr, Gerard.
>>>>
>>>> On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>
>>>>> Thanks Matt,
>>>>>
>>>>> Unlike the feared RDD operations on the driver, it's my understanding
>>>>> that these DStream ops on the driver are merely creating an execution
>>>>> plan for each RDD.
>>>>> My question still remains: is it better to foreachRDD early in the
>>>>> process, or to do as many DStream transformations as possible before
>>>>> going into the foreachRDD call?
>>>>>
>>>>> Maybe this will require some empirical testing specific to each
>>>>> implementation?
>>>>>
>>>>> -kr, Gerard.
>>>>>
>>>>> On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>>
>>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>>>>
>>>>>> foreachRDD is executed on the driver….
>>>>>>
>>>>>> mn
>>>>>>
>>>>>> On Oct 20, 2014, at 3:07 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>>>
>>>>>> Pinging TD -- I'm sure you know :-)
>>>>>>
>>>>>> -kr, Gerard.
>>>>>>
>>>>>> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have been implementing several Spark Streaming jobs that
>>>>>>> basically process data and insert it into Cassandra, sorting it
>>>>>>> among different keyspaces.
>>>>>>>
>>>>>>> We've been following the pattern:
>>>>>>>
>>>>>>>     dstream.foreachRDD { rdd =>
>>>>>>>       val records = rdd.map(elem => record(elem))
>>>>>>>       targets.foreach(target => records.filter(record =>
>>>>>>>         isTarget(target, record)).writeToCassandra(target, table))
>>>>>>>     }
>>>>>>>
>>>>>>> I've been wondering whether there would be a performance difference
>>>>>>> between transforming the dstream and transforming the RDD within the
>>>>>>> dstream, with regard to how the transformations get scheduled.
>>>>>>>
>>>>>>> Instead of the RDD-centric computation, I could transform the
>>>>>>> dstream until the last step, where I need an RDD to store.
>>>>>>> For example, the previous transformation could be written as:
>>>>>>>
>>>>>>>     val recordStream = dstream.map(elem => record(elem))
>>>>>>>     targets.foreach { target =>
>>>>>>>       recordStream.filter(record => isTarget(target, record))
>>>>>>>         .foreachRDD(_.writeToCassandra(target, table))
>>>>>>>     }
>>>>>>>
>>>>>>> Would there be a difference in execution and/or performance? What
>>>>>>> would be the preferred way to do this?
>>>>>>>
>>>>>>> Bonus question: Is there a better (more performant) way to sort the
>>>>>>> data into different "buckets" than filtering the data collection
>>>>>>> #buckets times?
>>>>>>>
>>>>>>> thanks, Gerard.

-- jay vyas
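[On the bonus question above, one alternative to filtering the collection once per target is to key each record by its bucket and group in a single shuffle. This is an untested sketch, not an answer from the thread: `dstream` and `record` come from the code above, while `targetOf` (mapping a record to its target bucket) and `saveBucket` (a stand-in for the per-bucket Cassandra write) are hypothetical.]

```scala
// Sketch only, untested. `dstream` and `record` are from the thread above;
// `targetOf` (record => its target bucket) and `saveBucket` (stand-in for
// the per-bucket Cassandra write) are hypothetical.
val keyed = dstream
  .map(elem => record(elem))
  .map(rec => (targetOf(rec), rec)) // key each record by its bucket
  .groupByKey()                     // one shuffle instead of one scan per target

keyed.foreachRDD { rdd =>
  rdd.foreach { case (target, recs) =>
    saveBucket(target, recs) // hypothetical per-bucket write
  }
}
```

Whether the single shuffle beats #targets filter passes depends on the number of targets and the shuffle cost, so, as with the main question, it likely needs empirical testing on the actual workload.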