That makes sense. I thought the groups got created as needed, and the max
value has to be greater than the number of possible distinct groups.
I'll have to re-think the flow. Maybe I can do a flatMapMerge and do the
grouping later.
Thanks.

On Thu, Feb 18, 2016 at 8:07 AM, Endre Varga <[email protected]>
wrote:

>
>
> On Thu, Feb 18, 2016 at 4:57 PM, Richard Rodseth <[email protected]>
> wrote:
>
>> I wondered if I could solve this with a combination of buffer and
>> Balance, but I see that Balance backpressures when the outputs
>> backpressure, rather than when the number of workers maxes out.
>>
>>
>> http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>>
>> I feel like I need a flatMapConcat that takes a "max" parameter, like
>> mapAsync does. What am I missing?
>>
>
> There is a max, and it is exactly one :)
>
> flatMapConcat == flattenMerge(1)
>
> Your issue is that your flatMapConcat executes on every group so it is the
> group count that matters here.
>
> -Endre
>
>
>>
>> On Wed, Feb 17, 2016 at 8:31 PM, Richard Rodseth <[email protected]>
>> wrote:
>>
>>> I'm still missing something. I thought I had solved my problem of
>>> overwhelming Postgres by using buffer(), but today (after upgrading to
>>> 2.4.2-RC3, but that's probably coincidence) I am getting a lot of timeouts.
>>> As you can see below, I have two Slick sources, one nested via
>>> flatMapConcat.
>>>
>>> Even with all these superfluous buffer() calls, I still see a ton of 
>>> channel-month-for-interval-query
>>> messages
>>>
>>> I assume that the Slick DataBasePublisher is not signalling backpressure
>>> if waiting for a connection.
>>>
>>> How can I limit the nested slick stream call to something like the size
>>> of the connection pool?
>>>
>>> If the flow from second query on was run as a separate stream run in a
>>> mapAsync I suppose I could use the parallelism value and backpressure would
>>> flow back to the buffer, but how can I do it in one flow?
>>>
>>> val channels = Sources.channelsForFilter(db, channelFilter, 1000 /*fetchSize
>>> */ ) // First Slick call
>>>
>>>     val source = channels
>>>
>>>       .buffer(5, OverflowStrategy.backpressure) // Candidate 1
>>>
>>>       .flatMapConcat { channel =>
>>>
>>>         Sources.monthsForChannel(channel, requestedRange)
>>>
>>>       }
>>>
>>>       .buffer(5, OverflowStrategy.backpressure) // Candidate 2
>>>
>>>       .groupBy(100000, c => c.channel.channelId)
>>>
>>>       .buffer(5,OverflowStrategy.backpressure) // Candidate 3
>>>
>>>       .via(Transformations.progress(e => s"channel-month-for-interval-query
>>> $e")) // Message logged
>>>
>>>       .flatMapConcat { channelMonth =>
>>>
>>>         val r = Sources.intervalRowsForChannelAndTime(db, channelMonth.
>>> channel, channelMonth.period, batchSize) // Second Slick call
>>>
>>>           .buffer(5, OverflowStrategy.backpressure) // Candidate 4
>>>
>>>         r.map { x => ChannelAndInstantRangeAndInterval(channelMonth.
>>> channel, channelMonth.period, x) }
>>>
>>>       }
>>>
>>>       .via(Transformations.channelMonthIntervalToBytesWritten)
>>>
>>>       .mergeSubstreams
>>>
>>>       .via(Transformations.progress(fileWritten => s"File written $
>>> fileWritten"))
>>>
>>>     val sink = Sinks.countingSink
>>>
>>>     val runnable = source.toMat(sink)(Keep.right)
>>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to