The idea for you would be that MessageChooser could hang on to the
prefetched messages.

ccing cmcc...@apache.org

@Collin
just for you to see that MessageChooser is a powerful abstraction.

:)

Best Jan

On 18.10.2018 13:59, Zahari Dichev wrote:
> Jan,
>
> Quite insightful indeed. I think your propositions are valid.
>
> Ryanne,
>
> I understand that consumers are using a pull model... And yes, indeed if a
> consumer is not ready for more records it surely should not call poll.
> Except that it needs to do so periodically in order to indicate that it is
> alive. Forget about the "backpressure"; I guess I phrased this badly,
> so let's not get caught up on it.
>
> You say pause/resume can be used to prioritise certain topics/partitions
> over others. And indeed this is the case. So instead of thinking about it
> in terms of backpressure, let's put it in a different way. The Akka streams
> connector would like to prioritise certain topics over others, using one
> consumer instance. On top of that, add the detail that the priorities
> change quite frequently (which translates to calling pause/resume
> frequently). So all that being said, what would be a proper way to handle
> the situation without throwing the pre-fetched records away when calling
> poll on a consumer that happens to have a topic that was recently paused
> (and that might be un-paused soon)? Am I the only one who considers that
> an actual problem with the use of pause/resume? Not sure how to explain
> the situation in a better way.
>
> Zahari
>
>
> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
> wrote:
>
>> Thanks a lot Jan,
>>
>> I will read it.
>>
>> Zahari
>>
>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> especially my suggestions ;)
>>>
>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>> Hi Zahari,
>>>>
>>>> would you be willing to scan through the KIP-349 discussion a little?
>>>> I think it has suggestions that could be interesting for you
>>>>
>>>> Best Jan
>>>>
>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>> Hi there Kafka developers,
>>>>>
>>>>> I am currently trying to find a solution to an issue that has been
>>>>> manifesting itself in the Akka streams implementation of the Kafka
>>>>> connector. When it comes to consuming messages, the implementation
>>>>> relies heavily on the fact that we can pause and resume partitions. In
>>>>> some situations when a single consumer instance is shared among several
>>>>> streams, we might end up with frequently pausing and unpausing a set of
>>>>> topic partitions, which is the main facility that allows us to implement
>>>>> back pressure. This however has certain disadvantages, especially when
>>>>> there are two consumers that differ in terms of processing speed.
>>>>>
>>>>> To articulate the issue more clearly, imagine that a consumer maintains
>>>>> assignments for two topic partitions *TP1* and *TP2*. This consumer is
>>>>> shared by two streams - *S1* and *S2*. So effectively when we have demand
>>>>> from only one of the streams - *S1*, we will pause one of the topic
>>>>> partitions *TP2* and call *poll()* on the consumer to only retrieve the
>>>>> records for the demanded topic partition - *TP1*. The result of that is
>>>>> that all the records that have been prefetched for *TP2* are now thrown
>>>>> away by the fetcher ("*Not returning fetched records for assigned
>>>>> partition TP2 since it is no longer fetchable*"). If we extrapolate that
>>>>> to multiple streams sharing the same consumer, we might quickly end up in
>>>>> a situation where we throw away prefetched data quite often. This does
>>>>> not seem like the most efficient approach and in fact produces quite a
>>>>> lot of overlapping fetch requests as illustrated in the following issue:
>>>>>
>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>
>>>>> I am writing this email to get some initial opinion on a KIP I was
>>>>> thinking about. What if we give the clients of the Consumer API a bit
>>>>> more control of what to do with this prefetched data? Two options I am
>>>>> wondering about:
>>>>>
>>>>> 1. Introduce a configuration setting, such as
>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"* (have to
>>>>> think of a better name), which when set to true will return what is
>>>>> prefetched instead of throwing it away on calling *poll()*. Since this
>>>>> is an amount of data that is bounded by the maximum size of the
>>>>> prefetch, we can bound the largest number of records returned. The
>>>>> client of the consumer API can then be responsible for keeping that data
>>>>> around and using it when appropriate (i.e. when demand is present).
>>>>>
>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched
>>>>> records are drained when poll is called and paused partitions have some
>>>>> prefetched records.
>>>>>
>>>>> Any opinions on the matter are welcome. Thanks a lot !
>>>>>
>>>>> Zahari Dichev
>>>>>
>>>
>>
>
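
To make the first option in the original mail a bit more concrete, here is a
rough sketch of what "the client keeps the prefetched data around" could look
like on the application side. It is only an illustration under assumptions:
PausedPartitionBuffer, onPoll, pause and resume are hypothetical names, the
records are plain strings, and nothing here touches the real Kafka consumer
API. It assumes poll() would hand back records for paused partitions too, as
the proposed (not existing) setting would allow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side helper; NOT part of the Kafka client library.
final class PausedPartitionBuffer {
    // Records held back per partition, in arrival order.
    private final Map<String, Deque<String>> held = new LinkedHashMap<>();
    // Partitions that currently have no downstream demand.
    private final Set<String> paused = new HashSet<>();

    void pause(String partition) { paused.add(partition); }

    void resume(String partition) { paused.remove(partition); }

    // Takes one batch as an (assumed) poll() would return it, including data
    // for paused partitions, and returns only what may be delivered now.
    Map<String, List<String>> onPoll(Map<String, List<String>> fetched) {
        // Stash everything first so per-partition ordering is preserved.
        for (Map.Entry<String, List<String>> e : fetched.entrySet()) {
            held.computeIfAbsent(e.getKey(), k -> new ArrayDeque<>())
                .addAll(e.getValue());
        }
        // Release the held records of every partition that is not paused.
        Map<String, List<String>> ready = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Deque<String>>> it = held.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> e = it.next();
            if (!paused.contains(e.getKey()) && !e.getValue().isEmpty()) {
                ready.put(e.getKey(), new ArrayList<>(e.getValue()));
                it.remove();
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        PausedPartitionBuffer buffer = new PausedPartitionBuffer();
        buffer.pause("TP2"); // only S1 has demand, so TP2 is paused

        Map<String, List<String>> batch = new LinkedHashMap<>();
        batch.put("TP1", Arrays.asList("a1"));
        batch.put("TP2", Arrays.asList("b1", "b2"));

        // TP2 records are kept instead of being thrown away.
        System.out.println(buffer.onPoll(batch));

        buffer.resume("TP2"); // S2 signals demand
        // The kept TP2 records are delivered without a new fetch.
        System.out.println(buffer.onPoll(new LinkedHashMap<>()));
    }
}
```

The held data stays bounded only as long as the amount returned for paused
partitions is bounded by the prefetch size, which is the assumption made in
the original proposal.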
