Hi, 

I'm trying to scale my application using the balancing graph as described in
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.1/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

The basic idea is to read data from the server over several connections in parallel, then merge the streams and group the elements by time frame (in my case, 2 seconds or 1048576 messages, whichever comes first).
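The count-or-time rule that `groupedWithin(1048576, 2 seconds)` applies can be approximated offline in plain Scala. This is a simplified sketch, not Akka's implementation: arrival timestamps are passed in explicitly (an assumption), and the window is measured from each batch's first message, whereas the real stage uses a timer and can emit a group even when no new message arrives. `CountOrTimeBatching`, `Timed` and `batches` are hypothetical names.

```scala
object CountOrTimeBatching {

  // One incoming message with its (assumed, caller-supplied) arrival time in milliseconds.
  final case class Timed[A](atMillis: Long, value: A)

  /** Splits `msgs` (sorted by arrival time) into batches. A batch is closed as soon as
    * it holds `maxCount` messages, or when the next message arrives `windowMillis` or
    * more after the batch's first message. Being offline, it can only notice that a
    * window expired when the next message arrives. */
  def batches[A](msgs: Seq[Timed[A]], maxCount: Int, windowMillis: Long): Vector[Vector[A]] = {
    require(maxCount > 0, "maxCount must be positive")
    require(windowMillis > 0, "windowMillis must be positive")

    val done = Vector.newBuilder[Vector[A]]
    var current = Vector.empty[A]
    var batchStart = 0L

    for (m <- msgs) {
      // Close the open batch if its time window has elapsed.
      if (current.nonEmpty && m.atMillis - batchStart >= windowMillis) {
        done += current
        current = Vector.empty
      }
      // A new batch starts its window at its first message.
      if (current.isEmpty) batchStart = m.atMillis
      current = current :+ m.value
      // Close the batch as soon as it is full.
      if (current.size == maxCount) {
        done += current
        current = Vector.empty
      }
    }
    if (current.nonEmpty) done += current
    done.result()
  }
}
```

With `maxCount = 1048576` and `windowMillis = 2000` it mirrors the settings above; small values make the two closing conditions easy to see.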

The problem is that when I run the application, I get no performance improvement with 2 workers, or with 8 or more.
The throughput is the same as with flows that don't use any balancing at all.
But if I run several instances of the application, the throughput grows almost proportionally.

A thread dump shows that the bottleneck is the pinned dispatcher, which reads the data on a single thread.
So the question is: how should I set up the Materializer, the dispatchers, or something else to scale within one actor system instead of adding more application instances?
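The dispatcher named in `withDispatcher("my-dispatcher")` is not defined in the code below. A hedged sketch of how it, plus one Akka IO setting, could be supplied programmatically: the `my-dispatcher` values are hypothetical, and raising `akka.io.tcp.nr-of-selectors` (default 1) only matters if the single busy thread in the dump is the Akka IO TCP selector, whose select loop runs on `akka.io.pinned-dispatcher` rather than on the materializer's dispatcher. `TcpReaderConfig` is a hypothetical name.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object TcpReaderConfig {

  // Hypothetical values; they would need tuning for the actual machine and load.
  val config: Config = ConfigFactory.parseString(
    """
      |# Dispatcher referenced by ActorMaterializerSettings.withDispatcher("my-dispatcher").
      |my-dispatcher {
      |  type = Dispatcher
      |  executor = "fork-join-executor"
      |  fork-join-executor {
      |    parallelism-min = 2
      |    parallelism-factor = 2.0
      |    parallelism-max = 8
      |  }
      |}
      |
      |# Akka IO stripes TCP channels over this many selectors; each selector
      |# runs its select loop on akka.io.tcp.selector-dispatcher (pinned by default).
      |akka.io.tcp.nr-of-selectors = 4
      |""".stripMargin
  ).withFallback(ConfigFactory.load())

  // Creates the actor system with the settings above layered over the defaults.
  def start(): ActorSystem = ActorSystem("tcp-reader", config)
}
```

In `main`, `TcpReaderConfig.start()` would then take the place of `ActorSystem("tcp-reader")`.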

Thanks, 
Viktor.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, FlowShape}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps

object TcpReader {

  def main(args: Array[String]): Unit = {

    import akka.stream.io.Framing

    implicit val system = ActorSystem("tcp-reader")
    // val, not def: a def would build a new ActorMaterializer on every implicit lookup
    implicit val materializer = ActorMaterializer(
      ActorMaterializerSettings(system)
        .withInputBuffer(
          initialSize = 1048576,
          maxSize = 1048576).withDispatcher("my-dispatcher"))

    def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, Unit] = {
      import GraphDSL.Implicits._

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = false))
        val merge = b.add(Merge[Out](workerCount))

        for (_ <- 1 to workerCount) {
          // each iteration adds a separate copy of `worker` (here: its own TCP connection)
          balancer ~> worker ~> merge
        }

        FlowShape(balancer.in, merge.out)
      })
    }

    def connection: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
      Tcp().outgoingConnection("127.0.0.1", 9999)

    val transform: Flow[ByteString, immutable.Seq[String], Unit] = Flow[ByteString]
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 512,
        allowTruncation = false))
      .map(_.utf8String)
      .groupedWithin(1048576, 2 seconds)

    val print: Sink[Seq[String], Future[Unit]] = Sink.foreach[Seq[String]] {
      elem =>
        println(s"elems number = ${elem.size}")
    }

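    // Source.maybe emits nothing until its promise is completed, so the balancer
    // receives no elements; each connection only streams what the server sends.
    Source.maybe[ByteString].via(balancer(connection, workerCount = args(0).toInt))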
      .via(transform)
      .runWith(print)

  }

}
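One detail of the code above may matter beyond performance: `Framing.delimiter` runs after the `Merge`, so byte chunks from different connections can interleave before lines are cut, and a line split across two chunks of one connection could be glued to bytes from another. A hedged sketch that frames each connection separately and marks it as its own asynchronous island, assuming Akka Streams 2.0.x (`akka.stream.io.Framing`, `Attributes.asyncBoundary`; later versions moved `Framing` to `akka.stream.scaladsl` and use `.async`). `PerConnectionFraming` and `framedConnection` are hypothetical names, and whether the boundary changes throughput here is untested.

```scala
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.io.Framing
import akka.stream.scaladsl.{Flow, Tcp}
import akka.util.ByteString

object PerConnectionFraming {

  // Hypothetical helper: one outgoing TCP connection whose bytes are cut into
  // UTF-8 lines *before* they are merged with other connections' output.
  def framedConnection(host: String, port: Int)(implicit system: ActorSystem): Flow[ByteString, String, Any] =
    Tcp().outgoingConnection(host, port)
      // Framing keeps state across chunks, so it must see one connection's bytes in order.
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 512, allowTruncation = false))
      .map(_.utf8String)
      // Akka Streams 2.0.x API: keep this worker out of the fused island around it.
      .withAttributes(Attributes.asyncBoundary)
}
```

It could then replace `connection`, leaving only the grouping after the merge: `Source.maybe[ByteString].via(balancer(PerConnectionFraming.framedConnection("127.0.0.1", 9999), workerCount)).groupedWithin(1048576, 2 seconds).runWith(print)`.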



-- 
You received this message because you are subscribed to the Google Groups "Akka User List" group.
Visit this group at https://groups.google.com/group/akka-user.