Jan,

Quite insightful indeed. I think your propositions are valid.
Ryanne,

I understand that consumers are using a pull model... And yes, indeed, if a consumer is not ready for more records it surely should not call poll - except that it needs to do so periodically in order to indicate that it is alive. Forget about the "backpressure" - I was wrong to phrase it that way, so let's not get caught up on it. You say pause/resume can be used to prioritise certain topics/partitions over others, and indeed this is the case. So instead of thinking about it in terms of backpressure, let's put it a different way. The Akka streams connector would like to prioritise certain topics over others, using one consumer instance. On top of that, add the detail that the priorities change quite frequently (which translates to calling pause/resume frequently). All that being said, what would be a proper way to handle the situation without throwing the prefetched records away when calling poll on a consumer that has a topic that was recently paused (and that might be un-paused soon)? Am I the only one who considers this an actual problem with the use of pause/resume? Not sure how to explain the situation in a better way, so here is a sketch of the pattern.
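This is a minimal sketch only - the class name, topic names, configuration and timings are made up, and the real connector logic is more involved - but it shows where the prefetched data gets dropped:

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseResumeChurn {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp1 = new TopicPartition("topic-s1", 0);
                TopicPartition tp2 = new TopicPartition("topic-s2", 0);
                consumer.assign(Arrays.asList(tp1, tp2));

                while (true) {
                    // Only stream S1 has demand at the moment, so TP2 is paused
                    // before polling; priorities shift frequently, so TP2 may
                    // be resumed again very soon.
                    consumer.pause(Arrays.asList(tp2));
                    ConsumerRecords<String, String> forS1 =
                            consumer.poll(Duration.ofMillis(50));
                    // At this point any records already prefetched for TP2 are
                    // dropped by the fetcher ("Not returning fetched records
                    // for assigned partition TP2 since it is no longer
                    // fetchable") and must be fetched from the broker again.
                    dispatchToS1(forS1);
                    consumer.resume(Arrays.asList(tp2));
                }
            }
        }

        private static void dispatchToS1(ConsumerRecords<String, String> records) {
            // hand the records over to the demanding stream (omitted)
        }
    }

Every such pause/poll/resume cycle discards and re-fetches data the broker had already sent, which is the waste tracked in the alpakka-kafka issue linked in the quoted thread below.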
Zahari

On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com> wrote:

> Thanks a lot Jan,
>
> I will read it.
>
> Zahari
>
> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
>
>> especially my suggestions ;)
>>
>> On 18.10.2018 08:30, Jan Filipiak wrote:
>> > Hi Zahari,
>> >
>> > would you be willing to scan through the KIP-349 discussion a little?
>> > I think it has suggestions that could be interesting for you
>> >
>> > Best Jan
>> >
>> > On 16.10.2018 09:29, Zahari Dichev wrote:
>> >> Hi there Kafka developers,
>> >>
>> >> I am currently trying to find a solution to an issue that has been manifesting itself in the Akka streams implementation of the Kafka connector. When it comes to consuming messages, the implementation relies heavily on the fact that we can pause and resume partitions. In some situations when a single consumer instance is shared among several streams, we might end up frequently pausing and unpausing a set of topic partitions, which is the main facility that allows us to implement back pressure. This however has certain disadvantages, especially when there are two consumers that differ in terms of processing speed.
>> >>
>> >> To articulate the issue more clearly, imagine that a consumer maintains assignments for two topic partitions *TP1* and *TP2*. This consumer is shared by two streams - S1 and S2. So effectively, when we have demand from only one of the streams - *S1* - we will pause one of the topic partitions, *TP2*, and call *poll()* on the consumer to retrieve only the records for the demanded topic partition - *TP1*. The result is that all the records that have been prefetched for *TP2* are now thrown away by the fetcher ("*Not returning fetched records for assigned partition TP2 since it is no longer fetchable*"). If we extrapolate that to multiple streams sharing the same consumer, we might quickly end up in a situation where we throw away prefetched data quite often. This does not seem like the most efficient approach, and in fact it produces quite a lot of overlapping fetch requests, as illustrated in the following issue:
>> >>
>> >> https://github.com/akka/alpakka-kafka/issues/549
>> >>
>> >> I am writing this email to get some initial opinion on a KIP I was thinking about. What if we gave the clients of the Consumer API a bit more control over what to do with this prefetched data? Two options I am wondering about:
>> >>
>> >> 1. Introduce a configuration setting, such as *"return-prefetched-data-for-paused-topic-partitions = false"* (have to think of a better name), which when set to true will return what is prefetched instead of throwing it away on calling *poll()*. Since this is an amount of data bounded by the maximum size of the prefetch, we can control the maximum number of records returned. The client of the consumer API can then be responsible for keeping that data around and using it when appropriate (i.e. when demand is present).
>> >>
>> >> 2. Introduce a facility to pass in a buffer into which the prefetched records are drained when poll is called and paused partitions have some prefetched records.
>> >>
>> >> Any opinions on the matter are welcome. Thanks a lot!
>> >>
>> >> Zahari Dichev
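P.S. To make option 2 from the quoted proposal a little more concrete, here is a rough sketch of the kind of facility I have in mind. This is purely hypothetical - neither the interface nor the poll overload exists in the consumer API today:

    import java.time.Duration;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical API sketch illustrating option 2 - not part of Kafka.
    public interface PrefetchDrainingConsumer<K, V> {

        // Behaves like the regular poll(timeout), except that records already
        // prefetched for partitions that are paused at the time of the call
        // are drained into the supplied buffer instead of being thrown away.
        // The caller keeps them around and hands them to the corresponding
        // stream once demand returns (i.e. the partition is resumed).
        ConsumerRecords<K, V> poll(Duration timeout,
                Map<TopicPartition, List<ConsumerRecord<K, V>>> pausedDrain);
    }

Since the drained records are bounded by the prefetch size, the memory the client has to hold on to stays bounded as well.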