Hi Jimmy, Sorry for the long delay responding to the KIP. I think it's important to be careful with the refinements to KIP-932, so I'm taking my time to make sure whatever we do is good.
There are good reasons why KIP-932 optimised for batch delivery, but I know from talking to early users of the share consumer, that it's not always ideal. I'm strongly in favour of providing a way of limiting the number of records regardless of the batching, for applications which need it. There is another improvement that I foresee here and that is for pre-fetching of records. For example, an application could ask for 500 records, but actually want to fetch 1000 so that it can overlap the request-response of acknowledging the first 500 records with the processing of the next 500 already fetched. However, getting pre-fetching to work in all situations while the acquisition locks are ticking down is more work. I don't want to introduce pre-fetching in this KIP. As a result, I would prefer that this KIP introduces just one new config, share.acquire.mode. I think we will use share.fetch.max.records or something like it for prefetching when it's time. I know this contradicts an comment of mine, but I've changed my mind :) I would also like to rename "strict" to "record limit" in the config. That is a better description of the effect. I hope this makes sense. Here are my specific comments. AS3: I would remove "strict" and "loose" from the description. Having "Strict" in the KIP title is fine, but the modes should be "batch-optimised" and "record-limit". AS4: I suggest removing `share.max.fetch.records` from this KIP. It moves us in the direction of having MaxRecords and BatchSize differing in the ShareFetch request, and that's intended for pre-fetching when we get around to it. AS5: I suggest changing "strict" to "record_limit" in the config for `share.acquire.mode`. Of course, this affects the `ShareAcquireMode` enum too. AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would prefer this enum value to be the first in the ShareAcquireMode enum. AS7: The package and naming of the AcquireMode enum is inconsistent and incorrect. The internals package is not intended to be public. So, I think the enum is org.apache.kafka.clients.consumer.ShareAcquireMode. AS8: In the ShareFetchRequest, I would make the AcquireMode field have type int8. There's no need to make it string. The values should match the equivalent enum values. AS9: In the proposed changes, I suggest an example such as this. Let's say that the records produced onto a topic-partition each contain 3 records. Then the batches would have offsets 0-2 inclusive, 3-5 inclusive, 6-8 inclusive, and so on. Consider a share consumer with `max.poll.records=5` fetching records from the topic. For batch-optimised mode, it will receive two whole batches of records, from offsets 0 to 5 inclusive, which is 6 records in total. For record-limit mode, it will receive no more than 5 records. The broker could choose to: * Return just one entire batch 0-2 inclusive, which is 3 records in total. * Return two entire batches 0-5 inclusive, but only acquire records 0-4 inclusive. If a batch is "split" like this, the broker has returned record data which has not been acquired by this consumer. * Adjust the record batching of the records returned so that a single batch 0-4 inclusive is returned. * Or any other strategy which acquires no more than 5 records. I hope we can get this into Apache Kafka 4.2. I expect we will make it. Thanks, Andrew ________________________________ From: Wang Jimmy <[email protected]> Sent: 10 September 2025 19:16 To: [email protected] <[email protected]> Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch Hi Andrew, Thanks for your comments and sorry for the delayed response. AS1: I think your intention is to control the fetched records solely by share.max.fetch.records rather than introducing a new acquireMode parameter. However, the concept of acquireMode is intended to achieve the following things: Disable batches (only one batch will be returned to the consumer in one fetch in strict mode) Make a distinction from the current broker behavior where the maximum number of records is set as a soft limit. As for the first point, it would be better if we wanted to extend the lock timeout on a ”record“ basis rather than on a ”record batches” basis. And as Mittal suggested, we assume that the client application cares more about the precise count of messages rather than the throughput, so I think it makes sense that batching is not allowed in this mode. What do you think? AS2: I agree with your idea and have changed the client configuration to share.max.fetch.records. Thanks for your advice. Best, Jimmy Wang On 2025/09/03 15:46:06 Andrew Schofield wrote: > Hi Jimmy, > Thank you for the KIP. I'm sure I'll have more comments yet as > I think through how it will work in practice, and also the work that > we are looking to do in the consumer as part of Kafka 4.2 around > flow control and memory usage. > > > The behaviour in KIP-932 is expecting that the consuming application > will be able to consume the fetched records in a timely fashion so > that it does not inadvertently breach the acquisition lock time-out. > It lets the application limit the amount of memory used for buffered > records and also limit the number of fetched records. The limit of > the number of records is applied as a soft limit, meaning that > complete record batches (as written to the log) will be acquired. > Providing a way to control the number of records more strictly > will be useful for some situations, at the expense of throughput. > > AS1: I suggest using `share.fetch.max.records` as the way to control > the maximum number of records. If not specified, you would get what > you get today, which is a soft limit based on `max.poll.records`. > If specified, the number of acquired records would not exceed this > number. The broker would return complete record batches to the > consumer application (to prevent decompression in the broker to > split batches), but the number of acquired records would not > exceed the limit specified. > > I suggest `share.fetch.max.records` with the "share." at the start. > KIP-1199 is looking to introduce a maximum number of records for > regular fetches. Because the behaviour would be quite different, > I think it's preferable to have a different configuration > property. > > > Thanks, > Andrew > ________________________________________ > From: Wang Jimmy <[email protected]> > Sent: 31 August 2025 17:54 > To: [email protected] <[email protected]> > Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch > > Hi Mittal, > Thanks for your thoughtful feedback on the KIP! > > AM1: > I agree with your point. I have updated the KIP to explain the pros and cons > of the “strict” mode. > > AM2: > Surely. After implementing strict mode, share-consumer can leverage > max.poll.records ( or fetch.max.records, as mentioned in AM4) to control the > fetch rate of shareFetchManager. This prevents scenarios where one consumer > fetches too many records while others suffer from starvation, thereby ensure > balanced throughput among different consumers. > > AM3: > Thanks for pointing this out, I'll update the document. But I think this KIP > won't change behavior of acquisition lock timeout or session timeout, which > will stay the same as stated in KIP-932. > > AM4a: > I overlooked this point and I think you are right. In “strict” mode, the > share fetch response will contain only one batch, with maximum records upper > bounded by max(BatchSize, MaxRecords). > > AM4b: > From my point of view, it would be better to introduce a new > max.fetch.records configuration since it has different meaning compared to > max.poll.records. Regarding the pre-fetch behavior, regardless of the current > implementation for implicit or explicit mode, all records should be > acknowledged before sending the next fetch request. To achieve "pre-fetch”, > my initial thought is that broker needs to allow the same member in share > group to send multiple shareFetch requests, but with an upper bound on the > total number of delivered records set to max.fetch.records. I am not quite > sure, but I think I could also finish it in this KIP. What do you think? > > AM5: > Since “AcquireMode” is needed for both the share-consumer(as client > configuration) and broker(determine the mode used), it should ideally placed > in two separate class under core and client module. > > Best, > Jimmy Wang > > > 2025年8月27日 04:01,Apoorv Mittal <[email protected]> 写道: > > Hi Jimmy, > Thanks for the KIP. Please find some comments below: > > AM1: The KIP mentions the current behaviour of soft limit but it would be > helpful to explain the reasoning as well in KIP. Else it seems like the > "strict" should always be the preferred fetch behaviour. However, that's > not true. The broker never reads the actual data records, rather sends back > the batch of records as produced. Hence, say in strict mode the MaxRecords > is set to 1 but the producer generates a single batch of 5 records on log > then only 1 record will be acquired but the whole batch of 5 records will > be sent to the client. This will have higher egress from the broker and > wasted memory on the client. The strict behaviour is helpful in some > scenarios but not always. > > AM2: When we say "Strict max fetch records enables clients to achieve > predictable > throughput", can you please help explain what is meant by it? An example > could help here. > > AM3: As mentioned in the KIP "In scenarios where record processing is > time-consuming" hence strict mode is advisable. The client connection shall > be disconnected post session timeout configuration. Hence it means that if > processing is taking longer than the session timeout then client sessions > will be dropped and held records will be released. Shall we propose to > handle the behaviour for such scenarios in the KIP as well? > > AM4: Currently, other than max and min bytes, there are 2 other parameters > in ShareFetch request 1) MaxRecords 2) BatchSize. Both of these share fetch > params currently use max.poll.records client configuration. Which means > that a single batch of records will be fetched as per max.poll.records > client configuration. But the MaxRecords and BatchSize were added because > of following reasons a) Have some predictable number of records returned > from broker as the records are backed by acquisition lock timeout, in case > client takes more time in processing higher number of records b) Generate > batches so client can "pre-fetch" record batches which can be > acknowledged individually (batch) rather waiting for all records to be > processed by client. Pre-fetch needs additional handling in client and > broker to renew the lock timeout for acquired-waiting record batches in > client, which currently does not exist. Questions: > > AM4-a: What would be the suggested behaviour with "strict" mode and > BatchSize i.e. shall always only a single batch be allowed to fetch in > "strict" mode? Or there could be any reason to fetch multiple batches even > in strict mode? I am assuming, and as KIP mentions as well, applications > will generally use strict mode when the processing time is higher on > clients for records, then does it make sense to allow multiple batches? > > AM4-b: As defined in the KIP-1199 > <https://emea01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-1199%253A%2BAdd%2Bmax%2Brecord%2Bcount%2Blimit%2Bto%2BFetchRequest&data=05%7C02%7C%7C767791c4f14343ca9ffe08ddf09666b7%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C638931250964830686%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=qtauR2Lvc4E82TJBNDUab6qeCNaRDHuKuSVpxGykW3Y%3D&reserved=0<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1199%3A+Add+max+record+count+limit+to+FetchRequest>>, > there might be a separate config fetch.max.message.count (preferably > fetch.max.records) which will be used for MaxRecords. Hence, should we > introduce the fetch.max.records configuration in this KIP for ShareFetch > and think about how prefetching will work? Or if we want to leave this for > a separate KIP then do we want to define behaviour for MaxRecords in strict > mode i.e. should MaxRecords be same as max.poll.records and pre-fetching > should not be supported? > > AM5: AcquireMode is also used by clients so should the enum AcquireMode reside > in the server module or it should be in the clients module? > > Regards, > Apoorv Mittal > > > On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy <[email protected]> wrote: > > Hello all, > I would like to start a discussion on KIP-1206: Strict max fetch records > in share fetch. > This KIP introduces the AcquireMode in ShareFetchRequest, which provides > two options: Strict or loose. When strict mode is selected, we should only > acquire records till maxFetchRecords. > > KIP: > https://emea01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-1206%3A%2BStrict%2Bmax%2Bfetch%2Brecords%2Bin%2Bshare%2Bfetch&data=05%7C02%7C%7C767791c4f14343ca9ffe08ddf09666b7%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C638931250964867929%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=82ysUJA8aeaKXEqV5Ck8HFxGS6XB0PrmNGeCmD0cQT8%3D&reserved=0<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch> > > Thanks, > Jimmy Wang > >
