It looks like my akka-streams code was not applying back pressure, and I 
wasn't sure how to change it to handle back pressure. 

Then I changed my code to the following. I borrowed the code from one of 
the Akka Streams activator examples (WritePrimes). I also added a buffer in 
between, which helped significantly. 


  val maxRandomNumberSize = 1000000
  val randomSource = Source(() =>
    Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize)))

  def insertValues: Flow[Int, Boolean] = {
    Flow[Int].mapAsyncUnordered(k => redis.set(k + random, message))
  }

  val blackhole = BlackholeSink

  //val stream = source.via(insertValues).runWith(blackhole) // no buffer
  val streamWithRandomSource = randomSource
    .buffer(20000, OverflowStrategy.backpressure)
    .via(insertValues)
    .runWith(blackhole)

Now the network IO looks much more uniform. It's a pleasure to see back 
pressure work (visually) :-)
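Conceptually, what buffer(20000, OverflowStrategy.backpressure) gives you can be sketched without Akka at all: a bounded queue whose producer blocks when the buffer is full, so a fast upstream can never outrun the downstream by more than the buffer size. A minimal stdlib-only illustration (all names here are mine, not from the code above):

```scala
import java.util.concurrent.ArrayBlockingQueue

// Plain-Scala sketch of OverflowStrategy.backpressure: put() blocks once the
// buffer holds bufferSize elements, throttling the producer to the consumer's
// pace instead of letting messages pile up in an unbounded mailbox.
object BackpressureSketch {
  def run(items: Int, bufferSize: Int): Int = {
    val buffer = new ArrayBlockingQueue[Int](bufferSize)
    val producer = new Thread(new Runnable {
      def run(): Unit = (1 to items).foreach(i => buffer.put(i)) // blocks when full
    })
    producer.start()
    var consumed = 0
    while (consumed < items) { buffer.take(); consumed += 1 } // consumer drains
    producer.join()
    consumed
  }
}
```

The bounded buffer is why the memory and network graphs flatten out: at most bufferSize elements are ever queued.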

<https://lh5.googleusercontent.com/-wpEVIqgvT3k/VJeVkEsl50I/AAAAAAAAvTc/ZGlyonJbof8/s1600/rediscala_network_io_1actor_backpressure.png>


I did see my CPU usage bump up in this version. Any reason why? 


On Sunday, December 21, 2014 11:55:01 AM UTC-5, Soumya Simanta wrote:
>
> Here is my attempt to create a version with back pressure using Reactive 
> Streams. I'm not sure whether it is completely correct. Can someone please 
> verify the code below? 
> Even with this version I don't see any change in throughput, and the 
> network IO graph looks very similar to what I had without using Reactive 
> Streams. 
>
> On the other hand, if I use 100 Rediscala client actors the inserts are much 
> faster. I understand that there are now 100 queues (mailboxes) and 
> therefore it's faster. But I still don't understand why the performance is 
> so bad for a single client after a certain threshold, even after using back 
> pressure (assuming I'm using Akka Streams correctly).
>
>
> *Code with Akka streams and one Rediscala client. *
>
> import java.util.UUID
>
> import akka.actor.ActorSystem
> import akka.stream.FlowMaterializer
> import akka.stream.scaladsl.Source
> import akka.util.ByteString
> import redis.RedisClient
>
> object RedisStreamClient extends App {
>
>   val message = """How to explain ZeroMQ? Some of us start by saying all 
> the wonderful things it does. It's sockets on steroids. It's like mailboxes 
> with routing. It's fast! Others try to share their moment of enlightenment, 
> that zap-pow-kaboom satori paradigm-shift moment when it all became 
> obvious. Things just become simpler. Complexity goes away. It opens the 
> mind. Others try to explain by comparison. It's smaller, simpler, but still 
> looks familiar. Personally, I like to remember why we made ZeroMQ at all, 
> because that's most likely where you, the reader, still are today.How to 
> explain ZeroMQ? Some of us start by saying all the wonderful things it 
> does. It's sockets on steroids. It's like mailboxes with routing. It's 
> fast! Others try to share their moment of enlightenment, that 
> zap-pow-kaboom satori paradigm-shift moment when it all became obvious. 
> Things just become simpler. Complexity goes away. It opens the mind. Others 
> try to explain by comparison. It's smaller, simpler, but still looks 
> familiar. Personally, I like to remember why we made ZeroMQ at all, because 
> that's most likely where"""
>
>   implicit val system = ActorSystem("Sys")
>
>   implicit val materializer = FlowMaterializer()
>
>   val msgSize = message.getBytes.size
>
>   val redis = RedisClient()
>   implicit val ec = redis.executionContext
>   val random = UUID.randomUUID().toString
>
>   val source = Source(() => (1 to 1000000).iterator)
>   source.map(x => x + 1)
>     .foreach(x => redis.set(random + x.toString, ByteString(message)))
>     .onComplete(_ => system.shutdown())
>
> }
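One thing worth noting about the version above: foreach fires each redis.set and discards the returned Future, so nothing bounds the number of in-flight requests. A stdlib-only sketch of bounding in-flight work (no Akka; `insert` below is a hypothetical stand-in for redis.set, not the Rediscala API):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch: issue a batch of asynchronous inserts, wait for the whole batch to
// complete, then issue the next batch. This caps in-flight requests at
// batchSize, which is roughly what mapAsync-style back pressure achieves.
object BoundedInFlight {
  def runBatched[A](items: Seq[A], batchSize: Int)(insert: A => Future[Boolean]): Int =
    items.grouped(batchSize).map { batch =>
      Await.result(Future.sequence(batch.map(insert)), 60.seconds).count(identity)
    }.sum

  // demo uses an already-completed future in place of a real redis.set call
  def demo(): Int = runBatched(1 to 100, 10)(_ => Future.successful(true))
}
```

With foreach, by contrast, all one million futures can be outstanding at once, which matches the high GC pressure observed below.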
>
>
> <https://lh4.googleusercontent.com/-5aG6CMqBLcA/VJb7J3qqdVI/AAAAAAAAvTE/kkdPVBT6AbQ/s1600/rediscala_network_IO.png>
>
>
>
> *Code for with 100 Rediscala clients.*
>
> import akka.actor.{ActorLogging, Props, Actor}
> import akka.util.ByteString
> import redisbenchmark.RedisBenchmarkActor.InsertValues
>
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import redis.RedisClient
>
> import java.util.UUID
>
>
> object RedisLocalPerfMultipleActors {
>
>
>   def main(args: Array[String]) : Unit = {
>     implicit val akkaSystem = akka.actor.ActorSystem()
>     //create 100 RedisClient actors
>     val actors = 1 to 100
>     actors.map { x => akkaSystem.actorOf(Props(new RedisBenchmarkActor(10000)), "actor" + x) }
>       .foreach { actor => actor ! InsertValues }
>
>     //TODO shutdown the actor system
>     //Not sure how to wait for all Futures to complete before shutting down the actor system
>
>   }
>
> }
>
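A sketch of one way to close that TODO (plain Scala futures, no Rediscala or Akka; names are illustrative): if each actor's Seq[Future[Boolean]] can be collected at the caller (e.g. via the ask pattern, which is left out here), flatten them with Future.sequence and block until everything completes before shutting down:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: wait for every batch of inserts from every actor.
// Only after awaitAll returns is it safe to call akkaSystem.shutdown().
object AwaitAllSketch {
  def awaitAll(batches: Seq[Seq[Future[Boolean]]]): Seq[Boolean] =
    Await.result(Future.sequence(batches.flatten), 60.seconds)

  // demo uses already-completed futures in place of real insert results
  def demo(): Seq[Boolean] =
    awaitAll(Seq(Seq(Future.successful(true)), Seq(Future.successful(false))))
}
```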
>
> class RedisBenchmarkActor(runs: Int) extends Actor with ActorLogging {
>
>   val redis = RedisClient()
>   //implicit val ec = redis.executionContext
>   log.info(s"Actor created with $runs runs")
>
>   def receive = {
>
>     case InsertValues => {
>
>       log.info("Inserting values ")
>
>       val random = UUID.randomUUID().toString
>       val start = System.currentTimeMillis()
>       val result: Seq[Future[Boolean]] = for { i <- 1 to runs } yield {
>         //log.info("sending values ....")
>         redis.set(random + i.toString, ByteString(RedisBenchmarkActor.message))
>       }
>
>     }
>   }
>
> }
>
>
> object RedisBenchmarkActor {
>
>   object InsertValues
>
>   val message = """How to explain ZeroMQ? Some of us start by saying all 
> the wonderful things it does. It's sockets on steroids. It's like mailboxes 
> with routing. It's fast! Others try to share their moment of enlightenment, 
> that zap-pow-kaboom satori paradigm-shift moment when it all became 
> obvious. Things just become simpler. Complexity goes away. It opens the 
> mind. Others try to explain by comparison. It's smaller, simpler, but still 
> looks familiar. Personally, I like to remember why we made ZeroMQ at all, 
> because that's most likely where you, the reader, still are today.How to 
> explain ZeroMQ? Some of us start by saying all the wonderful things it 
> does. It's sockets on steroids. It's like mailboxes with routing. It's 
> fast! Others try to share their moment of enlightenment, that 
> zap-pow-kaboom satori paradigm-shift moment when it all became obvious. 
> Things just become simpler. Complexity goes away. It opens the mind. Others 
> try to explain by comparison. It's smaller, simpler, but still looks 
> familiar. Personally, I like to remember why we made ZeroMQ at all, because 
> that's most likely where"""
>
> }
>
>
> <https://lh6.googleusercontent.com/-3ymr3gm5E9U/VJb7RnuFuGI/AAAAAAAAvTM/qKDLpLMvlF4/s1600/rediscala_network_io_100actors.png>
>
>
> On Saturday, December 20, 2014 9:07:53 AM UTC-5, Soumya Simanta wrote:
>>
>> Endre, thank you for responding. Following is what the author of 
>> Rediscala has to say. 
>>
>> "Yes, I noticed it during my tests; at some point the scaling is 
>> exponential (bad).
>>
>> I suspected the thread scheduler to be the limitation, or the way 
>> Future.sequence works.
>>
>> If you can isolate a test that scales linearly up to 1M futures, I 
>> would be interested to see it. By replacing akka-io with another java.nio 
>> library (xnio) I was able to pass 1M req (at a speed of around 500k 
>> req/s)" 
>>
>> https://github.com/etaty/rediscala/issues/67
>>
>> If replacing akka-io with java.nio resolves this, then either akka-io is 
>> not used correctly in Rediscala or it is a fundamental limitation of 
>> akka-io. 
>>
>> My other responses inline. 
>>
>>
>> On Saturday, December 20, 2014 6:35:22 AM UTC-5, Akka Team wrote:
>>>
>>> Hi,
>>>
>>> My personal guess is that since you don't obey any backpressure when you 
>>> start flooding the redis client with requests you end up with a lot of 
>>> queued messages and probably high GC pressure. You can easily test this by 
>>> looking at the memory profile of your test.
>>>
>>>
>> Yes, the memory pressure is indeed high. The young generation (Eden) 
>> space fills up very quickly and then a minor GC is kicked off. 
>> Can I use akka-streams to add back pressure here? Any pointers 
>> would be greatly appreciated. 
>>  
>>
>>>
>>>
>>> On Sat, Dec 20, 2014 at 6:55 AM, Soumya Simanta <[email protected]> 
>>> wrote:
>>>
>>>> val res: Future[List[Boolean]] = Future.sequence(result.toList)
>>>> val end = System.currentTimeMillis()
>>>> val diff = end - start
>>>> println(s"for msgSize $msgSize and numOfRuns [$numberRuns] time is $diff ms")
>>>> akkaSystem.shutdown()
>>>>
>>>> }
>>>>
>>> What does the above code intend to measure? Didn't you want to actually 
>>> wait on the "res" future?
>>>
>> Yes, you are correct again. I should be waiting on res in order to get 
>> an estimate of overall latency. 
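For reference, a sketch of the corrected measurement (plain Scala; the helper name is mine, and the futures stand in for the Seq[Future[Boolean]] of redis.set results): take the end timestamp only after awaiting the combined future:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Block on the combined future *before* reading the end timestamp, so the
// reported duration covers actual completion of every insert rather than
// just the time taken to submit them.
object TimedSequence {
  def timeAll[A](futures: Seq[Future[A]]): Long = {
    val start = System.currentTimeMillis()
    Await.result(Future.sequence(futures.toList), 60.seconds)
    System.currentTimeMillis() - start
  }

  // demo uses already-completed futures, so it measures near-zero time
  def demo(): Long = timeAll((1 to 10).map(Future.successful(_)))
}
```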
>>  
>>
>
