This was a very helpful response, thanks Endre. By using flatMapMerge and a
few buffers, I think I have got the queries to Postgres under control.
I have a similar problem now on the sink side of the stream - I am opening
too many files at once. The file write takes a source of ByteStrings and
returns a Future:

  def writeFile(source: Source[ByteString, akka.NotUsed], file: File): Future[IOResult]

So it gets run in a mapAsync. But I have to group the stream of
channel-month-intervals by channel-month because each channel-month goes in
one file. And if I mapAsync after groupBy I'm getting a file created for
each group.

Things that come to mind are using a balancer (pre-grouping), but I'm not
sure that would work, or sending the groups to a sink rather than merging
the substreams.
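
A third shape I can imagine, as a rough sketch only: skip the groupBy, keep
each channel-month as a single element, and let mapAsync's parallelism bound
the number of files open at once. This assumes the bytes for one channel-month
can be produced as a nested source instead of being flattened and regrouped.
ChannelMonth, bytesFor, fileFor and maxOpenFiles are hypothetical names, the
implicit Materializer parameter is added just for the sketch, and the body of
writeFile is only my guess at an implementation using FileIO.toFile from 2.4.2.

```scala
import java.io.File
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.{IOResult, Materializer}
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

object BoundedWrites {
  // Hypothetical key type standing in for a channel-month.
  final case class ChannelMonth(channelId: Long, month: String)

  // Assumed implementation: the file is opened when this inner stream is run.
  def writeFile(source: Source[ByteString, NotUsed], file: File)
               (implicit mat: Materializer): Future[IOResult] =
    source.runWith(FileIO.toFile(file))

  // mapAsync keeps at most maxOpenFiles futures in flight, so at most that
  // many writeFile streams (and files) are active; upstream is backpressured.
  def writeAll(
      channelMonths: Source[ChannelMonth, NotUsed],
      bytesFor: ChannelMonth => Source[ByteString, NotUsed], // hypothetical
      fileFor: ChannelMonth => File,                         // hypothetical
      maxOpenFiles: Int
  )(implicit mat: Materializer): Source[IOResult, NotUsed] =
    channelMonths.mapAsync(maxOpenFiles)(cm => writeFile(bytesFor(cm), fileFor(cm)))
}
```

The trade-off is that each channel-month's rows have to come from one nested
source, so this only helps if the upstream does not need to interleave them.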

Thoughts?



On Thu, Feb 18, 2016 at 8:29 AM, Richard Rodseth <[email protected]> wrote:

> That makes sense. I thought the groups got created as needed, and the max
> value has to be greater than the number of possible distinct groups.
> I'll have to re-think the flow. Maybe I can do a flatMapMerge and do the
> grouping later.
> Thanks.
>
> On Thu, Feb 18, 2016 at 8:07 AM, Endre Varga <[email protected]>
> wrote:
>
>>
>>
>> On Thu, Feb 18, 2016 at 4:57 PM, Richard Rodseth <[email protected]>
>> wrote:
>>
>>> I wondered if I could solve this with a combination of buffer and
>>> Balance, but I see that Balance backpressures when the outputs
>>> backpressure, rather than when the number of workers maxes out.
>>>
>>>
>>> http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>>>
>>> I feel like I need a flatMapConcat that takes a "max" parameter, like
>>> mapAsync does. What am I missing?
>>>
>>
>> There is a max, and it is exactly one :)
>>
>> flatMapConcat == flattenMerge(1)
>>
>> Your issue is that your flatMapConcat executes on every group so it is
>> the group count that matters here.
>>
>> -Endre
>>
>>
>>>
>>> On Wed, Feb 17, 2016 at 8:31 PM, Richard Rodseth <[email protected]>
>>> wrote:
>>>
>>>> I'm still missing something. I thought I had solved my problem of
>>>> overwhelming Postgres by using buffer(), but today (after upgrading to
>>>> 2.4.2-RC3, but that's probably coincidence) I am getting a lot of timeouts.
>>>> As you can see below, I have two Slick sources, one nested via
>>>> flatMapConcat.
>>>>
>>>> Even with all these superfluous buffer() calls, I still see a ton of
>>>> channel-month-for-interval-query messages.
>>>>
>>>> I assume that the Slick DatabasePublisher is not signalling
>>>> backpressure while it is waiting for a connection.
>>>>
>>>> How can I limit the nested slick stream call to something like the size
>>>> of the connection pool?
>>>>
>>>> If the flow from second query on was run as a separate stream run in a
>>>> mapAsync I suppose I could use the parallelism value and backpressure would
>>>> flow back to the buffer, but how can I do it in one flow?
>>>>
>>>> val channels = Sources.channelsForFilter(db, channelFilter, 1000 /* fetchSize */) // First Slick call
>>>>
>>>>     val source = channels
>>>>
>>>>       .buffer(5, OverflowStrategy.backpressure) // Candidate 1
>>>>
>>>>       .flatMapConcat { channel =>
>>>>
>>>>         Sources.monthsForChannel(channel, requestedRange)
>>>>
>>>>       }
>>>>
>>>>       .buffer(5, OverflowStrategy.backpressure) // Candidate 2
>>>>
>>>>       .groupBy(100000, c => c.channel.channelId)
>>>>
>>>>       .buffer(5,OverflowStrategy.backpressure) // Candidate 3
>>>>
>>>>       .via(Transformations.progress(e => s"channel-month-for-interval-query $e")) // Message logged
>>>>
>>>>       .flatMapConcat { channelMonth =>
>>>>
>>>>         val r = Sources.intervalRowsForChannelAndTime(db, channelMonth.channel, channelMonth.period, batchSize) // Second Slick call
>>>>
>>>>           .buffer(5, OverflowStrategy.backpressure) // Candidate 4
>>>>
>>>>         r.map { x => ChannelAndInstantRangeAndInterval(channelMonth.channel, channelMonth.period, x) }
>>>>
>>>>       }
>>>>
>>>>       .via(Transformations.channelMonthIntervalToBytesWritten)
>>>>
>>>>       .mergeSubstreams
>>>>
>>>>       .via(Transformations.progress(fileWritten => s"File written $fileWritten"))
>>>>
>>>>     val sink = Sinks.countingSink
>>>>
>>>>     val runnable = source.toMat(sink)(Keep.right)
>>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>
>

