Thanks for the clarification. This helped a lot. 

I have a follow-up question.
Is there a way to automatically put an async boundary around every 
element (processing unit) in the stream? 

From what I understand, auto-fusing merges the execution of elements into 
the same actor, thereby avoiding buffers. If I set auto-fusing to "off", 
does that mean every element gets an async boundary and is therefore 
executed in a different actor?

Thanks,
Vishnu.



On Thursday, 4 February 2016 00:13:29 UTC-8, drewhk wrote:
>
> Hi,
>
> Be aware that Streams 2.0 and later does auto-fusing, which means that by 
> default all of your stages will run in the same actor! In your case that 
> means of course that no parallelization will happen at all. In order to 
> make your graph truly parallel, you need to put an asynchronous boundary 
> around your "worker" flow, so it is materialized to a dedicated actor.
>
> Basically, Akka Streams allows you to define the concurrency/parallelism 
> boundaries of your graph in a purely declarative fashion! By default 
> everything shares the same actor, but you are free to slice or merge it 
> along almost arbitrary boundaries depending on the nature of your work.
>
> See: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-flows-and-basics.html#Operator_Fusion
>  
> for details.
>
> -Endre
>
>
>
> On Thu, Feb 4, 2016 at 6:09 AM, RC213V <[email protected]> wrote:
>
>> Hi,
>>
>> I have sample Akka Streams code where my source is 0.5 GB of data 
>> preloaded from disk and split into lines. Each line is about 2.5 KB.
>> I create a runnable graph where I add processing parallelism by doing a 
>> balance ~> processing flow ~> merge before it goes into a dummy sink.
>>
>> The problem is that whatever value I set for parallelism (split 'N' ways 
>> and merge), the processing time remains the same: ~22 secs.
>> I varied the parallelism from 1 to 8 and got the same running time, and 
>> my CPU usage never goes above one core.
>>
>> I tried increasing the initial and max buffer sizes, but it didn't help. 
>> I also created my own dispatcher and tried using it in 
>> ActorMaterializerSettings, but still got the same result.
>>
>> I am not sure what I am doing wrong here.
>>
>> I am running this test on a quad-core MacBook Pro with
>> -Xms5G
>> -Xmx5G
>>
>> I have attached the code below.
>>
>> I am using "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC2"
>>
>> I want to process the 0.5 GB of data as fast as possible using all 
>> my CPU cores, which is not happening now.
>>
>> I would really appreciate it if someone could help me identify the 
>> performance problem.
>>
>> Thanks,
>> Vishnu.
>>
>>
>>
>> import akka.actor.ActorSystem
>> import akka.stream.{ActorMaterializer, ClosedShape}
>> import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
>>
>> object StreamTest2 extends App {
>>     implicit val system = ActorSystem("indexer")
>>     implicit val mat = ActorMaterializer()
>>     // Load the whole file into memory up front, one element per line.
>>     val source = Source(scala.io.Source.fromFile("/tmp/collection512mb").getLines().toList)
>>     val parallelism = 8
>>
>>     val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit builder =>
>>         (snk) =>
>>             import GraphDSL.Implicits._
>>
>>             val balance = builder.add(Balance[String](parallelism))
>>             val merge = builder.add(Merge[Any](parallelism))
>>             // Count how often each word occurs in the line.
>>             val flow = Flow[String].map { str =>
>>                 str.split(" ").groupBy(identity).map(a => (a._1, a._2.length))
>>             }
>>
>>             // Fan out to `parallelism` copies of the flow, then fan back in.
>>             source ~> balance.in
>>             for (i <- 0 until parallelism) {
>>                 balance.out(i) ~> flow ~> merge.in(i)
>>             }
>>             merge.out ~> snk.in
>>             ClosedShape
>>     })
>>
>>     val start = System.currentTimeMillis()
>>     val result = graph.run()
>>     implicit val ec = system.dispatcher
>>     // Print the elapsed time once the sink completes, then shut down.
>>     result.onComplete { _ =>
>>         println(s"Done ${(System.currentTimeMillis() - start) / 1000}s.")
>>         system.terminate()
>>     }
>> }
>>
>>
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
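
For readers following the thread: the asynchronous boundary Endre describes 
in the quoted reply might look roughly like the sketch below when applied to 
the posted graph. It is only an illustration, not a tuned or measured 
solution. It assumes an Akka Streams release where `.async` is available on 
`Flow` (I believe the Streams 2.0.x documentation linked above expresses the 
same idea with `withAttributes(Attributes.asyncBoundary)`). The object name 
`AsyncBoundarySketch`, the helper `wordCounts`, and the in-memory test data 
are hypothetical stand-ins, not part of the original post.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

object AsyncBoundarySketch {
  // Same per-line work as in the original post: count occurrences of each word.
  def wordCounts(line: String): Map[String, Int] =
    line.split(" ").groupBy(identity).map { case (word, hits) => (word, hits.length) }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("indexer")
    implicit val mat = ActorMaterializer()
    import system.dispatcher

    val parallelism = 4
    // Hypothetical in-memory stand-in for the lines loaded from disk.
    val source = Source(List.fill(1000)("a b a c b a"))

    // Assumption: `.async` marks this flow as an asynchronous island, so each
    // copy added to the graph below is not fused with its neighbours.
    val worker = Flow[String].map(wordCounts).async

    val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit builder => snk =>
      import GraphDSL.Implicits._
      val balance = builder.add(Balance[String](parallelism))
      val merge = builder.add(Merge[Map[String, Int]](parallelism))

      source ~> balance.in
      for (i <- 0 until parallelism) {
        balance.out(i) ~> worker ~> merge.in(i)
      }
      merge.out ~> snk.in
      ClosedShape
    })

    // Shut the actor system down once the sink has completed.
    graph.run().onComplete(_ => system.terminate())
  }
}
```

The only change relative to the posted graph is the `.async` on the worker 
flow; the balance and merge stages are left fused. Whether this actually 
brings more cores into play for the original workload would need to be 
measured.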
