Zahari,

It sounds to me like this problem is due to Akka attempting to implement
additional backpressure on top of the Consumer API. I'd suggest they not do
that, and then this problem goes away.

Ryanne

On Wed, Oct 17, 2018 at 7:35 AM Zahari Dichev <zaharidic...@gmail.com>
wrote:

> Hi there,
>
> Are there any opinions on the matter described in my previous email? I
> think this is quite important when it comes to implementing any non trivial
> functionality that relies on pause/resume. Of course if I am mistaken, feel
> free to elaborate.
>
> Thanks,
> Zahari
>
> On Tue, Oct 16, 2018 at 10:29 AM Zahari Dichev <zaharidic...@gmail.com>
> wrote:
>
> > Hi there Kafka developers,
> >
> > I am currently trying to find a solution to an issue that has been
> > manifesting itself in the Akka streams implementation of the Kafka
> > connector. When it comes to consuming messages, the implementation relies
> > heavily on the fact that we can pause and resume partitions. In some
> > situations when a single consumer instance is shared among several
> streams,
> > we might end up with frequently pausing and unpausing a set of topic
> > partitions, which is the main facility that allows us to implement back
> > pressure. This however has certain disadvantages, especially when there
> are
> > two consumers that differ in terms of processing speed.
> >
> > To articulate the issue more clearly, imagine that a consumer maintains
> > assignments for two topic partitions *TP1* and *TP2*. This consumer is
> > shared by two streams - S1 and S2. So effectively when we have demand
> from
> > only one of the streams - *S1*, we will pause one of the topic partitions
> > *TP2* and call *poll()* on the consumer to only retrieve the records for
> > the demanded topic partition - *TP1*. The result of that is all the
> > records that have been prefetched for *TP2* are now thrown away by the
> > fetcher ("*Not returning fetched records for assigned partition TP2 since
> > it is no longer fetchable"*). If we extrapolate that to multiple streams
> > sharing the same consumer, we might quickly end up in a situation where
> we
> > throw prefetched data quite often. This does not seem like the most
> > efficient approach and in fact produces quite a lot of overlapping fetch
> > requests as illustrated in the following issue:
> >
> > https://github.com/akka/alpakka-kafka/issues/549
> >
> > I am writing this email to get some initial opinion on a KIP I was
> > thinking about. What if we give the clients of the Consumer API a bit
> more
> > control of what to do with this prefetched data. Two options I am
> wondering
> > about:
> >
> > 1. Introduce a configuration setting, such as*
> > "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> > think of a better name), which when set to true will return what is
> > prefetched instead of throwing it away on calling *poll()*. Since this is
> > amount of data that is bounded by the maximum size of the prefetch, we
> can
> > control what is the most amount of records returned. The client of the
> > consumer API can then be responsible for keeping that data around and use
> > it when appropriate (i.e. when demand is present)
> >
> > 2. Introduce a facility to pass in a buffer into which the prefetched
> > records are drained when poll is called and paused partitions have some
> > prefetched records.
> >
> > Any opinions on the matter are welcome. Thanks a lot !
> >
> > Zahari Dichev
> >
>

Reply via email to