This was a very helpful response, thanks Endre. Using flatMapMerge and a few buffers, I think I have the queries to Postgres under control. I now have a similar problem on the sink side of the stream - I am opening too many files at once. The file write takes a source of ByteStrings and returns a Future:

def writeFile(source: Source[ByteString, akka.NotUsed], file: File): Future[IOResult]

So it gets run in a mapAsync. But I have to group the stream of channel-month-intervals by channel-month, because each channel-month goes in one file, and if I mapAsync after groupBy I get a file created for each group. Two things come to mind: using a balancer before grouping (though I'm not sure that would work), or sending the groups to a sink rather than merging the substreams. Thoughts?

On Thu, Feb 18, 2016 at 8:29 AM, Richard Rodseth <[email protected]> wrote:

> That makes sense. I thought the groups got created as needed, and the max
> value has to be greater than the number of possible distinct groups.
> I'll have to re-think the flow. Maybe I can do a flatMapMerge and do the
> grouping later.
> Thanks.
>
> On Thu, Feb 18, 2016 at 8:07 AM, Endre Varga <[email protected]> wrote:
>
>> On Thu, Feb 18, 2016 at 4:57 PM, Richard Rodseth <[email protected]> wrote:
>>
>>> I wondered if I could solve this with a combination of buffer and
>>> Balance, but I see that Balance backpressures when the outputs
>>> backpressure, rather than when the number of workers maxes out.
>>>
>>> http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>>>
>>> I feel like I need a flatMapConcat that takes a "max" parameter, like
>>> mapAsync does. What am I missing?
>>
>> There is a max, and it is exactly one :)
>>
>> flatMapConcat == flattenMerge(1)
>>
>> Your issue is that your flatMapConcat executes on every group, so it is
>> the group count that matters here.
>>
>> -Endre
>>
>>> On Wed, Feb 17, 2016 at 8:31 PM, Richard Rodseth <[email protected]> wrote:
>>>
>>>> I'm still missing something. I thought I had solved my problem of
>>>> overwhelming Postgres by using buffer(), but today (after upgrading to
>>>> 2.4.2-RC3, though that's probably a coincidence) I am getting a lot of timeouts.
>>>> As you can see below, I have two Slick sources, one nested via
>>>> flatMapConcat.
>>>>
>>>> Even with all these superfluous buffer() calls, I still see a ton of
>>>> channel-month-for-interval-query messages.
>>>>
>>>> I assume that the Slick DatabasePublisher is not signalling
>>>> backpressure while waiting for a connection.
>>>>
>>>> How can I limit the nested Slick stream call to something like the
>>>> size of the connection pool?
>>>>
>>>> If the flow from the second query onward were run as a separate stream
>>>> inside a mapAsync, I suppose I could use the parallelism value and
>>>> backpressure would flow back to the buffer - but how can I do it in
>>>> one flow?
>>>>
>>>> val channels = Sources.channelsForFilter(db, channelFilter, 1000 /* fetchSize */) // First Slick call
>>>>
>>>> val source = channels
>>>>   .buffer(5, OverflowStrategy.backpressure) // Candidate 1
>>>>   .flatMapConcat { channel =>
>>>>     Sources.monthsForChannel(channel, requestedRange)
>>>>   }
>>>>   .buffer(5, OverflowStrategy.backpressure) // Candidate 2
>>>>   .groupBy(100000, c => c.channel.channelId)
>>>>   .buffer(5, OverflowStrategy.backpressure) // Candidate 3
>>>>   .via(Transformations.progress(e => s"channel-month-for-interval-query $e")) // Message logged
>>>>   .flatMapConcat { channelMonth =>
>>>>     val r = Sources.intervalRowsForChannelAndTime(db, channelMonth.channel, channelMonth.period, batchSize) // Second Slick call
>>>>       .buffer(5, OverflowStrategy.backpressure) // Candidate 4
>>>>     r.map { x => ChannelAndInstantRangeAndInterval(channelMonth.channel, channelMonth.period, x) }
>>>>   }
>>>>   .via(Transformations.channelMonthIntervalToBytesWritten)
>>>>   .mergeSubstreams
>>>>   .via(Transformations.progress(fileWritten => s"File written $fileWritten"))
>>>>
>>>> val sink = Sinks.countingSink
>>>>
>>>> val runnable = source.toMat(sink)(Keep.right)
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
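[Editor's note] On the "too many open files" question at the top of the thread: groupBy's SubFlow offers mergeSubstreamsWithParallelism, which bounds how many substreams are materialized at once - and therefore, if the file write happens inside each substream, how many files are open at once. Below is a toy, self-contained sketch of that shape: BoundedGroupWrites and writeGroup are hypothetical names, and writeGroup just sums integers as a stand-in for the real writeFile(Source[ByteString], File): Future[IOResult].

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BoundedGroupWrites {
  implicit val system: ActorSystem = ActorSystem("boundedGroupWrites")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical stand-in for writeFile: "writes" one group by summing it,
  // returning (key, sum) instead of an IOResult.
  def writeGroup(key: Int, elems: Seq[Int]): Future[(Int, Int)] =
    Future((key, elems.sum))

  // One "file" per key (here: n % 3). mergeSubstreamsWithParallelism caps how
  // many group substreams - and so how many writes - are in flight at once.
  def run(parallelism: Int): Set[(Int, Int)] = {
    val done = Source(1 to 9)
      .groupBy(16, _ % 3)                       // maxSubstreams = 16
      .fold((0, Vector.empty[Int])) { case ((_, acc), n) => (n % 3, acc :+ n) }
      .mapAsync(1) { case (key, elems) => writeGroup(key, elems) }
      .mergeSubstreamsWithParallelism(parallelism)
      .runWith(Sink.seq)
    val result = Await.result(done, 5.seconds).toSet
    system.terminate() // shut the toy system down so the sketch exits cleanly
    result
  }
}
```

One caveat worth checking against the docs: if the input interleaves more distinct keys than `parallelism` allows, upstream backpressures until a running substream completes, which can stall an unbounded stream - the toy uses 3 keys and should be run with parallelism of at least 3.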
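[Editor's note] Endre's "flatMapConcat == flattenMerge(1)" remark points at the knob the earlier messages were asking for: flatMapMerge takes a breadth parameter that caps how many inner sources are materialized concurrently, which is one way to keep nested Slick queries near the connection-pool size. A minimal sketch with toy integer sources standing in for the database streams (FlatMapMergeDemo is a hypothetical name):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapMergeDemo {
  implicit val system: ActorSystem = ActorSystem("flatMapMergeDemo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // flatMapConcat is flattenMerge with breadth 1: inner sources run strictly
  // one after another. flatMapMerge(breadth, f) keeps at most `breadth` inner
  // sources running at a time, so breadth is the concurrency cap.
  def run(): Seq[Int] = {
    val merged = Source(1 to 3)
      .flatMapMerge(2, i => Source(List(i, i * 10))) // breadth = 2
      .runWith(Sink.seq)
    val result = Await.result(merged, 5.seconds)
    system.terminate() // shut the toy system down so the sketch exits cleanly
    result
  }
}
```

All six elements arrive, possibly interleaved across inner sources, with at most two inner sources active at once.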
