In general, the official discussion thread for a KIP starts after the KIP is 
written and posted.  So you would typically start a new email thread with a 
[DISCUSS] string in the title somewhere.  You can certainly link back to this 
email thread if you want, though, since it has some useful context for 
everything.

best,
Colin


On Tue, Oct 23, 2018, at 23:21, Zahari Dichev wrote:
> Colin, I agree
> 
> I will take a closer looks at the Fetcher itself and see whether that is
> feasible and update my KIP accordingly. I guess we can label this one, the
> official discussion thread for it or should I start another one ?
> 
> Zahari
> 
> On Wed, Oct 24, 2018 at 6:43 AM Colin McCabe <cmcc...@apache.org> wrote:
> 
> > On Tue, Oct 23, 2018, at 12:38, Zahari Dichev wrote:
> > > Hi there Matthias, I looked through the code of Kafka Streams. Quite
> > > impressive work ! If I have to put the logic of buffering within the
> > > context of what we are doing in Akka though, I might end up with the
> > > following situation.
> > >
> > > 1. Poll is called with two partition being active *TP1, TP2*
> > > 2. We get some data for both, both of them also prefetch some data.
> > > 3. So now we have some data that we have obtained and some data that sits
> > > with the buffer of the fetcher, waiting to be obtained.
> > > 4. We put the data that we have obtained from the poll into the
> > respective
> > > buffers of the partitions.
> > > 5. Since both of our buffers are "full", we call pause on both *TP1* and
> > > *TP2*.
> > > 6. A little time has passed and the client of *TP1* has processed all its
> > > records from the buffer, while the one of *TP2* has processed none
> > > 7. Buffer of *TP1* gets empty, we call resume on *TP1*
> > > 8. We call poll again with *TP1* resumed and *TP2* paused.
> > > 9. We get some records for TP1 and we throw away all the records that
> > were
> > > prefetched for *TP2* in step 2
> > >
> > > This can go on and on and due to the dynamic nature of the speed of
> > > processing records and the theoretically unlimited number of topic
> > > partitions, I find it possible that this scenario can happen more than
> > once
> > > over the lifetime of a client. And instead of trying to calculate the
> > > probability of this happening and attempt to minimise it, I would prefer
> > to
> > > have one of two options:
> > >
> > > 1. Having control to allow me to enable the returning of already
> > prefetched
> > > data, and simply store it in a buffer of my own until I have enough
> > > capacity to deal with it
> > >
> > > OR
> > >
> > > 2. Keep the data in the fetcher and not throw it away but use it on the
> > > next poll (not sure how viable that is as I have not looked at the
> > details
> > > of it all).
> >
> > I haven't thought about it that hard, but it sounds like the second option
> > might be better.  I have a hard time thinking of a case where we actually
> > want to throw away data for paused partitions.  If you're still subscribed
> > to it, presumably you'll eventually unpause it and use the cache, right?
> > It makes sense for unsubscribe to clear those records, but not pause, as
> > far as I can see.
> >
> > best,
> > Colin
> >
> >
> > >
> > > The first option is what I suggested initially and the second option is
> > the
> > > one that will allow us to skip the introduction of a configuration
> > > parameter as Colin suggested. These are the things I can suggest at the
> > > moment. As mentioned, I am willing to carry out the work. There is also
> > an
> > > official discussion thread, but I guess we have deviated from that, so I
> > > can just put that current on in JIRA instead if that is OK ?
> > >
> > > Matthias, regarding how the fetcher works. From what I have looked at,
> > > whenever the consumer polls and returns some data, we immediately issue
> > > another fetch request that delivered us records that are returned on the
> > > next poll. All these fetched records, that have not made it to the caller
> > > of poll but have been fetched are thrown away in case at the time of the
> > > nest poll() the partition is in paused state. This is what is causing the
> > > inefficiency.
> > >
> > > Any more comments are welcome.
> > >
> > > On Mon, Oct 22, 2018 at 6:00 AM Ismael Juma <isma...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I think a KIP to discuss a concrete proposal makes sense. One
> > suggestion is
> > > > to explore the possibility of fixing the issue without a new config.
> > Would
> > > > that break existing users? Generally, we should strive for avoiding
> > configs
> > > > if at all possible.
> > > >
> > > > Ismael
> > > >
> > > > On 16 Oct 2018 12:30 am, "Zahari Dichev" <zaharidic...@gmail.com>
> > wrote:
> > > >
> > > > Hi there Kafka developers,
> > > >
> > > > I am currently trying to find a solution to an issue that has been
> > > > manifesting itself in the Akka streams implementation of the Kafka
> > > > connector. When it comes to consuming messages, the implementation
> > relies
> > > > heavily on the fact that we can pause and resume partitions. In some
> > > > situations when a single consumer instance is shared among several
> > streams,
> > > > we might end up with frequently pausing and unpausing a set of topic
> > > > partitions, which is the main facility that allows us to implement back
> > > > pressure. This however has certain disadvantages, especially when
> > there are
> > > > two consumers that differ in terms of processing speed.
> > > >
> > > > To articulate the issue more clearly, imagine that a consumer maintains
> > > > assignments for two topic partitions *TP1* and *TP2*. This consumer is
> > > > shared by two streams - S1 and S2. So effectively when we have demand
> > from
> > > > only one of the streams - *S1*, we will pause one of the topic
> > partitions
> > > > *TP2* and call *poll()* on the consumer to only retrieve the records
> > for
> > > > the demanded topic partition - *TP1*. The result of that is all the
> > records
> > > > that have been prefetched for *TP2* are now thrown away by the fetcher
> > > > ("*Not
> > > > returning fetched records for assigned partition TP2 since it is no
> > longer
> > > > fetchable"*). If we extrapolate that to multiple streams sharing the
> > same
> > > > consumer, we might quickly end up in a situation where we throw
> > prefetched
> > > > data quite often. This does not seem like the most efficient approach
> > and
> > > > in fact produces quite a lot of overlapping fetch requests as
> > illustrated
> > > > in the following issue:
> > > >
> > > > https://github.com/akka/alpakka-kafka/issues/549
> > > >
> > > > I am writing this email to get some initial opinion on a KIP I was
> > thinking
> > > > about. What if we give the clients of the Consumer API a bit more
> > control
> > > > of what to do with this prefetched data. Two options I am wondering
> > about:
> > > >
> > > > 1. Introduce a configuration setting, such as*
> > > > "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> > > > think of a better name), which when set to true will return what is
> > > > prefetched instead of throwing it away on calling *poll()*. Since this
> > is
> > > > amount of data that is bounded by the maximum size of the prefetch, we
> > can
> > > > control what is the most amount of records returned. The client of the
> > > > consumer API can then be responsible for keeping that data around and
> > use
> > > > it when appropriate (i.e. when demand is present)
> > > >
> > > > 2. Introduce a facility to pass in a buffer into which the prefetched
> > > > records are drained when poll is called and paused partitions have some
> > > > prefetched records.
> > > >
> > > > Any opinions on the matter are welcome. Thanks a lot !
> > > >
> > > >
> > > > Zahari Dichev
> > > >
> >

Reply via email to