The idea for you would be that MessageChooser could hang on to the prefetched messages.
ccing cmcc...@apache.org @Collin just for you to see that MessageChooser is a powerful abstraction. :)

Best Jan

On 18.10.2018 13:59, Zahari Dichev wrote:
> Jan,
>
> Quite insightful indeed. I think your propositions are valid.
>
> Ryanne,
>
> I understand that consumers are using a pull model... And yes, indeed, if a
> consumer is not ready for more records it surely should not call poll --
> except that it needs to do so periodically in order to indicate that it is
> alive. Forget about the "backpressure", I guess I was wrong in phrasing it
> that way, so let's not get caught up on it.
>
> You say pause/resume can be used to prioritise certain topics/partitions
> over others, and indeed this is the case. So instead of thinking about it
> in terms of backpressure, let's put it a different way. The Akka Streams
> connector would like to prioritise certain topics over others, using one
> consumer instance. On top of that, add the detail that the priorities
> change quite frequently (which translates to calling pause/resume
> frequently). All that being said, what would be a proper way to handle
> the situation without throwing the prefetched records away when calling
> poll on a consumer that happens to have a topic that was recently paused
> (and that might be un-paused soon)? Am I the only one who considers this
> an actual problem with the use of pause/resume? Not sure how to explain
> the situation in a better way.
>
> Zahari
>
>
> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com> wrote:
>
>> Thanks a lot Jan,
>>
>> I will read it.
>>
>> Zahari
>>
>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>>> especially my suggestions ;)
>>>
>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>> Hi Zahari,
>>>>
>>>> would you be willing to scan through the KIP-349 discussion a little?
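The behaviour Zahari describes can be sketched with a small stand-alone model (plain Java, no Kafka dependency; `ToyFetcher` and its methods are hypothetical stand-ins, not Kafka's actual classes). Like the real fetcher, it discards its per-partition prefetch buffer for paused partitions on `poll()`, so every pause/resume cycle wastes a fetch:

```java
import java.util.*;

// Toy model of the consumer-side fetcher (hypothetical classes, not Kafka's API).
// It mimics the behaviour behind the log line "Not returning fetched records for
// assigned partition TP2 since it is no longer fetchable": prefetched records
// for a paused partition are dropped on poll() and must be fetched again later.
class ToyFetcher {
    private final Map<String, List<String>> prefetch = new HashMap<>();
    private final Set<String> paused = new HashSet<>();
    int wastedRecords = 0; // records thrown away because their partition was paused

    void prefetchRecords(String tp, List<String> records) {
        prefetch.computeIfAbsent(tp, k -> new ArrayList<>()).addAll(records);
    }

    void pause(String tp)  { paused.add(tp); }
    void resume(String tp) { paused.remove(tp); }

    // Returns records only for fetchable (non-paused) partitions; buffered
    // records for paused partitions are discarded, as in the current fetcher.
    Map<String, List<String>> poll() {
        Map<String, List<String>> result = new HashMap<>();
        for (Iterator<Map.Entry<String, List<String>>> it = prefetch.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, List<String>> e = it.next();
            if (paused.contains(e.getKey())) {
                wastedRecords += e.getValue().size(); // thrown away, must be re-fetched
            } else {
                result.put(e.getKey(), e.getValue());
            }
            it.remove();
        }
        return result;
    }
}

public class PausedPrefetchDemo {
    public static void main(String[] args) {
        ToyFetcher fetcher = new ToyFetcher();
        fetcher.prefetchRecords("TP1", List.of("a", "b"));
        fetcher.prefetchRecords("TP2", List.of("c", "d", "e"));

        fetcher.pause("TP2"); // only the stream reading TP1 has demand
        Map<String, List<String>> polled = fetcher.poll();

        System.out.println(polled.keySet());       // only TP1's records are returned
        System.out.println(fetcher.wastedRecords); // TP2's 3 prefetched records were dropped
    }
}
```

With frequent pause/resume, `wastedRecords` keeps growing even though the data had already crossed the network, which is the redundant-fetch overhead the alpakka-kafka issue reports.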
>>>> I think it has suggestions that could be interesting for you
>>>>
>>>> Best Jan
>>>>
>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>> Hi there Kafka developers,
>>>>>
>>>>> I am currently trying to find a solution to an issue that has been
>>>>> manifesting itself in the Akka Streams implementation of the Kafka
>>>>> connector. When it comes to consuming messages, the implementation
>>>>> relies heavily on the fact that we can pause and resume partitions.
>>>>> In some situations, when a single consumer instance is shared among
>>>>> several streams, we might end up frequently pausing and unpausing a
>>>>> set of topic partitions, which is the main facility that allows us to
>>>>> implement back pressure. This, however, has certain disadvantages,
>>>>> especially when there are two consumers that differ in terms of
>>>>> processing speed.
>>>>>
>>>>> To articulate the issue more clearly, imagine that a consumer maintains
>>>>> assignments for two topic partitions *TP1* and *TP2*. This consumer is
>>>>> shared by two streams - *S1* and *S2*. So effectively, when we have
>>>>> demand from only one of the streams - *S1* - we pause one of the topic
>>>>> partitions - *TP2* - and call *poll()* on the consumer to retrieve only
>>>>> the records for the demanded topic partition - *TP1*. The result is
>>>>> that all the records that have been prefetched for *TP2* are now thrown
>>>>> away by the fetcher ("*Not returning fetched records for assigned
>>>>> partition TP2 since it is no longer fetchable*"). If we extrapolate
>>>>> that to multiple streams sharing the same consumer, we might quickly
>>>>> end up in a situation where we throw away prefetched data quite often.
>>>>> This does not seem like the most efficient approach, and in fact it
>>>>> produces quite a lot of overlapping fetch requests, as illustrated in
>>>>> the following issue:
>>>>>
>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>
>>>>> I am writing this email to get some initial opinions on a KIP I was
>>>>> thinking about. What if we gave the clients of the Consumer API a bit
>>>>> more control over what to do with this prefetched data? Two options I
>>>>> am wondering about:
>>>>>
>>>>> 1. Introduce a configuration setting, such as
>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"* (have to
>>>>> think of a better name), which when set to true will return what is
>>>>> prefetched instead of throwing it away on calling *poll()*. Since this
>>>>> is an amount of data that is bounded by the maximum size of the
>>>>> prefetch, we can control the maximum number of records returned. The
>>>>> client of the consumer API can then be responsible for keeping that
>>>>> data around and using it when appropriate (i.e. when demand is present).
>>>>>
>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched
>>>>> records are drained when poll is called and paused partitions have some
>>>>> prefetched records.
>>>>>
>>>>> Any opinions on the matter are welcome. Thanks a lot!
>>>>>
>>>>> Zahari Dichev
>>>
>>
>
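Option 2 from the quoted proposal (drain prefetched records for paused partitions into a caller-supplied buffer instead of discarding them) could look roughly like the following sketch. This is plain illustrative Java against toy types; `PrefetchRetainingConsumer` and the buffer-taking `poll()` overload are hypothetical, not an existing Kafka API:

```java
import java.util.*;

// Sketch of option 2: instead of discarding prefetched records for paused
// partitions on poll(), move them into a caller-supplied buffer so the client
// (e.g. the alpakka-kafka connector) can hand them out once demand returns.
// All classes and method signatures here are hypothetical illustrations.
class PrefetchRetainingConsumer {
    private final Map<String, List<String>> prefetch = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    void prefetchRecords(String tp, List<String> records) {
        prefetch.computeIfAbsent(tp, k -> new ArrayList<>()).addAll(records);
    }

    void pause(String tp)  { paused.add(tp); }
    void resume(String tp) { paused.remove(tp); }

    // poll() returns records for fetchable partitions; records for paused
    // partitions are drained into 'drainBuffer' instead of being thrown away.
    // The buffer stays bounded by the maximum prefetch size, as noted in
    // option 1 of the proposal.
    Map<String, List<String>> poll(Map<String, List<String>> drainBuffer) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : prefetch.entrySet()) {
            Map<String, List<String>> target =
                paused.contains(e.getKey()) ? drainBuffer : result;
            target.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
        }
        prefetch.clear();
        return result;
    }
}

public class DrainBufferDemo {
    public static void main(String[] args) {
        PrefetchRetainingConsumer consumer = new PrefetchRetainingConsumer();
        Map<String, List<String>> retained = new HashMap<>();

        consumer.prefetchRecords("TP1", List.of("a", "b"));
        consumer.prefetchRecords("TP2", List.of("c", "d", "e"));

        consumer.pause("TP2");
        Map<String, List<String>> polled = consumer.poll(retained);
        System.out.println(polled.keySet());     // TP1's records delivered now
        System.out.println(retained.get("TP2")); // TP2's records kept, not re-fetched

        consumer.resume("TP2");
        // On resume, the client serves 'retained' first before polling again,
        // so nothing was fetched twice.
    }
}
```

The same toy model covers option 1 as well: a boolean config would simply decide whether `poll()` merges the retained records into its own return value or leaves them to the caller's buffer.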