Here is my attempt to create a version with back pressure using Reactive
Streams. I'm not sure whether it is completely correct. Can someone please
verify the code below?
Even with this version I don't see any change in throughput, and the network
IO graph looks very similar to what I had without using reactive streams.
On the other hand, if I use 100 Rediscala client actors the inserts are much
faster. I understand that there are now 100 queues (mailboxes) and that is
why it's faster. But I still don't understand why the performance is
so bad for a single client after a certain threshold, even after using back
pressure (assuming I'm using Akka streams correctly).
*Code with Akka streams and one Rediscala client.*
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import redis.RedisClient

object RedisStreamClient extends App {
  val message = """How to explain ZeroMQ? Some of us start by saying all
the wonderful things it does. It's sockets on steroids. It's like mailboxes
with routing. It's fast! Others try to share their moment of enlightenment,
that zap-pow-kaboom satori paradigm-shift moment when it all became
obvious. Things just become simpler. Complexity goes away. It opens the
mind. Others try to explain by comparison. It's smaller, simpler, but still
looks familiar. Personally, I like to remember why we made ZeroMQ at all,
because that's most likely where you, the reader, still are today.How to
explain ZeroMQ? Some of us start by saying all the wonderful things it
does. It's sockets on steroids. It's like mailboxes with routing. It's
fast! Others try to share their moment of enlightenment, that
zap-pow-kaboom satori paradigm-shift moment when it all became obvious.
Things just become simpler. Complexity goes away. It opens the mind. Others
try to explain by comparison. It's smaller, simpler, but still looks
familiar. Personally, I like to remember why we made ZeroMQ at all, because
that's most likely where"""
  implicit val system = ActorSystem("Sys")
  implicit val materializer = FlowMaterializer()
  val msgSize = message.getBytes.size
  val redis = RedisClient()
  implicit val ec = redis.executionContext
  val random = UUID.randomUUID().toString
  val source = Source(() => (1 to 1000000).iterator)
  source
    .map { x => x + 1 }
    .foreach(x => redis.set(random + x.toString, ByteString(message)))
    .onComplete(_ => system.shutdown())
}
<https://lh4.googleusercontent.com/-5aG6CMqBLcA/VJb7J3qqdVI/AAAAAAAAvTE/kkdPVBT6AbQ/s1600/rediscala_network_IO.png>
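One possible reason the stream version behaves the same as before: redis.set returns a Future immediately, so foreach moves on to the next element without waiting for the write to complete, and nothing ever slows the source down. The sketch below illustrates the general idea of bounding the number of outstanding writes using only the Scala standard library. It is not the Rediscala or Akka Streams API: `fakeSet` is a hypothetical stand-in for redis.set, and `BoundedInFlight`, `run`, and `maxInFlight` are names made up for illustration. Blocking the producer on a semaphore is a crude substitute for real stream back pressure, but it shows the effect.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

object BoundedInFlight {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for redis.set: completes asynchronously with true.
  def fakeSet(key: String, value: String): Future[Boolean] = Future { true }

  /** Issues `total` writes with at most `maxInFlight` outstanding at once.
    * Returns (number of successful writes, highest in-flight count observed).
    */
  def run(total: Int, maxInFlight: Int): (Int, Int) = {
    val permits   = new Semaphore(maxInFlight)
    val inFlight  = new AtomicInteger(0)
    val succeeded = new AtomicInteger(0)
    var peak      = 0 // only touched by the producer thread

    (1 to total).foreach { i =>
      permits.acquire() // blocks the producer once the limit is reached
      peak = math.max(peak, inFlight.incrementAndGet())
      fakeSet(s"key$i", "value").onComplete { result =>
        result match {
          case Success(true) => succeeded.incrementAndGet()
          case _             => () // failed or rejected write
        }
        inFlight.decrementAndGet()
        permits.release() // lets the producer issue the next write
      }
    }

    // Taking every permit back means all outstanding writes have finished.
    permits.acquire(maxInFlight)
    (succeeded.get, peak)
  }
}
```

Calling `BoundedInFlight.run(1000000, 64)` would keep at most 64 writes queued at any time instead of creating a million futures up front. Inside a stream, a stage that waits for each Future (such as `mapAsync` in later Akka Streams releases) serves a similar purpose.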
*Code with 100 Rediscala clients.*
import akka.actor.{ActorLogging, Props, Actor}
import akka.util.ByteString
import redisbenchmark.RedisBenchmarkActor.InsertValues
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import redis.RedisClient
import java.util.UUID

object RedisLocalPerfMultipleActors {
  def main(args: Array[String]): Unit = {
    implicit val akkaSystem = akka.actor.ActorSystem()
    // create 100 RedisClient actors
    val actors = 1 to 100
    actors
      .map { x => akkaSystem.actorOf(Props(new RedisBenchmarkActor(10000)), "actor" + x) }
      .foreach { actor => actor ! InsertValues }
    // TODO shutdown the actor system
    // Not sure how to wait for all Futures to complete before shutting
    // down the actor system
  }
}

class RedisBenchmarkActor(runs: Int) extends Actor with ActorLogging {
  val redis = RedisClient()
  //implicit val ec = redis.executionContext
  log.info(s"Actor created with $runs ")

  def receive = {
    case InsertValues => {
      log.info("Inserting values ")
      val random = UUID.randomUUID().toString
      val start = System.currentTimeMillis()
      val result: Seq[Future[Boolean]] = for {i <- 1 to runs} yield {
        //log.info("sending values ....")
        redis.set(random + i.toString, ByteString(RedisBenchmarkActor.message))
      }
    }
  }
}

object RedisBenchmarkActor {
  object InsertValues
  val message = """How to explain ZeroMQ? Some of us start by saying all
the wonderful things it does. It's sockets on steroids. It's like mailboxes
with routing. It's fast! Others try to share their moment of enlightenment,
that zap-pow-kaboom satori paradigm-shift moment when it all became
obvious. Things just become simpler. Complexity goes away. It opens the
mind. Others try to explain by comparison. It's smaller, simpler, but still
looks familiar. Personally, I like to remember why we made ZeroMQ at all,
because that's most likely where you, the reader, still are today.How to
explain ZeroMQ? Some of us start by saying all the wonderful things it
does. It's sockets on steroids. It's like mailboxes with routing. It's
fast! Others try to share their moment of enlightenment, that
zap-pow-kaboom satori paradigm-shift moment when it all became obvious.
Things just become simpler. Complexity goes away. It opens the mind. Others
try to explain by comparison. It's smaller, simpler, but still looks
familiar. Personally, I like to remember why we made ZeroMQ at all, because
that's most likely where"""
}
<https://lh6.googleusercontent.com/-3ymr3gm5E9U/VJb7RnuFuGI/AAAAAAAAvTM/qKDLpLMvlF4/s1600/rediscala_network_io_100actors.png>
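On the open question of waiting for all Futures before shutting down: a minimal sketch, using only the Scala standard library, is to combine the writes with Future.sequence and block on the combined Future before reading the clock or stopping the system. `fakeSet` is again a hypothetical stand-in for redis.set, and `AwaitAllWrites`, `timedInsert`, and the 30 second timeout are assumptions for illustration. In the actor version each actor would additionally have to report its combined Future back to main, which is not shown here.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AwaitAllWrites {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for redis.set: completes asynchronously with true.
  def fakeSet(key: String): Future[Boolean] = Future { true }

  /** Issues `runs` writes and blocks until every one has completed.
    * Returns (number of successful writes, elapsed milliseconds).
    */
  def timedInsert(runs: Int): (Int, Long) = {
    val start = System.nanoTime()
    val writes: Seq[Future[Boolean]] = (1 to runs).map(i => fakeSet(s"key$i"))
    // One Future that completes only when all individual writes have completed.
    val all: Future[Seq[Boolean]] = Future.sequence(writes)
    // Timeout value is an assumption; pick one that suits the workload.
    val results = Await.result(all, 30.seconds)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    (results.count(identity), elapsedMs)
  }
}
```

Only after `Await.result` returns is it meaningful to compute the elapsed time or to shut the actor system down; measuring before that point only times how long it takes to enqueue the requests.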
On Saturday, December 20, 2014 9:07:53 AM UTC-5, Soumya Simanta wrote:
>
> Endre, thank you for responding. Following is what the author of Rediscala
> has to say.
>
> *"Yes i noticed it during my tests, at some point the scale is exponential
> (bad).*
>
>
> *I suspected the thread scheduler to be the limitation. Or the way
> Future.sequence works.*
>
> *If you can isolate a test that scale linearly up to 1M of futures, I
> would be interested to see it. By replacing akka-io with another java.nio
> library (xnio) I was able to pass the 1M req (at the speed of around 500k
> req/s)" *
>
> https://github.com/etaty/rediscala/issues/67
>
> If replacing akka-io with java.nio resolves this then either akka-io is
> not used correctly in Rediscala OR it is a fundamental limitation of
> akka-io.
>
> My other responses inline.
>
>
> On Saturday, December 20, 2014 6:35:22 AM UTC-5, Akka Team wrote:
>>
>> Hi,
>>
>> My personal guess is that since you don't obey any backpressure when you
>> start flooding the redis client with requests you end up with a lot of
>> queued messages and probably high GC pressure. You can easily test this by
>> looking at the memory profile of your test.
>>
>>
> Yes, the memory pressure is indeed high. The young generation (Eden) space
> fills up very quickly and then a minor GC is kicked off.
> Can I use akka-streams to add backpressure here and resolve this? Any
> pointers will be greatly appreciated.
>
>
>>
>>
>> On Sat, Dec 20, 2014 at 6:55 AM, Soumya Simanta <[email protected]>
>> wrote:
>>
>>> val res: Future[List[Boolean]] = Future.sequence(result.toList)
>>> val end = System.currentTimeMillis()
>>> val diff = (end - start)
>>> println(s"for msgSize $msgSize and numOfRuns [$numberRuns] time is $diff ms ")
>>> akkaSystem.shutdown()
>>>
>>> }
>>>
>> What does the above code intend to measure? Didn't you want to actually
>> wait on the "res" future?
>>
> Yes, you are correct again. I should be waiting on res in order to get an
> estimate of overall latency.
>
>