Hi Andrew,

Thanks for your feedback!

AS10: You’re right. I have replaced max.fetch.records with max.poll.records.

AS11: Hmm, my intention was to illustrate the potential features that could be enabled after implementing this KIP, such as better handling of poison records. However, I agree with your point and have removed that part.

AS12 - AS14: Thank you for your suggestions. I have made the corresponding changes based on your feedback.

AS15: I understand what you mean, but I was referring to the GroupProtocol enum class. There may be some differences in the implementation, but I think both approaches are acceptable, so I have made the changes.

Additionally, I have also renamed all AcquireMode variables to ShareAcquireMode for alignment (including the ShareFetchRequest schema).
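As a rough illustration of the naming discussed in AS5, AS6, AS8, AS12 and AS15, the enum could look something like the sketch below. It is only a sketch: the constant names, the snake_case config values and the int8 ids (0 and 1) come from this thread, while the accessor and lookup methods (`id`, `forConfig`, `forId`) are hypothetical and not taken from the KIP or from Kafka.

```java
// Hypothetical sketch of the enum discussed in this thread; not the actual Kafka class.
enum ShareAcquireMode {
    // BATCH_OPTIMIZED comes first because it is the default for share.acquire.mode (AS6).
    BATCH_OPTIMIZED((byte) 0, "batch_optimized"),
    RECORD_LIMIT((byte) 1, "record_limit");

    private final byte id;            // int8 value for the ShareFetchRequest field (AS8, AS12)
    private final String configValue; // snake_case value used in the client config (AS15)

    ShareAcquireMode(byte id, String configValue) {
        this.id = id;
        this.configValue = configValue;
    }

    // Hypothetical accessor for the wire value.
    byte id() {
        return id;
    }

    // Prints the snake_case config value rather than the constant name.
    @Override
    public String toString() {
        return configValue;
    }

    // Hypothetical lookup from the config string, e.g. "record_limit".
    static ShareAcquireMode forConfig(String value) {
        for (ShareAcquireMode mode : values()) {
            if (mode.configValue.equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown share.acquire.mode value: " + value);
    }

    // Hypothetical lookup from the int8 value carried in the request.
    static ShareAcquireMode forId(byte id) {
        for (ShareAcquireMode mode : values()) {
            if (mode.id == id) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown acquire mode id: " + id);
    }
}
```

With this shape, a client config of `share.acquire.mode=record_limit` would map to `RECORD_LIMIT`, which in turn would be sent as the value 1 in the request.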
Thanks again for your review. Please feel free to take another look when you have time.

Best,
Jimmy Wang

From: Andrew Schofield <[email protected]>
Date: Thursday, October 9, 2025 at 21:59
To: [email protected] <[email protected]>
Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Jimmy,
Thanks for the updates. Just a few minor comments remain, but it's nearly ready.

AS10: There is no max.fetch.records parameter. I think you mean max.poll.records. There might be share.max.fetch.records in the future but not yet.

AS11: I don't understand the point about poison records. The fix to that issue will likely be for the broker independently to decide to acquire only a subset of the records in a batch to ensure that any bad records failing delivery do not impact the delivery count of neighbouring records. This KIP is not required for that.

AS12: On line 30 of the ShareFetchRequest schema, please put the values for the AcquireMode in the about string such as "The acquire mode to control the fetch behavior: 0 - batch-optimized, 1 - record-limit". This will end up in the protocol documentation automatically so having the values makes the documentation more complete.

AS13: You should not include the details of the SharePartitionManager or ShareFetch classes in the KIP. The KIP is a specification of the protocol and public programming interfaces only.

AS14: Thanks for adding the examples. These are just illustrations of some permitted behaviour, but the broker is only required to keep within the limits specified in the ShareFetch request. As a result, your rejected alternative of "Only one entire batch...." is actually incorrect. The broker could do that and the KIP is not prescriptive about the details of broker behaviour. So, please remove this rejected alternative.

AS15: The standard for string config values in Kafka is snake_case. You are adding a new enum called ShareAcquireMode (in some places you've called it AcquireMode).
The values should be snake_case, such as "batch_optimized", although the enum names would be capitalised like BATCH_OPTIMIZED. Then the description of the values for the config would also be snake_case. If you look at auto.offset.reset consumer config and the related AutoOffsetResetStrategy class you'll see what I mean.

Thanks,
Andrew

________________________________________
From: Wang Jimmy
Sent: 09 October 2025 12:25
To: [email protected]
Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Andrew,
Thanks for reading and responding to the KIP!

AS3 - AS8: All these comments make sense to me, and I have updated the KIP according to your suggestions.

AS9:
> Return two entire batches 0-5 inclusive, but only acquire records 0-4
> inclusive. If a batch is "split" like this, the broker has returned record
> data which has not been acquired by this consumer.

From my point of view, the method mentioned above is the best way to implement the record-limit mode in Kafka. Although this will introduce overhead on either the client or server side, I believe it represents a necessary trade-off between the current Kafka log segment structure (which relies on producer batches) and the need for strict record control. I have moved the rest of the strategies you mentioned into the 'Rejected Alternatives' section, and I’ve added two examples for reference.

I will continue refining the KIP and do my best to ensure this change is merged into Kafka 4.2. Please take another look before starting the vote thread.

Best,
Jimmy Wang

From: Andrew Schofield
Date: Monday, October 6, 2025 at 23:30
To: [email protected]
Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Jimmy,
Sorry for the long delay responding to the KIP. I think it's important to be careful with the refinements to KIP-932, so I'm taking my time to make sure whatever we do is good.
There are good reasons why KIP-932 optimised for batch delivery, but I know from talking to early users of the share consumer that it's not always ideal. I'm strongly in favour of providing a way of limiting the number of records regardless of the batching, for applications which need it.

There is another improvement that I foresee here and that is for pre-fetching of records. For example, an application could ask for 500 records, but actually want to fetch 1000 so that it can overlap the request-response of acknowledging the first 500 records with the processing of the next 500 already fetched. However, getting pre-fetching to work in all situations while the acquisition locks are ticking down is more work. I don't want to introduce pre-fetching in this KIP.

As a result, I would prefer that this KIP introduces just one new config, share.acquire.mode. I think we will use share.fetch.max.records or something like it for prefetching when it's time. I know this contradicts a comment of mine, but I've changed my mind :)

I would also like to rename "strict" to "record limit" in the config. That is a better description of the effect.

I hope this makes sense. Here are my specific comments.

AS3: I would remove "strict" and "loose" from the description. Having "Strict" in the KIP title is fine, but the modes should be "batch-optimised" and "record-limit".

AS4: I suggest removing `share.max.fetch.records` from this KIP. It moves us in the direction of having MaxRecords and BatchSize differing in the ShareFetch request, and that's intended for pre-fetching when we get around to it.

AS5: I suggest changing "strict" to "record_limit" in the config for `share.acquire.mode`. Of course, this affects the `ShareAcquireMode` enum too.

AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would prefer this enum value to be the first in the ShareAcquireMode enum.

AS7: The package and naming of the AcquireMode enum is inconsistent and incorrect.
The internals package is not intended to be public. So, I think the enum is org.apache.kafka.clients.consumer.ShareAcquireMode.

AS8: In the ShareFetchRequest, I would make the AcquireMode field have type int8. There's no need to make it string. The values should match the equivalent enum values.

AS9: In the proposed changes, I suggest an example such as this.

Let's say that the batches produced onto a topic-partition each contain 3 records. Then the batches would have offsets 0-2 inclusive, 3-5 inclusive, 6-8 inclusive, and so on. Consider a share consumer with `max.poll.records=5` fetching records from the topic.

For batch-optimised mode, it will receive two whole batches of records, from offsets 0 to 5 inclusive, which is 6 records in total.

For record-limit mode, it will receive no more than 5 records. The broker could choose to:
* Return just one entire batch 0-2 inclusive, which is 3 records in total.
* Return two entire batches 0-5 inclusive, but only acquire records 0-4 inclusive. If a batch is "split" like this, the broker has returned record data which has not been acquired by this consumer.
* Adjust the record batching of the records returned so that a single batch 0-4 inclusive is returned.
* Or any other strategy which acquires no more than 5 records.

I hope we can get this into Apache Kafka 4.2. I expect we will make it.

Thanks,
Andrew

________________________________
From: Wang Jimmy
Sent: 10 September 2025 19:16
To: [email protected]
Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Andrew,
Thanks for your comments and sorry for the delayed response.

AS1: I think your intention is to control the fetched records solely by share.max.fetch.records rather than introducing a new acquireMode parameter.
However, the concept of acquireMode is intended to achieve the following things:
* Disable batches (only one batch will be returned to the consumer in one fetch in strict mode)
* Make a distinction from the current broker behavior where the maximum number of records is set as a soft limit.

As for the first point, this would be better if we want to extend the lock timeout on a "record" basis rather than on a "record batch" basis. And as Mittal suggested, we assume that the client application cares more about the precise count of messages than about throughput, so I think it makes sense that batching is not allowed in this mode. What do you think?

AS2: I agree with your idea and have changed the client configuration to share.max.fetch.records. Thanks for your advice.

Best,
Jimmy Wang

On 2025/09/03 15:46:06 Andrew Schofield wrote:
> Hi Jimmy,
> Thank you for the KIP. I'm sure I'll have more comments yet as
> I think through how it will work in practice, and also the work that
> we are looking to do in the consumer as part of Kafka 4.2 around
> flow control and memory usage.
>
> The behaviour in KIP-932 is expecting that the consuming application
> will be able to consume the fetched records in a timely fashion so
> that it does not inadvertently breach the acquisition lock time-out.
> It lets the application limit the amount of memory used for buffered
> records and also limit the number of fetched records. The limit of
> the number of records is applied as a soft limit, meaning that
> complete record batches (as written to the log) will be acquired.
> Providing a way to control the number of records more strictly
> will be useful for some situations, at the expense of throughput.
>
> AS1: I suggest using `share.fetch.max.records` as the way to control
> the maximum number of records. If not specified, you would get what
> you get today, which is a soft limit based on `max.poll.records`.
> If specified, the number of acquired records would not exceed this
> number. The broker would return complete record batches to the
> consumer application (to prevent decompression in the broker to
> split batches), but the number of acquired records would not
> exceed the limit specified.
>
> I suggest `share.fetch.max.records` with the "share." at the start.
> KIP-1199 is looking to introduce a maximum number of records for
> regular fetches. Because the behaviour would be quite different,
> I think it's preferable to have a different configuration
> property.
>
> Thanks,
> Andrew
> ________________________________________
> From: Wang Jimmy
> Sent: 31 August 2025 17:54
> To: [email protected]
> Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
>
> Hi Mittal,
> Thanks for your thoughtful feedback on the KIP!
>
> AM1:
> I agree with your point. I have updated the KIP to explain the pros and cons of the “strict” mode.
>
> AM2:
> Sure. After implementing strict mode, the share consumer can leverage max.poll.records (or fetch.max.records, as mentioned in AM4) to control the fetch rate of shareFetchManager. This prevents scenarios where one consumer fetches too many records while others suffer from starvation, thereby ensuring balanced throughput among different consumers.
>
> AM3:
> Thanks for pointing this out, I'll update the document. But I think this KIP won't change the behavior of the acquisition lock timeout or session timeout, which will stay the same as stated in KIP-932.
>
> AM4a:
> I overlooked this point and I think you are right. In “strict” mode, the share fetch response will contain only one batch, with maximum records upper bounded by max(BatchSize, MaxRecords).
>
> AM4b:
> From my point of view, it would be better to introduce a new max.fetch.records configuration since it has a different meaning compared to max.poll.records.
Regarding the pre-fetch behavior, regardless of the current implementation for implicit or explicit mode, all records should be acknowledged before sending the next fetch request. To achieve "pre-fetch", my initial thought is that the broker needs to allow the same member in a share group to send multiple shareFetch requests, but with an upper bound on the total number of delivered records set to max.fetch.records. I am not quite sure, but I think I could also finish it in this KIP. What do you think?
>
> AM5:
> Since “AcquireMode” is needed for both the share consumer (as a client configuration) and the broker (to determine the mode used), it should ideally be placed in two separate classes under the core and clients modules.
>
> Best,
> Jimmy Wang
>
> On 27 August 2025 at 04:01, Apoorv Mittal wrote:
>
> Hi Jimmy,
> Thanks for the KIP. Please find some comments below:
>
> AM1: The KIP mentions the current behaviour of soft limit but it would be
> helpful to explain the reasoning as well in KIP. Else it seems like the
> "strict" should always be the preferred fetch behaviour. However, that's
> not true. The broker never reads the actual data records, rather sends back
> the batch of records as produced. Hence, say in strict mode the MaxRecords
> is set to 1 but the producer generates a single batch of 5 records on log
> then only 1 record will be acquired but the whole batch of 5 records will
> be sent to the client. This will have higher egress from the broker and
> wasted memory on the client. The strict behaviour is helpful in some
> scenarios but not always.
>
> AM2: When we say "Strict max fetch records enables clients to achieve
> predictable throughput", can you please help explain what is meant by it? An example
> could help here.
>
> AM3: As mentioned in the KIP "In scenarios where record processing is
> time-consuming" hence strict mode is advisable. The client connection shall
> be disconnected post session timeout configuration.
Hence it means that if
> processing is taking longer than the session timeout then client sessions
> will be dropped and held records will be released. Shall we propose to
> handle the behaviour for such scenarios in the KIP as well?
>
> AM4: Currently, other than max and min bytes, there are 2 other parameters
> in ShareFetch request 1) MaxRecords 2) BatchSize. Both of these share fetch
> params currently use max.poll.records client configuration. Which means
> that a single batch of records will be fetched as per max.poll.records
> client configuration. But the MaxRecords and BatchSize were added because
> of following reasons a) Have some predictable number of records returned
> from broker as the records are backed by acquisition lock timeout, in case
> client takes more time in processing higher number of records b) Generate
> batches so client can "pre-fetch" record batches which can be
> acknowledged individually (batch) rather than waiting for all records to be
> processed by client. Pre-fetch needs additional handling in client and
> broker to renew the lock timeout for acquired-waiting record batches in
> client, which currently does not exist. Questions:
>
> AM4-a: What would be the suggested behaviour with "strict" mode and
> BatchSize i.e. shall always only a single batch be allowed to fetch in
> "strict" mode? Or there could be any reason to fetch multiple batches even
> in strict mode? I am assuming, and as KIP mentions as well, applications
> will generally use strict mode when the processing time is higher on
> clients for records, then does it make sense to allow multiple batches?
>
> AM4-b: As defined in the KIP-1199,
> there might be a separate config fetch.max.message.count (preferably
> fetch.max.records) which will be used for MaxRecords. Hence, should we
> introduce the fetch.max.records configuration in this KIP for ShareFetch
> and think about how prefetching will work?
Or if we want to leave this for
> a separate KIP then do we want to define behaviour for MaxRecords in strict
> mode i.e. should MaxRecords be same as max.poll.records and pre-fetching
> should not be supported?
>
> AM5: AcquireMode is also used by clients so should the enum AcquireMode reside
> in the server module or it should be in the clients module?
>
> Regards,
> Apoorv Mittal
>
>
> On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy wrote:
>
> Hello all,
> I would like to start a discussion on KIP-1206: Strict max fetch records
> in share fetch.
> This KIP introduces the AcquireMode in ShareFetchRequest, which provides
> two options: Strict or loose. When strict mode is selected, we should only
> acquire records till maxFetchRecords.
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch
>
> Thanks,
> Jimmy Wang
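
The AS9 example from this thread (batches of 3 records each, `max.poll.records=5`) can be worked through with a small sketch. It is a simplification under stated assumptions: every batch has the same size, fetching starts at offset 0, enough records are available, and record-limit mode uses the "split batch" strategy (return whole batches, acquire only up to the limit). The class and method names are hypothetical; the broker is free to pick any strategy that acquires no more than the limit.

```java
// Hypothetical model of the AS9 example; not broker code.
final class ShareAcquireExample {

    // Batch-optimized mode: the limit is soft, so whole batches are acquired
    // until the requested count is reached or passed.
    static int batchOptimizedAcquired(int recordsPerBatch, int maxRecords) {
        int batches = (maxRecords + recordsPerBatch - 1) / recordsPerBatch; // round up
        return batches * recordsPerBatch;
    }

    // Record-limit mode: never acquire more than the requested count
    // (assumes at least maxRecords records are available).
    static int recordLimitAcquired(int recordsPerBatch, int maxRecords) {
        return maxRecords;
    }

    // Record-limit mode with the "split batch" strategy: whole batches are still
    // returned on the wire, so the returned count can exceed the acquired count.
    static int recordLimitReturned(int recordsPerBatch, int maxRecords) {
        return batchOptimizedAcquired(recordsPerBatch, maxRecords);
    }

    public static void main(String[] args) {
        int recordsPerBatch = 3; // batches cover offsets 0-2, 3-5, 6-8, ...
        int maxRecords = 5;      // max.poll.records=5

        int acquiredSoft = batchOptimizedAcquired(recordsPerBatch, maxRecords);
        int acquiredStrict = recordLimitAcquired(recordsPerBatch, maxRecords);
        int returnedStrict = recordLimitReturned(recordsPerBatch, maxRecords);

        System.out.println("batch-optimized acquires offsets 0-" + (acquiredSoft - 1));
        System.out.println("record-limit acquires offsets 0-" + (acquiredStrict - 1)
                + " while offsets 0-" + (returnedStrict - 1) + " are returned");
    }
}
```

For these inputs the model gives 6 acquired records (offsets 0-5) in batch-optimized mode, and 5 acquired records (offsets 0-4) out of 6 returned in record-limit mode, which matches the numbers in AS9.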
