Colin, I agree. I will take a closer look at the Fetcher itself, see whether that is feasible, and update my KIP accordingly. I guess we can label this one the official discussion thread for it, or should I start another one?
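To make the scenario quoted below easier to follow, here is a minimal
sketch of the pause/resume pattern that triggers the discarding. It uses
the plain kafka-clients consumer API; the broker address, topic and group
names are made up:

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausedPartitionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp1 = new TopicPartition("topic", 0); // TP1
        TopicPartition tp2 = new TopicPartition("topic", 1); // TP2

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Arrays.asList(tp1, tp2));

            // Steps 1-3: poll returns data for both partitions, and the
            // fetcher prefetches the next batches behind the scenes.
            ConsumerRecords<String, String> first = consumer.poll(Duration.ofMillis(100));
            System.out.println("first poll: " + first.count());

            // Steps 4-5: our per-partition buffers are now "full", so we
            // pause both partitions.
            consumer.pause(Arrays.asList(tp1, tp2));

            // Steps 6-7: the TP1 stream drains its buffer first, so we
            // resume only TP1.
            consumer.resume(Collections.singletonList(tp1));

            // Steps 8-9: this poll returns records for TP1, but the
            // records already prefetched for the still-paused TP2 are
            // thrown away by the fetcher.
            ConsumerRecords<String, String> second = consumer.poll(Duration.ofMillis(100));
            System.out.println("second poll: " + second.count());
        }
    }
}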
Zahari

On Wed, Oct 24, 2018 at 6:43 AM Colin McCabe <cmcc...@apache.org> wrote:

> On Tue, Oct 23, 2018, at 12:38, Zahari Dichev wrote:
> > Hi there Matthias, I looked through the code of Kafka Streams. Quite
> > impressive work! If I have to put the logic of buffering within the
> > context of what we are doing in Akka though, I might end up with the
> > following situation.
> >
> > 1. Poll is called with two partitions being active, *TP1* and *TP2*.
> > 2. We get some data for both, and both of them also prefetch some
> > data.
> > 3. So now we have some data that we have obtained and some data that
> > sits in the buffer of the fetcher, waiting to be obtained.
> > 4. We put the data that we have obtained from the poll into the
> > respective buffers of the partitions.
> > 5. Since both of our buffers are "full", we call pause on both *TP1*
> > and *TP2*.
> > 6. A little time passes and the client of *TP1* has processed all of
> > its records from the buffer, while the one of *TP2* has processed
> > none.
> > 7. The buffer of *TP1* gets empty, so we call resume on *TP1*.
> > 8. We call poll again with *TP1* resumed and *TP2* paused.
> > 9. We get some records for *TP1* and we throw away all the records
> > that were prefetched for *TP2* in step 2.
> >
> > This can go on and on, and due to the dynamic nature of the speed of
> > processing records and the theoretically unlimited number of topic
> > partitions, I find it possible that this scenario happens more than
> > once over the lifetime of a client. Instead of trying to calculate
> > the probability of this happening and attempting to minimise it, I
> > would prefer to have one of two options:
> >
> > 1. Having control to allow me to enable the returning of already
> > prefetched data, and simply store it in a buffer of my own until I
> > have enough capacity to deal with it
> >
> > OR
> >
> > 2. Keep the data in the fetcher and not throw it away but use it on
> > the next poll (not sure how viable that is, as I have not looked at
> > the details of it all).
>
> I haven't thought about it that hard, but it sounds like the second
> option might be better. I have a hard time thinking of a case where we
> actually want to throw away data for paused partitions. If you're
> still subscribed to it, presumably you'll eventually unpause it and
> use the cache, right? It makes sense for unsubscribe to clear those
> records, but not pause, as far as I can see.
>
> best,
> Colin
>
> > The first option is what I suggested initially, and the second
> > option is the one that will allow us to skip the introduction of a
> > configuration parameter, as Colin suggested. These are the things I
> > can suggest at the moment. As mentioned, I am willing to carry out
> > the work. There is also an official discussion thread, but I guess
> > we have deviated from that, so I can just put the current one in
> > JIRA instead if that is OK?
> >
> > Matthias, regarding how the fetcher works: from what I have looked
> > at, whenever the consumer polls and returns some data, we
> > immediately issue another fetch request, which delivers the records
> > that are returned on the next poll. All of these fetched records
> > that have not yet made it to the caller of poll are thrown away if
> > the partition is in a paused state at the time of the next poll().
> > This is what is causing the inefficiency (see the simplified sketch
> > at the end of this message).
> >
> > Any more comments are welcome.
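> >
> > To be concrete about where the discarding happens, below is a
> > simplified paraphrase of the check the consumer's Fetcher performs
> > when handing records back to poll(). This is only the shape of the
> > logic, not the exact source:
> >
> > // Simplified paraphrase of the Fetcher's behaviour, not the actual
> > // code: completed fetches for a partition that is no longer
> > // fetchable (for example, because it has been paused) are dropped
> > // rather than kept for a later poll().
> > if (!subscriptions.isFetchable(partition)) {
> >     log.debug("Not returning fetched records for assigned partition {}"
> >             + " since it is no longer fetchable", partition);
> >     // the prefetched records are discarded here
> > }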
> >
> > On Mon, Oct 22, 2018 at 6:00 AM Ismael Juma <isma...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I think a KIP to discuss a concrete proposal makes sense. One
> > > suggestion is to explore the possibility of fixing the issue
> > > without a new config. Would that break existing users? Generally,
> > > we should strive to avoid configs if at all possible.
> > >
> > > Ismael
> > >
> > > On 16 Oct 2018 12:30 am, "Zahari Dichev" <zaharidic...@gmail.com>
> > > wrote:
> > >
> > > Hi there Kafka developers,
> > >
> > > I am currently trying to find a solution to an issue that has been
> > > manifesting itself in the Akka Streams implementation of the Kafka
> > > connector. When it comes to consuming messages, the implementation
> > > relies heavily on the fact that we can pause and resume
> > > partitions. In some situations, when a single consumer instance is
> > > shared among several streams, we might end up frequently pausing
> > > and unpausing a set of topic partitions, which is the main
> > > facility that allows us to implement back pressure. This, however,
> > > has certain disadvantages, especially when there are two consumers
> > > that differ in terms of processing speed.
> > >
> > > To articulate the issue more clearly, imagine that a consumer
> > > maintains assignments for two topic partitions, *TP1* and *TP2*.
> > > This consumer is shared by two streams, *S1* and *S2*. So
> > > effectively, when we have demand from only one of the streams,
> > > *S1*, we pause one of the topic partitions, *TP2*, and call
> > > *poll()* on the consumer to retrieve only the records for the
> > > demanded topic partition, *TP1*. The result of that is that all
> > > the records that have been prefetched for *TP2* are now thrown
> > > away by the fetcher ("*Not returning fetched records for assigned
> > > partition TP2 since it is no longer fetchable*"). If we
> > > extrapolate that to multiple streams sharing the same consumer, we
> > > might quickly end up in a situation where we throw away prefetched
> > > data quite often. This does not seem like the most efficient
> > > approach, and in fact it produces quite a lot of overlapping fetch
> > > requests, as illustrated in the following issue:
> > >
> > > https://github.com/akka/alpakka-kafka/issues/549
> > >
> > > I am writing this email to get some initial opinions on a KIP I
> > > was thinking about. What if we give the clients of the Consumer
> > > API a bit more control over what to do with this prefetched data?
> > > Two options I am wondering about:
> > >
> > > 1. Introduce a configuration setting, such as
> > > *"return-prefetched-data-for-paused-topic-partitions = false"*
> > > (have to think of a better name), which when set to true will
> > > return what is prefetched instead of throwing it away on calling
> > > *poll()*. Since this is an amount of data that is bounded by the
> > > maximum size of the prefetch, we can control the maximum number of
> > > records returned. The client of the consumer API can then be
> > > responsible for keeping that data around and using it when
> > > appropriate (i.e. when demand is present).
> > >
> > > 2. Introduce a facility to pass in a buffer into which the
> > > prefetched records are drained when poll is called and paused
> > > partitions have some prefetched records.
> > >
> > > Any opinions on the matter are welcome. Thanks a lot!
> > >
> > >
> > > Zahari Dichev
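P.S. To make option 2 from my original email (quoted above) a bit more
concrete, I am imagining something along the lines of the snippet below.
This is an entirely hypothetical API shape, not something the consumer
offers today:

// Hypothetical poll() overload - this does NOT exist in KafkaConsumer
// today - that drains records prefetched for paused partitions into a
// caller-supplied buffer instead of discarding them:
Map<TopicPartition, List<ConsumerRecord<String, String>>> parked = new HashMap<>();
ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100), parked);
// 'parked' now holds whatever had been prefetched for currently paused
// partitions; the caller keeps it around until demand is present.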