Hi Soumya,
I don’t think what you’ll end up measuring this way will be very useful. I 
mean, between the completion of the future and the triggering of the map there 
are multiple asynchronous boundaries… So you won’t be measuring how fast the 
set operation was, but how much time was between these asynchronous boundaries 
- which could have been backpressured by the way.

I suggest directly wrapping the set call with your measurement logic instead - 
since that is what you want to measure it seems.

By the way, we do have a “timed” element, in our extras section: 
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/extra/Timed.scala
 You can `import Timed._` and then use it as shown here: 
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala
It’s a rather old element and I’m not sure if we’ll be keeping it, but you can 
use it as a source of inspiration in case you end up needing that kind of 
measurement.



On 26 December 2014 at 05:46:55, Soumya Simanta ([email protected]) 
wrote:


This is related to this thread but sufficiently different that I decided to 
create new thread. Hope that's okay. 

I would like to create a histogram of latency of a large number of set 
operations ( set returns a Future[Boolean]) using LatencyUtils 

 Basically I need to start recording the time before the set operation (inside 
mapAsyncUnordered(k => redis.set(k + rnd, message))) and then somehow record 
the end time in a map operation( .map( //record the end time here) after this. 
I'm having a hard time trying to figure this out. 

My understanding is that the even though the mapAsyncUnordered doesn't maintain 
the order of operations the map following the mapAsynUnordered will maintain 
the order from the previous stage because of TCP maintaining the order. Is this 
correct? 


val redis = RedisClient("localhost")

val random1 = UUID.randomUUID().toString

def insertValues(rnd: String): Flow[Int, Boolean] = {
    Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message)).map( //record 
the end time here)
}

val blackhole = BlackholeSink

val maxSeq = 5000000
val seqSource = Source( () => (1 to maxSeq).iterator )
val streamWithSeqSource = 
seqSource.via(insertValues(random1)).runWith(blackhole)


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to