I owe Endre a response about the documentation.

It's good.

One thing that might help would be if the section on operator fusion
described both .withAttributes(asyncBoundary) and x.viaAsync() with more
diagrams like the one shown, perhaps also including a groupBy for good
measure.

If I understand correctly, viaAsync wraps a box around a whole flow (the
argument), whereas .withAttributes wraps everything since the previous
boundary (or the beginning, if none).
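
To make the two shapes concrete, here is a minimal sketch. It assumes Akka 2.6 or later, where `.async` is the marker used in place of the withAttributes(asyncBoundary) and viaAsync calls discussed in this thread, and where an implicit ActorSystem supplies the materializer. The object name, the timesTwo flow and the element values are made up for illustration and do not come from the docs or from the pipeline in this thread.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundarySketch {

  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("async-boundary-sketch")
    try {
      // Hypothetical stand-in for an expensive stage such as a database call.
      val timesTwo = Flow[Int].map(_ * 2)

      // Shape 1: the boundary is placed after the +1 stage, so the box
      // covers everything built so far (the source and the +1 stage).
      val boundaryAfterPlusOne = Source(List(1, 2, 3))
        .map(_ + 1)
        .async
        .map(_ * 2)
        .runWith(Sink.seq)

      // Shape 2: the box is drawn around one flow only (the argument to via).
      val boundaryAroundFlow = Source(List(1, 2, 3))
        .map(_ + 1)
        .via(timesTwo.async)
        .runWith(Sink.seq)

      // Both pipelines compute (x + 1) * 2; only the placement of the
      // asynchronous boundary differs.
      (Await.result(boundaryAfterPlusOne, 5.seconds),
        Await.result(boundaryAroundFlow, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The results are identical by design: an asynchronous boundary changes which stages run together, not what they compute, so the difference only shows up in timing and thread usage.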

Though I am now achieving the effect I desire, I still need to experiment
with different permutations to fully understand whether the withAttributes
or the viaAsync I inserted did the trick, and whether both are necessary.
Roland said something about the source needing an async boundary, but the
example in the operator fusion section of the docs only adds one after the
+1 operator.
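
A small experiment along these lines could look like the sketch below. It again assumes Akka 2.6 or later and uses `.async` where this thread uses viaAsync; the channel ids, the perChannelWork flow and the substream limit of 16 are hypothetical. It shows only the shape (groupBy, an asynchronous flow per substream, mergeSubstreams) and does not by itself demonstrate that the substreams run in parallel.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object GroupByAsyncSketch {

  def run(): Seq[(String, Int)] = {
    implicit val system: ActorSystem = ActorSystem("group-by-async-sketch")
    try {
      // Hypothetical per-channel work; in the thread this would be the
      // db-calling transformation.
      val perChannelWork = Flow[(String, Int)].map {
        case (channelId, value) => (channelId, value * 10)
      }

      val merged = Source(List("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4))
        .groupBy(16, _._1)         // one substream per channel id
        .via(perChannelWork.async) // asynchronous boundary around the flow
        .mergeSubstreams           // collect the substreams back into one stream
        .runWith(Sink.seq)

      // The merge order across substreams is not fixed, so sort before
      // comparing or printing.
      Await.result(merged, 5.seconds).sortBy(_._2)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Moving or removing the `.async` call and timing a slower perChannelWork is one way to see which boundary actually matters.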

On Fri, Feb 5, 2016 at 11:34 AM, Richard Rodseth <[email protected]> wrote:

> Progress! I modified the flow as follows, adding
>
> 1) withAttributes(Attributes.asyncBoundary) because Roland had a comment
> about the sources needing an async boundary.
> 2) groupByChannel and mergeSubstreams
> 3) viaAsync before the db-calling transformation
>
> Now I see the output folders and monthly files for each channel getting
> created in parallel, and filled in an interval at a time. Pretty cool to
> watch.
>
> I need to experiment further and/or read some more about the async
> boundaries - I'm not certain whether the withAttributes is in the right
> place and whether I picked the right transformation for the viaAsync.
>
>   def channelToBytesWritten(db: JdbcBackend.DatabaseDef, batchSize: Int,
>       requestedRange: InstantRange): Flow[TenantSiteChannelInfo, (String, Long), Unit] = {
>     val result = Flow[TenantSiteChannelInfo].withAttributes(Attributes.asyncBoundary)
>       .groupBy(100000, c => c.channelId)
>       .via(Transformations.channelToChannelMonth(requestedRange))
>       .viaAsync(Transformations.channelMonthToChannelMonthInterval(db, batchSize))
>       .via(Transformations.channelMonthIntervalToBytesWritten)
>       .mergeSubstreams
>     result
>   }
>
> I did get the following error, so I'm not out of the woods yet.
>
> 2016-02-05 11:04:29,305 ERROR FileSubscriber: - Tearing down FileSink(...) due to upstream error
> org.postgresql.util.PSQLException: ERROR: canceling statement due to user request
>   at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2161)
>   at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1890)
>   at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
>   at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:560)
>   at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:417)
>   at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:410)
>   at com.zaxxer.hikari.proxy.PreparedStatementProxy.execute(PreparedStatementProxy.java:44)
>   at com.zaxxer.hikari.proxy.PreparedStatementJavassistProxy.execute(PreparedStatementJavassistProxy.java)
>   at slick.jdbc.StatementInvoker.results(StatementInvoker.scala:39)
>   at slick.jdbc.StatementInvoker.iteratorTo(StatementInvoker.scala:22)
>   at slick.jdbc.StreamingInvokerAction$class.emitStream(StreamingInvokerAction.scala:28)
>   at slick.driver.JdbcActionComponent$QueryActionExtensionMethodsImpl$$anon$1.emitStream(JdbcActionComponent.scala:218)
>   at slick.driver.JdbcActionComponent$QueryActionExtensionMethodsImpl$$anon$1.emitStream(JdbcActionComponent.scala:218)
>   at slick.backend.DatabaseComponent$DatabaseDef$$anon$3.run(DatabaseComponent.scala:285)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> On Fri, Feb 5, 2016 at 8:20 AM, Endre Varga <[email protected]>
> wrote:
>
>> Hi Richard,
>>
>> I recommend that you draw a simple diagram. Draw groupBy as a box with
>> multiple outputs, then draw the subsequent processing steps (Flow) as
>> another box. Now collect back the edges into a "flatten" box. Now you can
>> draw boundaries around various boxes (actors) and see how things relate.
>>
>> Then try it out in practice.
>>
>> -Endre
>>
>> On Fri, Feb 5, 2016 at 5:13 PM, Richard Rodseth <[email protected]>
>> wrote:
>>
>>> Thank you. But parallelism *between stages* is not the same as
>>> per-group-key parallelism, right?
>>> In the original post I included the pipeline to provide context.
>>> There are two database queries involved, (Channels, Intervals) both
>>> using Slick streaming rather than Future-returning db.run(action). So to
>>> build the stream of ChannelAndIntervals requires a flatMapConcat.
>>> So I'm basically wondering whether
>>> channels.groupBy(_.channelId).flatMap(channel =>
>>> Sources.intervalsForChannel(...)) would result in parallel
>>> intervals-for-channel queries.
>>> I think the answer is no.
>>> But since Endre said  groupBy(...).viaAsync(a) introduces an actor per
>>> substream, I'm thinking maybe
>>> groupBy(...).viaAsync(Transformations.dotheflatmapfromchannelstochannelintervals)
>>> *would* result in parallel interval-for-channel queries.
>>>
>>> Sorry to be so dense.
>>>
>>>
>>>
>>> On Fri, Feb 5, 2016 at 7:45 AM, Roland Kuhn <[email protected]> wrote:
>>>
>>>> The correct statement would be: “groupBy does not automatically
>>>> introduce any per-key parallelism.”
>>>>
>>>> There are several other combinators that can be used to introduce
>>>> parallelism between processing stages (like viaAsync, mapAsync and
>>>> potentially the flatMapX methods—if the provided sources declare async
>>>> boundaries). We try to keep the combinators and concepts as orthogonal and
>>>> composable as possible.
>>>>
>>>> Regards,
>>>>
>>>> Roland
>>>>
>>>> 5 feb 2016 kl. 16:24 skrev Richard Rodseth <[email protected]>:
>>>>
>>>> In an effort to be more succinct :) Is this a true statement?
>>>>
>>>> "groupBy does not automatically introduce any *per-key* parallelism,
>>>> unless followed by mapAsync"
>>>>
>>>> On Thu, Feb 4, 2016 at 3:05 PM, Richard Rodseth <[email protected]>
>>>> wrote:
>>>>
>>>>> I guess I'm still a bit confused by parallelism in akka streams, but
>>>>> let me describe what I have.
>>>>>
>>>>> Tenants have Sites, which have Channels, which have Intervals (start,
>>>>> end, value).
>>>>>
>>>>> My root source is a stream of TenantSiteChannelInfo (obtained from a
>>>>> join of channels with their sites and tenants)
>>>>>
>>>>> I am successfully writing files, one per channel-month arranged by
>>>>> tenant and site, e.g.
>>>>>
>>>>> /archive/<tenantid>/<siteid>/<channelid>/<yearmonth>_<channelid>.txt
>>>>>
>>>>> using a flow like this
>>>>>
>>>>>   def channelToBytesWritten(db: JdbcBackend.DatabaseDef, batchSize: Int,
>>>>>       requestedRange: InstantRange): Flow[TenantSiteChannelInfo, (String, Long), Unit] = {
>>>>>     val result = Flow[TenantSiteChannelInfo]
>>>>>       .via(Transformations.channelToChannelMonths(requestedRange)) // uses flatMapConcat
>>>>>       .via(Transformations.channelMonthToChannelMonthIntervals(db, batchSize)) // uses flatMapConcat
>>>>>       .via(Transformations.channelMonthIntervalToBytesWritten) // see below
>>>>>     result
>>>>>   }
>>>>>
>>>>> Transformations.channelMonthIntervalToBytesWritten does a
>>>>> groupBy.prefixAndTail(1).mapAsync as discussed in a recent thread, where
>>>>> the key is (year-month,channelid)
>>>>>
>>>>> The result is I see the files showing up in the Finder in parallel,
>>>>> which is OK and fun to watch. The number of distinct keys is rather large
>>>>> but I could presumably throttle the source channels if necessary to
>>>>> address that.
>>>>>
>>>>> But suppose I just/also wanted to process each *channel* (the very
>>>>> first type of stream element) in parallel? Does inserting a
>>>>> groupBy(_.channelId).viaAsync at the beginning of this flow and merging
>>>>> substreams at the end achieve that, or is running the latter half of the
>>>>> flow inside mapAsync the only/best way?
>>>>>
>>>>> If the groupBy wouldn't achieve per-channel parallelism, is it fair to
>>>>> say that groupBy only has utility if followed by some sort of "folding",
>>>>> as in the word count example or the file writing above, or if the
>>>>> receiving sink (shared by all substreams, an actor perhaps) can key off
>>>>> the same key in some way?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ:
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives:
>>>> https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>>
>>>>
>>>>
>>>> *Dr. Roland Kuhn*
>>>> *Akka Tech Lead*
>>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>>> twitter: @rolandkuhn
>>>> <http://twitter.com/#!/rolandkuhn>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
