I wondered if I could solve this with a combination of buffer and Balance,
but I see that Balance backpressures when the outputs backpressure, rather
than when the number of workers maxes out.

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
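For reference, the balancer from that cookbook page is roughly this shape (paraphrased from the linked docs; `worker` stands for the per-job Flow):

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{ Balance, Flow, GraphDSL, Merge }

// Fan work out to a fixed pool of identical workers and merge the results.
// Balance routes each element to the first worker output that signals demand.
def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
    val merge   = b.add(Merge[Out](workerCount))

    for (_ <- 1 to workerCount)
      balance ~> worker ~> merge

    FlowShape(balance.in, merge.out)
  })
```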

I feel like I need a flatMapConcat that takes a "max" parameter, like
mapAsync does. What am I missing?
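If I'm reading the 2.4 API right, flatMapMerge takes a breadth parameter of exactly that kind, but it merges rather than concatenates, so ordering differs from flatMapConcat. A minimal sketch, assuming out-of-order results were acceptable:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem("sketch")
implicit val materializer = ActorMaterializer()

// At most 4 inner sources are materialized at a time; their elements are
// merged downstream as they arrive, without preserving the outer order.
Source(1 to 100)
  .flatMapMerge(breadth = 4, i => Source.single(i * 2))
  .runWith(Sink.ignore)
```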

On Wed, Feb 17, 2016 at 8:31 PM, Richard Rodseth <[email protected]> wrote:

> I'm still missing something. I thought I had solved my problem of
> overwhelming Postgres by using buffer(), but today (after upgrading to
> 2.4.2-RC3, but that's probably a coincidence) I am getting a lot of timeouts.
> As you can see below, I have two Slick sources, one nested via
> flatMapConcat.
>
> Even with all these superfluous buffer() calls, I still see a ton of
> channel-month-for-interval-query messages.
>
> I assume that the Slick DatabasePublisher is not signalling backpressure
> while waiting for a connection.
>
> How can I limit the nested slick stream call to something like the size of
> the connection pool?
>
> If the flow from the second query onward were run as a separate stream inside
> a mapAsync, I suppose I could use the parallelism value and backpressure would
> flow back to the buffer, but how can I do it in one flow?
>
> val channels = Sources.channelsForFilter(db, channelFilter, 1000 /* fetchSize */) // First Slick call
>
> val source = channels
>   .buffer(5, OverflowStrategy.backpressure) // Candidate 1
>   .flatMapConcat { channel =>
>     Sources.monthsForChannel(channel, requestedRange)
>   }
>   .buffer(5, OverflowStrategy.backpressure) // Candidate 2
>   .groupBy(100000, c => c.channel.channelId)
>   .buffer(5, OverflowStrategy.backpressure) // Candidate 3
>   .via(Transformations.progress(e => s"channel-month-for-interval-query $e")) // Message logged
>   .flatMapConcat { channelMonth =>
>     val r = Sources.intervalRowsForChannelAndTime(db, channelMonth.channel, channelMonth.period, batchSize) // Second Slick call
>       .buffer(5, OverflowStrategy.backpressure) // Candidate 4
>     r.map { x => ChannelAndInstantRangeAndInterval(channelMonth.channel, channelMonth.period, x) }
>   }
>   .via(Transformations.channelMonthIntervalToBytesWritten)
>   .mergeSubstreams
>   .via(Transformations.progress(fileWritten => s"File written $fileWritten"))
>
> val sink = Sinks.countingSink
>
> val runnable = source.toMat(sink)(Keep.right)
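For what it's worth, the mapAsync variant described in the quoted message might look roughly like this (a sketch only; `channelMonths` and `poolSize` are placeholders, the other names are from the quoted code above, and collecting each inner result with Sink.seq trades streaming for buffering each month's rows in memory):

```scala
import akka.stream.scaladsl.Sink

// Hypothetical: cap concurrent inner streams at the connection pool size.
val poolSize = 10

channelMonths
  .mapAsync(parallelism = poolSize) { channelMonth =>
    // Run the second Slick query as its own stream; mapAsync's parallelism
    // then bounds how many of these inner streams (and hence connections)
    // are running at once, and backpressures the outer stream when full.
    Sources
      .intervalRowsForChannelAndTime(db, channelMonth.channel, channelMonth.period, batchSize)
      .map(x => ChannelAndInstantRangeAndInterval(channelMonth.channel, channelMonth.period, x))
      .runWith(Sink.seq) // collects each month's rows in memory
  }
  .mapConcat(_.toList) // flatten the per-month batches back into one stream
```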

--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.