UpdateStateByKey - How to improve performance?

2014-08-06 Thread Venkat Subramanian
The method

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) =>
Option[S]): DStream[(K, S)]

takes a DStream[(K, V)] and produces a DStream[(K, S)] in Spark Streaming.
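For concreteness, a minimal sketch of an update function for this signature (a hypothetical running count; the input types String/Int/Long are assumptions for illustration). The function itself is pure, so it is shown standalone:

```scala
// Hypothetical running-count update function for updateStateByKey.
// newValues holds this batch's values for one key; state is its prior state.
val updateRunningCount: (Seq[Int], Option[Long]) => Option[Long] =
  (newValues, state) => Some(state.getOrElse(0L) + newValues.sum)

// In a streaming job it would be wired up roughly as:
//   val counts: DStream[(String, Long)] =
//     pairDStream.updateStateByKey(updateRunningCount)
```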

We have an input DStream[(K, V)] with 40,000 elements. On average we update
about 1,000 of them in every 3-second batch, but because of how
updateStateByKey is defined, we loop through all 40,000 elements (Seq[V]) to
make an update for just 1,000 of them, leaving the other 39,000 untouched. I
think looping through the extra 39,000 elements is a waste of performance.

Isn't there a better way to do this efficiently, by building a hash map of
just the 1,000 elements that need to be updated and updating only those
(without looping through the unwanted elements)? Shouldn't there be a
streaming update function that updates selected members, or are we missing
some concept here?

I think updateStateByKey may be causing a lot of performance degradation in
our app, as we keep doing this again and again for every batch. Please let us
know if my thought process is correct here.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: UpdateStateByKey - How to improve performance?

2014-08-06 Thread Michael Malak
Depending on the density of your keys, the alternative signature

def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) =>
Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner:
Boolean)(implicit arg0: ClassTag[S]): DStream[(K, S)]

at least iterates by key rather than by (old) value.
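As a rough sketch of using this form (the name batchUpdate and the String/Int/Long types are illustrative assumptions; the function is pure, so it is shown standalone):

```scala
// Illustrative per-key iterator update: one call covers a whole
// partition's keys, rather than one closure call per key's old values.
def batchUpdate(
    it: Iterator[(String, Seq[Int], Option[Long])]): Iterator[(String, Long)] =
  it.map { case (key, newValues, state) =>
    (key, state.getOrElse(0L) + newValues.sum)
  }

// Wiring sketch (assumes an existing DStream[(String, Int)]):
//   dstream.updateStateByKey(batchUpdate _, new HashPartitioner(8), true)
```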

I believe your thinking is correct that there might be a performance 
improvement opportunity for your case if there were an updateStateByKey() that 
instead iterated by (new) value.

BTW, my impression from the stock examples is that the signature I pasted
above was intended to be the more typically called updateStateByKey(), while
the one you pasted is the more general-purpose one. I have used the
general-purpose one, but only when I needed to peek into the entire set of
states for some unusual reason.



On Wednesday, August 6, 2014 2:30 PM, Venkat Subramanian vsubr...@gmail.com 
wrote:

Re: UpdateStateByKey - How to improve performance?

2014-08-06 Thread Tathagata Das
Hello Venkat,

Your thoughts are quite spot on. The current implementation was designed to
allow the functionality of timing out a state. For this to be possible, the
update function needs to be called for each key even if there is no new data,
so that the function can check things like the last update time and return
None to time the state out. However, in Spark 1.2 I plan to improve the
performance for scenarios such as yours.
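A sketch of the timeout pattern described above (the TimedState wrapper and all names are illustrative, not Spark API; the current time is passed in explicitly so the function stays a pure, testable closure):

```scala
// State that remembers when it was last updated, so the update
// function can time itself out by returning None.
case class TimedState(count: Long, lastUpdatedMs: Long)

val timeoutMs = 60000L // drop a key after 60s with no new data

def updateWithTimeout(nowMs: Long)(
    newValues: Seq[Int], state: Option[TimedState]): Option[TimedState] =
  if (newValues.nonEmpty)
    Some(TimedState(state.map(_.count).getOrElse(0L) + newValues.sum, nowMs))
  else
    state.filter(s => nowMs - s.lastUpdatedMs < timeoutMs) // None if stale
```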

For the time being, you could also try the other techniques for reducing
batch processing time described at
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
(if you haven't already). You can also set the storage level of the DStream
to non-serialized, which may improve performance.

TD


On Wed, Aug 6, 2014 at 1:29 PM, Venkat Subramanian vsubr...@gmail.com
wrote: