Colin, I agree.

I will take a closer look at the Fetcher itself, see whether that is
feasible, and update my KIP accordingly. I guess we can label this one
the official discussion thread for it, or should I start another one?

Zahari

On Wed, Oct 24, 2018 at 6:43 AM Colin McCabe <cmcc...@apache.org> wrote:

> On Tue, Oct 23, 2018, at 12:38, Zahari Dichev wrote:
> > Hi there Matthias, I looked through the code of Kafka Streams. Quite
> > impressive work! If I have to put the logic of buffering within the
> > context of what we are doing in Akka though, I might end up with the
> > following situation.
> >
> > 1. Poll is called with two partitions active: *TP1* and *TP2*.
> > 2. We get some data for both, and the fetcher also prefetches more
> > data for both.
> > 3. So now we have some data that we have obtained and some data that
> > sits in the buffer of the fetcher, waiting to be obtained.
> > 4. We put the data that we have obtained from the poll into the
> > respective buffers of the partitions.
> > 5. Since both of our buffers are "full", we call pause on both *TP1*
> > and *TP2*.
> > 6. A little time passes; the client of *TP1* has processed all its
> > records from the buffer, while the one of *TP2* has processed none.
> > 7. The buffer of *TP1* is now empty, so we call resume on *TP1*.
> > 8. We call poll again with *TP1* resumed and *TP2* paused.
> > 9. We get some records for *TP1*, and we throw away all the records
> > that were prefetched for *TP2* in step 2 (sketched in code below).
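> >
> > To make the sequence concrete, here is a rough sketch of steps 1-9 in
> > plain consumer API terms (the topic name, timeouts and the buffer
> > handling are made up for illustration):
> >
> > import java.time.Duration;
> > import java.util.*;
> > import org.apache.kafka.clients.consumer.*;
> > import org.apache.kafka.common.TopicPartition;
> >
> > public class PausePrefetchScenario {
> >     public static void main(String[] args) {
> >         Properties props = new Properties();
> >         props.put("bootstrap.servers", "localhost:9092");
> >         props.put("group.id", "demo");
> >         props.put("key.deserializer",
> >             "org.apache.kafka.common.serialization.StringDeserializer");
> >         props.put("value.deserializer",
> >             "org.apache.kafka.common.serialization.StringDeserializer");
> >         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
> >             TopicPartition tp1 = new TopicPartition("topic", 0);
> >             TopicPartition tp2 = new TopicPartition("topic", 1);
> >             consumer.assign(Arrays.asList(tp1, tp2));
> >             List<ConsumerRecord<String, String>> buf1 = new ArrayList<>();
> >             List<ConsumerRecord<String, String>> buf2 = new ArrayList<>();
> >
> >             // Steps 1-4: poll returns data for both partitions; the
> >             // fetcher also prefetches more for both in the background.
> >             ConsumerRecords<String, String> records =
> >                 consumer.poll(Duration.ofMillis(100));
> >             buf1.addAll(records.records(tp1));
> >             buf2.addAll(records.records(tp2));
> >
> >             // Step 5: both buffers are "full".
> >             consumer.pause(Arrays.asList(tp1, tp2));
> >
> >             // Steps 6-7: the client of TP1 drains its buffer, TP2's
> >             // does not.
> >             buf1.clear();
> >             consumer.resume(Collections.singletonList(tp1));
> >
> >             // Steps 8-9: this poll returns fresh records for TP1, but
> >             // whatever was already prefetched for TP2 is thrown away.
> >             consumer.poll(Duration.ofMillis(100));
> >         }
> >     }
> > }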
> >
> > This can go on and on, and due to the dynamic nature of the speed of
> > processing records and the theoretically unlimited number of topic
> > partitions, I find it possible that this scenario happens more than
> > once over the lifetime of a client. Instead of trying to calculate the
> > probability of this happening and attempting to minimise it, I would
> > prefer to have one of two options:
> >
> > 1. Having control that allows me to enable the returning of already
> > prefetched data, which I can then simply store in a buffer of my own
> > until I have enough capacity to deal with it
> >
> > OR
> >
> > 2. Keeping the data in the fetcher, not throwing it away but using it
> > on the next poll (not sure how viable that is, as I have not looked at
> > the details of it all).
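> >
> > For option 2, the change would live inside the consumer internals.
> > Very roughly, and with hypothetical names rather than the real Fetcher
> > code, the idea would be something like:
> >
> >     // Hypothetical sketch, not the actual Fetcher internals: when
> >     // draining completed fetches, keep data for paused partitions
> >     // around for a later poll() instead of dropping it.
> >     if (!subscriptions.isFetchable(tp)) {
> >         if (subscriptions.isPaused(tp)) {
> >             retainedFetches.add(completedFetch); // hand back once resumed
> >         } else {
> >             // partition was unassigned/unsubscribed - drop as today
> >             log.debug("Not returning fetched records for assigned " +
> >                 "partition {} since it is no longer fetchable", tp);
> >         }
> >     }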
>
> I haven't thought about it that hard, but it sounds like the second option
> might be better.  I have a hard time thinking of a case where we actually
> want to throw away data for paused partitions.  If you're still subscribed
> to it, presumably you'll eventually unpause it and use the cache, right?
> It makes sense for unsubscribe to clear those records, but not pause, as
> far as I can see.
>
> best,
> Colin
>
>
> >
> > The first option is what I suggested initially, and the second option
> > is the one that will allow us to skip the introduction of a
> > configuration parameter, as Colin suggested. These are the things I
> > can suggest at the moment. As mentioned, I am willing to carry out the
> > work. There is also an official discussion thread, but I guess we have
> > deviated from that, so I can just put the current one in JIRA instead
> > if that is OK?
> >
> > Matthias, regarding how the fetcher works: from what I have looked at,
> > whenever the consumer polls and returns some data, we immediately
> > issue another fetch request that delivers records to be returned on
> > the next poll. All these records that have been fetched but have not
> > yet made it to the caller of poll() are thrown away if, at the time of
> > the next poll(), the partition is in a paused state. This is what is
> > causing the inefficiency.
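> >
> > In other words, each poll() looks roughly like this (a hypothetical
> > simplification that glosses over a lot of detail):
> >
> >     // 1. Drain whatever fetches have completed. Records for
> >     //    partitions that are paused at this moment are discarded here.
> >     Map<TopicPartition, List<ConsumerRecord<K, V>>> drained =
> >         fetcher.fetchedRecords();
> >     // 2. Pipeline the next fetch request before returning, so more
> >     //    data is already in flight for the next poll().
> >     fetcher.sendFetches();
> >     return new ConsumerRecords<>(drained);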
> >
> > Any more comments are welcome.
> >
> > On Mon, Oct 22, 2018 at 6:00 AM Ismael Juma <isma...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I think a KIP to discuss a concrete proposal makes sense. One
> > > suggestion is to explore the possibility of fixing the issue without
> > > a new config. Would that break existing users? Generally, we should
> > > strive to avoid configs if at all possible.
> > >
> > > Ismael
> > >
> > > On 16 Oct 2018 12:30 am, "Zahari Dichev" <zaharidic...@gmail.com> wrote:
> > >
> > > Hi there Kafka developers,
> > >
> > > I am currently trying to find a solution to an issue that has been
> > > manifesting itself in the Akka Streams implementation of the Kafka
> > > connector. When it comes to consuming messages, the implementation
> > > relies heavily on the fact that we can pause and resume partitions.
> > > In some situations when a single consumer instance is shared among
> > > several streams, we might end up frequently pausing and resuming a
> > > set of topic partitions, which is the main facility that allows us
> > > to implement back pressure. This however has certain disadvantages,
> > > especially when there are two streams that differ in terms of
> > > processing speed.
> > >
> > > To articulate the issue more clearly, imagine that a consumer
> > > maintains assignments for two topic partitions, *TP1* and *TP2*.
> > > This consumer is shared by two streams, *S1* and *S2*. So
> > > effectively, when we have demand from only one of the streams,
> > > *S1*, we will pause the other topic partition, *TP2*, and call
> > > *poll()* on the consumer to retrieve only the records for the
> > > demanded topic partition, *TP1*. The result of that is that all the
> > > records that have been prefetched for *TP2* are now thrown away by
> > > the fetcher (*"Not returning fetched records for assigned partition
> > > TP2 since it is no longer fetchable"*). If we extrapolate that to
> > > multiple streams sharing the same consumer, we might quickly end up
> > > in a situation where we throw away prefetched data quite often. This
> > > does not seem like the most efficient approach, and in fact produces
> > > quite a lot of overlapping fetch requests, as illustrated in the
> > > following issue:
> > >
> > > https://github.com/akka/alpakka-kafka/issues/549
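> > >
> > > Stripped down, the pattern that triggers this looks as follows (the
> > > variable names are ours):
> > >
> > >     // S2 has no demand at the moment, so TP2 is paused before polling
> > >     consumer.pause(Collections.singleton(tp2));
> > >
> > >     // Only TP1 records come back; anything already prefetched for
> > >     // TP2 is dropped with the "no longer fetchable" debug message
> > >     ConsumerRecords<String, String> records =
> > >         consumer.poll(Duration.ofMillis(50));
> > >
> > >     // Resuming TP2 later forces a brand new fetch for data we
> > >     // already had in hand
> > >     consumer.resume(Collections.singleton(tp2));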
> > >
> > > I am writing this email to get some initial opinions on a KIP I was
> > > thinking about. What if we gave the clients of the Consumer API a
> > > bit more control over what to do with this prefetched data? Two
> > > options I am wondering about:
> > >
> > > 1. Introduce a configuration setting, such as
> > > *"return-prefetched-data-for-paused-topic-partitions = false"*
> > > (have to think of a better name), which when set to true will return
> > > what is prefetched instead of throwing it away on calling *poll()*.
> > > Since the amount of data is bounded by the maximum size of the
> > > prefetch, we can control the maximum number of records returned. The
> > > client of the consumer API can then be responsible for keeping that
> > > data around and using it when appropriate (i.e. when demand is
> > > present).
> > >
> > > 2. Introduce a facility to pass a buffer into poll, into which the
> > > prefetched records are drained whenever paused partitions have some
> > > prefetched records (a rough sketch of both options follows below).
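> > >
> > > To illustrate, here is a rough sketch of what the two options could
> > > look like from the caller's side (the config name, the poll()
> > > overload and the helper names are all hypothetical, nothing like
> > > this exists today):
> > >
> > >     // Option 1: opt in via config, then sort the returned records
> > >     // ourselves, buffering those that belong to paused partitions.
> > >     props.put("return-prefetched-data-for-paused-topic-partitions", "true");
> > >     ConsumerRecords<String, String> records =
> > >         consumer.poll(Duration.ofMillis(100));
> > >     for (TopicPartition tp : records.partitions()) {
> > >         if (consumer.paused().contains(tp))
> > >             ownBuffers.get(tp).addAll(records.records(tp)); // until demand
> > >         else
> > >             dispatch(tp, records.records(tp));
> > >     }
> > >
> > >     // Option 2: hand the consumer a buffer into which prefetched
> > >     // records for paused partitions are drained during poll().
> > >     Map<TopicPartition, List<ConsumerRecord<String, String>>> overflow =
> > >         new HashMap<>();
> > >     ConsumerRecords<String, String> live =
> > >         consumer.poll(Duration.ofMillis(100), overflow);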
> > >
> > > Any opinions on the matter are welcome. Thanks a lot!
> > >
> > >
> > > Zahari Dichev
> > >
>
