Here is my attempt to create a version with back pressure with Reactive 
Stream. Not sure if it completely correct or not. Can someone please verify 
if the code below is correct? 
Even with this version I don't see any change is throughput and the network 
IO graph looks very similar to what I had without using reactive streams. 

On the other hand if I use 100 Rediscala client actors the inserts of much 
faster. I understand that now there are 100 queues (mailboxes) and 
therefore its faster. But I still don't understand why the performance is 
so bad for a single client after a certain threshold, even after using back 
pressure (assuming I'm using Akka streams correctly).


*Code with Akka streams and one Rediscala client. *

import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import redis.RedisClient

object RedisStreamClient extends App {

  val message = """How to explain ZeroMQ? Some of us start by saying all 
the wonderful things it does. It's sockets on steroids. It's like mailboxes 
with routing. It's fast! Others try to share their moment of enlightenment, 
that zap-pow-kaboom satori paradigm-shift moment when it all became 
obvious. Things just become simpler. Complexity goes away. It opens the 
mind. Others try to explain by comparison. It's smaller, simpler, but still 
looks familiar. Personally, I like to remember why we made ZeroMQ at all, 
because that's most likely where you, the reader, still are today.How to 
explain ZeroMQ? Some of us start by saying all the wonderful things it 
does. It's sockets on steroids. It's like mailboxes with routing. It's 
fast! Others try to share their moment of enlightenment, that 
zap-pow-kaboom satori paradigm-shift moment when it all became obvious. 
Things just become simpler. Complexity goes away. It opens the mind. Others 
try to explain by comparison. It's smaller, simpler, but still looks 
familiar. Personally, I like to remember why we made ZeroMQ at all, because 
that's most likely where"""

  implicit val system = ActorSystem("Sys")

  implicit val materializer = FlowMaterializer()

  val msgSize = message.getBytes.size

  val redis = RedisClient()
  implicit val ec = redis.executionContext
  val random = UUID.randomUUID().toString

  val source = Source( () => (1 to 1000000).iterator )
  source.map{  x => x + 1 }.foreach( x => redis.set(random+x.toString, 
ByteString(message) ) ).onComplete( _ => system.shutdown())

}

<https://lh4.googleusercontent.com/-5aG6CMqBLcA/VJb7J3qqdVI/AAAAAAAAvTE/kkdPVBT6AbQ/s1600/rediscala_network_IO.png>



*Code for with 100 Rediscala clients.*

import akka.actor.{ActorLogging, Props, Actor}
import akka.util.ByteString
import redisbenchmark.RedisBenchmarkActor.InsertValues

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import redis.RedisClient

import java.util.UUID


object RedisLocalPerfMultipleActors {


  def main(args: Array[String]) : Unit = {
    implicit val akkaSystem = akka.actor.ActorSystem()
    //create 100 RedisClient actors
    val actors = 1 to 100
    actors.map{ x => akkaSystem.actorOf(Props(new 
RedisBenchmarkActor(10000)), "actor"+x) }.map{ actor => actor ! 
InsertValues}

    //TODO shutdown the actor system
*    //Not sure how to wait for all Futures to complete before shutting 
down the actor system*

  }

}


class RedisBenchmarkActor(runs: Int) extends Actor with ActorLogging {

  val redis = RedisClient()
  //implicit val ec = redis.executionContext
  log.info(s"Actor created with $runs ")

  def receive = {

    case InsertValues => {

      log.info("Inserting values ")

      val random = UUID.randomUUID().toString
      val start = System.currentTimeMillis()
      val result: Seq[Future[Boolean]] = for {i <- 1 to runs} yield {
        //log.info("sending values ....")
        redis.set(random + i.toString, 
ByteString(RedisBenchmarkActor.message))
      }

    }
  }

}


object RedisBenchmarkActor {

  object InsertValues

  val message = """How to explain ZeroMQ? Some of us start by saying all 
the wonderful things it does. It's sockets on steroids. It's like mailboxes 
with routing. It's fast! Others try to share their moment of enlightenment, 
that zap-pow-kaboom satori paradigm-shift moment when it all became 
obvious. Things just become simpler. Complexity goes away. It opens the 
mind. Others try to explain by comparison. It's smaller, simpler, but still 
looks familiar. Personally, I like to remember why we made ZeroMQ at all, 
because that's most likely where you, the reader, still are today.How to 
explain ZeroMQ? Some of us start by saying all the wonderful things it 
does. It's sockets on steroids. It's like mailboxes with routing. It's 
fast! Others try to share their moment of enlightenment, that 
zap-pow-kaboom satori paradigm-shift moment when it all became obvious. 
Things just become simpler. Complexity goes away. It opens the mind. Others 
try to explain by comparison. It's smaller, simpler, but still looks 
familiar. Personally, I like to remember why we made ZeroMQ at all, because 
that's most likely where"""

}

<https://lh6.googleusercontent.com/-3ymr3gm5E9U/VJb7RnuFuGI/AAAAAAAAvTM/qKDLpLMvlF4/s1600/rediscala_network_io_100actors.png>


On Saturday, December 20, 2014 9:07:53 AM UTC-5, Soumya Simanta wrote:
>
> Endre, thank you for responding. Following is what the author of Rediscala 
> has to say. 
>
> *"Yes i noticed it during my tests, at some point the scale is exponential 
> (bad).*
>
>
> *I suspected the thread scheduler to be the limitation.Or the way 
> Future.sequence works.*
>
> *If you can isolate a test that scale linearly up to 1M of futures, I 
> would be interested to see it. By replacing akka-io with another java.nio 
> library (xnio) I was able to pass the 1M req (at the speed of around 500k 
> req/s)" *
>
> https://github.com/etaty/rediscala/issues/67
>
> If replacing akka-io with java.nio resolves this then either akka-io is 
> not used correctly in Rediscala OR it is a fundamental limitation of 
> akka-io. 
>
> My other responses inline. 
>
>
> On Saturday, December 20, 2014 6:35:22 AM UTC-5, Akka Team wrote:
>>
>> Hi,
>>
>> My personal guess is that since you don't obey any backpressure when you 
>> start flooding the redis client with requests you end up with a lot of 
>> queued messages and probably high GC pressure. You can easily test this by 
>> looking at the memory profile of your test.
>>
>>
> Yes, the memory pressure is indeed high. The young generation (Edge) space 
> fills up very quickly and then a minor GC is kicked off. 
> Can I use akka-streams to resolve and add backpressure here? Any pointers 
> here will be greatly appreciated. 
>  
>
>>
>>
>> On Sat, Dec 20, 2014 at 6:55 AM, Soumya Simanta <[email protected]> 
>> wrote:
>>
>>> val res: Future[List[Boolean]] = Future.sequence(result.toList) val end 
>>> = System.currentTimeMillis() val diff = (end - start) println(s"for msgSize 
>>> $msgSize and numOfRuns [$numberRuns] time is $diff ms ") 
>>> akkaSystem.shutdown() 
>>>
>>> }
>>>
>> What does the above code intend to measure? Didn't you want to actually 
>> wait on the "res" future?
>>
>> Yes, you are correct again. I should be waiting on res in order to get an 
> estimate of overall latency. 
>  
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to