Hi Tathagata. I actually had a few minor improvements to Spark Streaming
in SPARK-4040. Perhaps I could weave this in with my PR?

On Wed, Oct 29, 2014 at 1:59 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Good idea, will do for 1.2 release.
> On Oct 29, 2014 9:50 AM, "Gerard Maas" <gerard.m...@gmail.com> wrote:
>
>> Hi TD,
>>
>> Thanks a lot for the comprehensive answer.
>>
>> I think this explanation deserves some place in the Spark Streaming
>> tuning guide.
>>
>> -kr, Gerard.
>>
>> On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hey Gerard,
>>>
>>> This is a very good question!
>>>
>>> *TL;DR: *The performance should be the same, except in the case of
>>> shuffle-based operations where the number of reducers is not explicitly
>>> specified.
>>>
>>> Let me answer in more detail by dividing the set of DStream operations
>>> into three categories.
>>>
>>> *1. Map-like operations (map, flatMap, filter, etc.) that do not
>>> involve any shuffling of data:* Performance should be virtually the
>>> same in both cases. Either way, in each batch, the operations on the
>>> batch's RDD are first set up on the driver, and then the actions on the
>>> RDD are executed. There are very minor differences between the two cases of
>>> early foreachRDD and late foreachRDD (e.g., cleaning up of function
>>> closures, etc.), but those should make almost no difference to the
>>> performance.
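
To make the two shapes concrete, here is a minimal sketch that is not from the thread. The names `parse` and `save` are hypothetical stand-ins for the real per-element work and the real sink; only `map`, `filter`, and `foreachRDD` are actual Spark Streaming calls.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object MapLikeSketch {
  // Hypothetical per-element work (assumption, not from the thread).
  def parse(line: String): Int = line.trim.length

  // Hypothetical sink: runs an action so that the batch job actually executes.
  def save(rdd: RDD[Int]): Unit = rdd.foreach(_ => ())

  // Late foreachRDD: transform the DStream first, act on each batch RDD last.
  def late(lines: DStream[String]): Unit =
    lines.map(parse).filter(_ > 0).foreachRDD(rdd => save(rdd))

  // Early foreachRDD: take each batch RDD first, transform it inside.
  def early(lines: DStream[String]): Unit =
    lines.foreachRDD(rdd => save(rdd.map(parse).filter(_ > 0)))
}
```

In both variants the same chain of narrow transformations is applied to each batch's RDD, which is consistent with the claim that performance should be virtually the same.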
>>>
>>> *2. Operations involving shuffle: *Here there is a subtle difference
>>> between the two cases if the number of partitions is not specified. The default
>>> number of partitions used by dstream.reduceByKey() differs from the one
>>> used by dstream.foreachRDD(_.reduceByKey()), and one needs to
>>> play around with the number of reducers to see what performs better. But if
>>> the number of reducers is explicitly specified and is the same in both cases,
>>> then the performance should be similar. Note that this difference in the
>>> default numbers is not guaranteed to stay this way; it could change in
>>> future implementations.
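
A minimal sketch of pinning the number of reducers in both variants, so that neither relies on a default. The value of `numReducers` is a hypothetical placeholder to be tuned per workload. The sketch assumes Spark 1.3 or later, where the pair-operation implicits are picked up automatically; older releases would also need `import org.apache.spark.SparkContext._` and `import org.apache.spark.streaming.StreamingContext._`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object ShuffleSketch {
  // Hypothetical value (assumption); tune it for the actual workload.
  val numReducers = 8

  // RDD-level reduce with an explicit number of partitions.
  def reduceBatch(rdd: RDD[(String, Int)]): RDD[(String, Int)] =
    rdd.reduceByKey(_ + _, numReducers)

  // Late foreachRDD: shuffle at the DStream level with explicit partitions.
  def late(pairs: DStream[(String, Int)]): Unit =
    pairs.reduceByKey(_ + _, numReducers).foreachRDD(rdd => rdd.foreach(_ => ()))

  // Early foreachRDD: shuffle inside, with the same explicit partitions.
  def early(pairs: DStream[(String, Int)]): Unit =
    pairs.foreachRDD(rdd => reduceBatch(rdd).foreach(_ => ()))
}
```

With the same explicit value in both places, the comparison no longer depends on whichever defaults the two code paths happen to use.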
>>>
>>> *3. Aggregation-like operations (count, reduce): *Here there is another
>>> subtle execution difference between
>>> - dstream.count() which produces a DStream of single-element RDDs, the
>>> element being the count, and
>>> - dstream.foreachRDD(_.count()) which returns the count directly.
>>>
>>> In the first case, some random worker node is chosen for the reduce; in
>>> the second, the driver is chosen for the reduce. There should not be a
>>> significant performance difference.
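
The two counting styles might look as follows. This is a sketch only: the object and method names are hypothetical, and printing stands in for whatever is done with the count.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object CountSketch {
  // DStream-level count: yields a DStream[Long] whose batch RDDs hold the count.
  // collect() is used rather than first() so an empty RDD cannot throw.
  def late(events: DStream[String]): Unit =
    events.count().foreachRDD { rdd =>
      rdd.collect().foreach(n => println(s"batch count = $n"))
    }

  // RDD-level count: the action returns the Long directly to the caller.
  def countBatch(rdd: RDD[String]): Long = rdd.count()

  // Early foreachRDD: count each batch RDD inside the foreachRDD body.
  def early(events: DStream[String]): Unit =
    events.foreachRDD { rdd =>
      println(s"batch count = ${countBatch(rdd)}")
    }
}
```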
>>>
>>> *4. Other operations*, including window ops and stateful ops
>>> (updateStateByKey), are obviously not part of the discussion, as they cannot
>>> be (easily) done through early foreachRDD.
>>>
>>> Hope this helps!
>>>
>>> TD
>>>
>>> PS: Sorry for not noticing this question earlier.
>>>
>>> On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> PS: Just to clarify my statement:
>>>>
>>>> >>Unlike the feared RDD operations on the driver, it's my
>>>> understanding that these Dstream ops on the driver are merely creating an
>>>> execution plan for each RDD.
>>>>
>>>> By "feared RDD operations on the driver" I meant to contrast an RDD
>>>> action like rdd.collect, which would pull all RDD data to the driver, with
>>>> dstream.foreachRDD(rdd => rdd.op), for which the documentation says 'it runs on
>>>> the driver'; yet all that appears to run on the driver is the
>>>> scheduling of 'op' on that RDD, just as happens for all other RDD
>>>> operations
>>>> (thanks to Sean for the clarification).
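
A small sketch of that distinction, with comments marking where each line runs. The object and method names are hypothetical; the Spark calls (`map`, `foreach`, `take`, `foreachRDD`) are real.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object DriverVsExecutorSketch {
  // Called on the driver once per batch; the data itself stays on the executors.
  def processBatch(rdd: RDD[String]): Array[Int] = {
    // Driver: only records the transformation in the RDD lineage.
    val lengths = rdd.map(_.length)
    // Driver schedules a job; the function itself runs on the executors.
    lengths.foreach(_ => ())
    // Also an action, but it ships only a few results back to the driver.
    // collect() would ship the whole RDD instead.
    lengths.take(5)
  }

  def run(events: DStream[String]): Unit =
    events.foreachRDD { rdd =>
      val sample = processBatch(rdd)
      // Printed on the driver, since the foreachRDD body runs there.
      println(s"sample lengths: ${sample.mkString(", ")}")
    }
}
```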
>>>>
>>>> So, not to move focus away from the original question:
>>>>
>>>> In Spark Streaming, would it be better to do foreachRDD early in a
>>>> pipeline, or instead to do as many DStream transformations as possible
>>>> before going into the foreachRDD call?
>>>>
>>>> Between these two pieces of code, from a performance perspective, what
>>>> would be preferred and why:
>>>>
>>>> - Early foreachRDD:
>>>>
>>>> dstream.foreachRDD { rdd =>
>>>>     val records = rdd.map(elem => record(elem))
>>>>     targets.foreach(target => records.filter { record =>
>>>>         isTarget(target, record) }.writeToCassandra(target, table))
>>>> }
>>>>
>>>> - As many DStream transformations as possible before foreachRDD:
>>>>
>>>> val recordStream = dstream.map(elem => record(elem))
>>>> targets.foreach { target =>
>>>>     recordStream.filter(record => isTarget(target, record))
>>>>         .foreachRDD(_.writeToCassandra(target, table))
>>>> }
>>>>
>>>> ?
>>>>
>>>> kr, Gerard.
>>>>
>>>>
>>>>
>>>> On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas <gerard.m...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Matt,
>>>>>
>>>>> Unlike the feared RDD operations on the driver, it's my understanding
>>>>> that these Dstream ops on the driver are merely creating an execution plan
>>>>> for each RDD.
>>>>> My question still remains: Is it better to foreachRDD early in the
>>>>> process or to do as many DStream transformations as possible before going
>>>>> into the foreachRDD call?
>>>>>
>>>>> Maybe this will require some empirical testing specific to each
>>>>> implementation?
>>>>>
>>>>> -kr, Gerard.
>>>>>
>>>>>
>>>>> On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell <matt.narr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>>>>
>>>>>> foreachRDD is executed on the driver….
>>>>>>
>>>>>> mn
>>>>>>
>>>>>> On Oct 20, 2014, at 3:07 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Pinging TD  -- I'm sure you know :-)
>>>>>>
>>>>>> -kr, Gerard.
>>>>>>
>>>>>> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas <gerard.m...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have been implementing several Spark Streaming jobs that are
>>>>>>> basically processing data and inserting it into Cassandra, sorting it
>>>>>>> among different keyspaces.
>>>>>>>
>>>>>>> We've been following the pattern:
>>>>>>>
>>>>>>> dstream.foreachRDD { rdd =>
>>>>>>>     val records = rdd.map(elem => record(elem))
>>>>>>>     targets.foreach(target => records.filter { record =>
>>>>>>>         isTarget(target, record) }.writeToCassandra(target, table))
>>>>>>> }
>>>>>>>
>>>>>>> I've been wondering whether there would be a performance difference
>>>>>>> in transforming the dstream instead of transforming the RDD within the
>>>>>>> dstream, with regard to how the transformations get scheduled.
>>>>>>>
>>>>>>> Instead of the RDD-centric computation, I could transform the
>>>>>>> dstream until the last step, where I need an rdd to store.
>>>>>>> For example, the previous transformation could be written as:
>>>>>>>
>>>>>>> val recordStream = dstream.map(elem => record(elem))
>>>>>>> targets.foreach { target =>
>>>>>>>     recordStream.filter(record => isTarget(target, record))
>>>>>>>         .foreachRDD(_.writeToCassandra(target, table))
>>>>>>> }
>>>>>>>
>>>>>>> Would there be a difference in execution and/or performance? What would
>>>>>>> be the preferred way to do this?
>>>>>>>
>>>>>>> Bonus question: Is there a better (more performant) way to sort the
>>>>>>> data into different "buckets" instead of filtering the data collection
>>>>>>> as many times as there are buckets?
>>>>>>>
>>>>>>> thanks,  Gerard.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>


-- 
jay vyas
