I'm still missing something. I thought I had solved my problem of
overwhelming Postgres by using buffer(), but today (after upgrading to
2.4.2-RC3, though that is probably a coincidence) I am getting a lot of
timeouts. As you can see below, I have two Slick sources, the second one
nested via flatMapConcat.

Even with all these superfluous buffer() calls, I still see a ton of
"channel-month-for-interval-query" messages.

I assume that the Slick DatabasePublisher does not signal backpressure
while it is waiting for a connection.

How can I limit the number of concurrent nested Slick streams to something
like the size of the connection pool?

If everything from the second query onward were run as a separate stream
inside a mapAsync, I suppose I could use the parallelism value, and
backpressure would flow back to the buffer. But how can I do it in one flow?
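
To make the mapAsync idea concrete, this is roughly what I mean. It is only a minimal sketch, not my real code: rowsFor is a hypothetical stand-in for the second Slick call, poolSize is an assumed connection pool size, and the outer Source(1 to 3) stands in for the channel-month stream. Each inner stream is run to completion as its own materialized stream, and mapAsync should keep at most poolSize of them in flight.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MapAsyncSketch {

  // Hypothetical stand-in for the second Slick query: two rows per channel.
  def rowsFor(channel: Int): Source[String, NotUsed] =
    Source(List("a", "b")).map(row => s"$channel-$row")

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Assumption: chosen to match the size of the connection pool.
    val poolSize = 4

    try {
      val result = Source(1 to 3)
        // At most poolSize inner streams are running at any time;
        // mapAsync emits their results in upstream order.
        .mapAsync(poolSize)(channel => rowsFor(channel).runWith(Sink.seq))
        // Flatten each collected batch back into individual elements.
        .mapConcat(rows => rows)
        .runWith(Sink.seq)

      Await.result(result, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The obvious cost is that each inner result is collected in memory (Sink.seq) before it is passed downstream, so this only makes sense if the inner stream ends in something small, such as a count of bytes written.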

    val channels = Sources.channelsForFilter(db, channelFilter, 1000 /* fetchSize */) // First Slick call

    val source = channels
      .buffer(5, OverflowStrategy.backpressure) // Candidate 1
      .flatMapConcat { channel =>
        Sources.monthsForChannel(channel, requestedRange)
      }
      .buffer(5, OverflowStrategy.backpressure) // Candidate 2
      .groupBy(100000, c => c.channel.channelId)
      .buffer(5, OverflowStrategy.backpressure) // Candidate 3
      .via(Transformations.progress(e => s"channel-month-for-interval-query $e")) // Message logged
      .flatMapConcat { channelMonth =>
        val r = Sources.intervalRowsForChannelAndTime(db, channelMonth.channel, channelMonth.period, batchSize) // Second Slick call
          .buffer(5, OverflowStrategy.backpressure) // Candidate 4
        r.map { x => ChannelAndInstantRangeAndInterval(channelMonth.channel, channelMonth.period, x) }
      }
      .via(Transformations.channelMonthIntervalToBytesWritten)
      .mergeSubstreams
      .via(Transformations.progress(fileWritten => s"File written $fileWritten"))

    val sink = Sinks.countingSink

    val runnable = source.toMat(sink)(Keep.right)
