Thanks Jimmy, that makes total sense, given that it's the acquisition that ends up being limited (this was actually me getting mixed up). All good.
Lianet

On Wed, Oct 15, 2025 at 12:27 PM Wang Jimmy <[email protected]> wrote:
>
> Hi Lianet,
>
> Thank you for raising this point.
>
> I think this might not necessarily be the case. fetch.max.bytes might still
> play a role even in record_limit mode. fetch.max.bytes first controls how
> much data is read from the log, and then max.poll.records controls how many
> records are acquired by the share consumer. These two configurations could
> potentially work in sequence, with whichever limit is reached first taking
> precedence.
>
> Best,
>
> Jimmy Wang
>
> From: Lianet M. <[email protected]>
> Date: Tuesday, October 14, 2025 at 19:33
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
>
> Thanks for the answers and updates Jimmy, just one last question:
>
> LM4: Under this new record_limit fetch mode, the fetch.max.bytes
> client config would not be honoured, correct? (and it's always the
> max.poll.records that approximately defines how much data we may
> include in a single share fetch response).
>
> Thanks!
> Lianet
>
> On Mon, Oct 13, 2025 at 4:20 AM Andrew Schofield
> <[email protected]> wrote:
> >
> > Hi Jimmy,
> > Thanks for the updates.
> >
> > AS15: I see that you have used the GroupProtocol class as your
> > inspiration for the ShareAcquireMode. It is still the case that the
> > documentation for the group.protocol config uses lower-case values.
> > I would prefer that the documentation for share.acquire.mode also used
> > lower-case “batch_optimized” and “record_limit”. You should apply
> > case-insensitive matching in the code and the way that GroupProtocol
> > does it is fine. In summary, apart from the documentation of the values
> > for the share.acquire.mode which I think should be lower-case for
> > consistency with other string-based enum configs, the description in
> > the KIP is good.
> >
> > Apart from this small detail, it’s ready for voting from my point of view.
> >
> > Thanks,
> > Andrew
> >
> > > On 10 Oct 2025, at 18:33, Wang Jimmy <[email protected]> wrote:
> > >
> > > Hi Andrew,
> > > Thanks for your feedback!
> > >
> > > AS10:
> > > You’re right. I have replaced the max.fetch.records with
> > > max.poll.records.
> > > AS11:
> > > Hmm, my intention was to illustrate the potential features that could
> > > be enabled after implementing this KIP, such as better handling poison
> > > records. However, I agree with your point and have removed that part.
> > > AS12 - AS14:
> > > Thank you for your suggestions. I have made the corresponding changes
> > > based on your feedback.
> > > AS15:
> > > I understand what you mean, but I was referring to the GroupProtocol
> > > enum class. There may be some differences in the implementation, but I
> > > think both approaches are acceptable, so I have made the changes.
> > > Additionally, I have also renamed all AcquireMode variables to
> > > ShareAcquireMode for alignment (including the ShareFetchRequest schema).
> > >
> > > Thanks again for your review. Please feel free to take another look
> > > when you have time.
> > >
> > > Best,
> > > Jimmy Wang
> > >
> > > From: Andrew Schofield <[email protected]>
> > > Date: Thursday, October 9, 2025 at 21:59
> > > To: [email protected] <[email protected]>
> > > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> > >
> > > Hi Jimmy,
> > > Thanks for the updates. Just a few minor comments remain, but it's
> > > nearly ready.
> > >
> > > AS10: There is no max.fetch.records parameter. I think you mean
> > > max.poll.records. There might be share.max.fetch.records in the future
> > > but not yet.
> > >
> > > AS11: I don't understand the point about poison records. The fix to
> > > that issue will likely be for the broker independently to decide to
> > > acquire only a subset of the records in a batch to ensure that any
> > > bad records failing delivery do not impact the delivery count of
> > > neighbouring records.
> > > This KIP is not required for that.
> > >
> > > AS12: On line 30 of the ShareFetchRequest schema, please put the
> > > values for the AcquireMode in the about string such as
> > > "The acquire mode to control the fetch behavior: 0 - batch-optimized, 1 - record-limit"
> > > This will end up in the protocol documentation automatically so
> > > having the values makes the documentation more complete.
> > >
> > > AS13: You should not include the details of the SharePartitionManager
> > > or ShareFetch classes in the KIP. The KIP is a specification of the
> > > protocol and public programming interfaces only.
> > >
> > > AS14: Thanks for adding the examples. These are just illustrations
> > > of some permitted behaviour, but the broker is only required to
> > > keep within the limits specified in the ShareFetch request. As a result,
> > > your rejected alternative of "Only one entire batch...." is actually
> > > incorrect. The broker could do that and the KIP is not prescriptive
> > > about the details of broker behaviour. So, please remove this rejected
> > > alternative.
> > >
> > > AS15: The standard for string config values in Kafka is snake_case.
> > > You are adding a new enum called ShareAcquireMode (in some places you've
> > > called it AcquireMode). The values should be snake_case, such as
> > > "batch_optimized", although the enum names would be capitalised like
> > > BATCH_OPTIMIZED. Then the description of the values for the config
> > > would also be snake_case.
> > >
> > > If you look at auto.offset.reset consumer config and the
> > > related AutoOffsetResetStrategy class you'll see what I mean.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > ________________________________________
> > > From: Wang Jimmy
> > > Sent: 09 October 2025 12:25
> > > To: [email protected]
> > > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> > >
> > > Hi Andrew,
> > > Thanks for reading and responding to the KIP!
> > >
> > > AS3 - AS8:
> > > All these comments make sense to me, I have updated the KIP
> > > according to your suggestions.
> > >
> > > AS9:
> > > > Return two entire batches 0-5 inclusive, but only acquire records 0-4
> > > > inclusive. If a batch is "split" like this, the broker has returned
> > > > record data which has not been acquired by this consumer.
> > >
> > > From my point of view, the method mentioned above is the best way to
> > > implement the record-limit mode in Kafka. Although this will introduce
> > > overhead on either the client or server side, I believe it represents a
> > > necessary trade-off between the current Kafka log segment structure
> > > (which relies on producer batches) and the need for strict record
> > > control. I have moved the rest of the strategies you mentioned into the
> > > 'Rejected Alternatives' section, and I’ve added two examples for
> > > reference.
> > >
> > > I will continue refining the KIP and do my best to ensure this change
> > > is merged into Kafka 4.2. Please take another look before starting the
> > > vote thread.
> > >
> > > Best,
> > > Jimmy Wang
> > >
> > > From: Andrew Schofield
> > > Date: Monday, October 6, 2025 at 23:30
> > > To: [email protected]
> > > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> > >
> > > Hi Jimmy,
> > > Sorry for the long delay responding to the KIP. I think it's important
> > > to be careful with the refinements to KIP-932, so I'm taking my time to
> > > make sure whatever we do is good.
> > >
> > > There are good reasons why KIP-932 optimised for batch delivery, but I
> > > know from talking to early users of the share consumer, that it's not
> > > always ideal.
> > > I'm strongly in favour of providing a way of limiting the number of
> > > records regardless of the batching, for applications which need it.
> > >
> > > There is another improvement that I foresee here and that is for
> > > pre-fetching of records. For example, an application could ask for 500
> > > records, but actually want to fetch 1000 so that it can overlap the
> > > request-response of acknowledging the first 500 records with the
> > > processing of the next 500 already fetched. However, getting
> > > pre-fetching to work in all situations while the acquisition locks are
> > > ticking down is more work. I don't want to introduce pre-fetching in
> > > this KIP.
> > >
> > > As a result, I would prefer that this KIP introduces just one new
> > > config, share.acquire.mode. I think we will use share.fetch.max.records
> > > or something like it for prefetching when it's time. I know this
> > > contradicts a comment of mine, but I've changed my mind :)
> > >
> > > I would also like to rename "strict" to "record limit" in the config.
> > > That is a better description of the effect.
> > >
> > > I hope this makes sense.
> > >
> > > Here are my specific comments.
> > >
> > > AS3: I would remove "strict" and "loose" from the description. Having
> > > "Strict" in the KIP title is fine, but the modes should be
> > > "batch-optimised" and "record-limit".
> > >
> > > AS4: I suggest removing `share.max.fetch.records` from this KIP. It
> > > moves us in the direction of having MaxRecords and BatchSize differing
> > > in the ShareFetch request, and that's intended for pre-fetching when we
> > > get around to it.
> > >
> > > AS5: I suggest changing "strict" to "record_limit" in the config for
> > > `share.acquire.mode`. Of course, this affects the `ShareAcquireMode`
> > > enum too.
> > >
> > > AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would
> > > prefer this enum value to be the first in the ShareAcquireMode enum.
> > >
> > > AS7: The package and naming of the AcquireMode enum is inconsistent and
> > > incorrect.
> > > The internals package is not intended to be public. So, I think
> > > the enum is org.apache.kafka.clients.consumer.ShareAcquireMode.
> > >
> > > AS8: In the ShareFetchRequest, I would make the AcquireMode field have
> > > type int8. There's no need to make it string. The values should match
> > > the equivalent enum values.
> > >
> > > AS9: In the proposed changes, I suggest an example such as this.
> > >
> > > Let's say that the record batches produced onto a topic-partition each
> > > contain 3 records. Then the batches would have offsets 0-2 inclusive,
> > > 3-5 inclusive, 6-8 inclusive, and so on.
> > >
> > > Consider a share consumer with `max.poll.records=5` fetching records
> > > from the topic. For batch-optimised mode, it will receive two whole
> > > batches of records, from offsets 0 to 5 inclusive, which is 6 records
> > > in total.
> > >
> > > For record-limit mode, it will receive no more than 5 records. The
> > > broker could choose to:
> > >
> > > * Return just one entire batch 0-2 inclusive, which is 3 records in
> > >   total.
> > > * Return two entire batches 0-5 inclusive, but only acquire records 0-4
> > >   inclusive. If a batch is "split" like this, the broker has returned
> > >   record data which has not been acquired by this consumer.
> > > * Adjust the record batching of the records returned so that a single
> > >   batch 0-4 inclusive is returned.
> > > * Or any other strategy which acquires no more than 5 records.
> > >
> > > I hope we can get this into Apache Kafka 4.2. I expect we will make it.
> > >
> > > Thanks,
> > > Andrew
> > > ________________________________
> > > From: Wang Jimmy
> > > Sent: 10 September 2025 19:16
> > > To: [email protected]
> > > Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> > >
> > > Hi Andrew,
> > > Thanks for your comments and sorry for the delayed response.
> > >
> > > AS1:
> > > I think your intention is to control the fetched records solely by
> > > share.max.fetch.records rather than introducing a new acquireMode
> > > parameter.
> > > However, the concept of acquireMode is intended to achieve the following
> > > things:
> > > * Disable batches (only one batch will be returned to the consumer in
> > >   one fetch in strict mode)
> > > * Make a distinction from the current broker behavior where the maximum
> > >   number of records is set as a soft limit.
> > >
> > > As for the first point, it would be better if we wanted to extend the
> > > lock timeout on a “record” basis rather than on a “record batches”
> > > basis. And as Mittal suggested, we assume that the client application
> > > cares more about the precise count of messages rather than the
> > > throughput, so I think it makes sense that batching is not allowed in
> > > this mode. What do you think?
> > >
> > > AS2: I agree with your idea and have changed the client configuration to
> > > share.max.fetch.records. Thanks for your advice.
> > >
> > > Best,
> > > Jimmy Wang
> > >
> > > On 2025/09/03 15:46:06 Andrew Schofield wrote:
> > > > Hi Jimmy,
> > > > Thank you for the KIP. I'm sure I'll have more comments yet as
> > > > I think through how it will work in practice, and also the work that
> > > > we are looking to do in the consumer as part of Kafka 4.2 around
> > > > flow control and memory usage.
> > > >
> > > > The behaviour in KIP-932 is expecting that the consuming application
> > > > will be able to consume the fetched records in a timely fashion so
> > > > that it does not inadvertently breach the acquisition lock time-out.
> > > > It lets the application limit the amount of memory used for buffered
> > > > records and also limit the number of fetched records. The limit of
> > > > the number of records is applied as a soft limit, meaning that
> > > > complete record batches (as written to the log) will be acquired.
> > > > Providing a way to control the number of records more strictly
> > > > will be useful for some situations, at the expense of throughput.
> > > >
> > > > AS1: I suggest using `share.fetch.max.records` as the way to control
> > > > the maximum number of records. If not specified, you would get what
> > > > you get today, which is a soft limit based on `max.poll.records`.
> > > > If specified, the number of acquired records would not exceed this
> > > > number. The broker would return complete record batches to the
> > > > consumer application (to prevent decompression in the broker to
> > > > split batches), but the number of acquired records would not
> > > > exceed the limit specified.
> > > >
> > > > I suggest `share.fetch.max.records` with the "share." at the start.
> > > > KIP-1199 is looking to introduce a maximum number of records for
> > > > regular fetches. Because the behaviour would be quite different,
> > > > I think it's preferable to have a different configuration
> > > > property.
> > > >
> > > > Thanks,
> > > > Andrew
> > > > ________________________________________
> > > > From: Wang Jimmy
> > > > Sent: 31 August 2025 17:54
> > > > To: [email protected]
> > > > Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> > > >
> > > > Hi Mittal,
> > > > Thanks for your thoughtful feedback on the KIP!
> > > >
> > > > AM1:
> > > > I agree with your point. I have updated the KIP to explain the pros
> > > > and cons of the “strict” mode.
> > > >
> > > > AM2:
> > > > Surely. After implementing strict mode, share-consumer can leverage
> > > > max.poll.records (or fetch.max.records, as mentioned in AM4) to
> > > > control the fetch rate of shareFetchManager. This prevents scenarios
> > > > where one consumer fetches too many records while others suffer from
> > > > starvation, thereby ensuring balanced throughput among different
> > > > consumers.
> > > >
> > > > AM3:
> > > > Thanks for pointing this out, I'll update the document.
> > > > But I think this KIP won't change the behavior of the acquisition
> > > > lock timeout or session timeout, which will stay the same as stated
> > > > in KIP-932.
> > > >
> > > > AM4a:
> > > > I overlooked this point and I think you are right. In “strict” mode,
> > > > the share fetch response will contain only one batch, with maximum
> > > > records upper bounded by max(BatchSize, MaxRecords).
> > > >
> > > > AM4b:
> > > > From my point of view, it would be better to introduce a new
> > > > max.fetch.records configuration since it has a different meaning
> > > > compared to max.poll.records. Regarding the pre-fetch behavior,
> > > > regardless of the current implementation for implicit or explicit
> > > > mode, all records should be acknowledged before sending the next
> > > > fetch request. To achieve “pre-fetch”, my initial thought is that
> > > > the broker needs to allow the same member in a share group to send
> > > > multiple shareFetch requests, but with an upper bound on the total
> > > > number of delivered records set to max.fetch.records. I am not quite
> > > > sure, but I think I could also finish it in this KIP. What do you
> > > > think?
> > > >
> > > > AM5:
> > > > Since “AcquireMode” is needed for both the share-consumer (as client
> > > > configuration) and the broker (to determine the mode used), it should
> > > > ideally be placed in two separate classes under the core and client
> > > > modules.
> > > >
> > > > Best,
> > > > Jimmy Wang
> > > >
> > > > On August 27, 2025, at 04:01, Apoorv Mittal wrote:
> > > >
> > > > Hi Jimmy,
> > > > Thanks for the KIP. Please find some comments below:
> > > >
> > > > AM1: The KIP mentions the current behaviour of soft limit but it
> > > > would be helpful to explain the reasoning as well in KIP. Else it
> > > > seems like the "strict" should always be the preferred fetch
> > > > behaviour. However, that's not true. The broker never reads the
> > > > actual data records, rather sends back the batch of records as
> > > > produced.
> > > > Hence, say in strict mode the MaxRecords is set to 1 but the producer
> > > > generates a single batch of 5 records on log then only 1 record will
> > > > be acquired but the whole batch of 5 records will be sent to the
> > > > client. This will have higher egress from the broker and wasted
> > > > memory on the client. The strict behaviour is helpful in some
> > > > scenarios but not always.
> > > >
> > > > AM2: When we say "Strict max fetch records enables clients to achieve
> > > > predictable throughput", can you please help explain what is meant by
> > > > it? An example could help here.
> > > >
> > > > AM3: As mentioned in the KIP "In scenarios where record processing is
> > > > time-consuming" hence strict mode is advisable. The client connection
> > > > shall be disconnected post session timeout configuration. Hence it
> > > > means that if processing is taking longer than the session timeout
> > > > then client sessions will be dropped and held records will be
> > > > released. Shall we propose to handle the behaviour for such scenarios
> > > > in the KIP as well?
> > > >
> > > > AM4: Currently, other than max and min bytes, there are 2 other
> > > > parameters in ShareFetch request 1) MaxRecords 2) BatchSize. Both of
> > > > these share fetch params currently use max.poll.records client
> > > > configuration. Which means that a single batch of records will be
> > > > fetched as per max.poll.records client configuration.
> > > > But the MaxRecords and BatchSize were added because of following
> > > > reasons a) Have some predictable number of records returned from
> > > > broker as the records are backed by acquisition lock timeout, in case
> > > > client takes more time in processing higher number of records
> > > > b) Generate batches so client can "pre-fetch" record batches which
> > > > can be acknowledged individually (batch) rather waiting for all
> > > > records to be processed by client. Pre-fetch needs additional
> > > > handling in client and broker to renew the lock timeout for
> > > > acquired-waiting record batches in client, which currently does not
> > > > exist. Questions:
> > > >
> > > > AM4-a: What would be the suggested behaviour with "strict" mode and
> > > > BatchSize i.e. shall always only a single batch be allowed to fetch
> > > > in "strict" mode? Or there could be any reason to fetch multiple
> > > > batches even in strict mode? I am assuming, and as KIP mentions as
> > > > well, applications will generally use strict mode when the processing
> > > > time is higher on clients for records, then does it make sense to
> > > > allow multiple batches?
> > > >
> > > > AM4-b: As defined in the KIP-1199, there might be a separate config
> > > > fetch.max.message.count (preferably fetch.max.records) which will be
> > > > used for MaxRecords. Hence, should we introduce the fetch.max.records
> > > > configuration in this KIP for ShareFetch and think about how
> > > > prefetching will work? Or if we want to leave this for a separate KIP
> > > > then do we want to define behaviour for MaxRecords in strict mode
> > > > i.e. should MaxRecords be same as max.poll.records and pre-fetching
> > > > should not be supported?
> > > >
> > > > AM5: AcquireMode is also used by clients so should the enum
> > > > AcquireMode reside in the server module or it should be in the
> > > > clients module?
> > > >
> > > > Regards,
> > > > Apoorv Mittal
> > > >
> > > > On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy wrote:
> > > >
> > > > Hello all,
> > > > I would like to start a discussion on KIP-1206: Strict max fetch
> > > > records in share fetch.
> > > > This KIP introduces the AcquireMode in ShareFetchRequest, which
> > > > provides two options: Strict or loose. When strict mode is selected,
> > > > we should only acquire records till maxFetchRecords.
> > > >
> > > > KIP:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch
> > > >
> > > > Thanks,
> > > > Jimmy Wang
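
The AS9 example from the thread (batches of 3 records, `max.poll.records=5`) can be sketched as a small simulation. This is only an illustration under stated assumptions, not Kafka code: `make_batches` and `share_fetch` are hypothetical helper names, and the `record_limit` branch models just one of the permitted broker strategies listed in AS9 (return whole batches, acquire no more than the limit). The KIP is not prescriptive about which strategy a broker uses.

```python
from typing import List, Tuple

# A batch is modelled as (first_offset, last_offset), both inclusive.
Batch = Tuple[int, int]


def make_batches(num_batches: int, batch_size: int) -> List[Batch]:
    """Build consecutive batches, e.g. 0-2, 3-5, 6-8 for batch_size=3."""
    return [(i * batch_size, (i + 1) * batch_size - 1) for i in range(num_batches)]


def share_fetch(batches: List[Batch], max_records: int, mode: str):
    """Hypothetical model of a share fetch (an assumption, not a Kafka API).

    Returns (returned_batches, acquired_offsets).
    - "batch_optimized": max_records is a soft limit; whole batches are acquired.
    - "record_limit": whole batches are still returned, but no more than
      max_records offsets are acquired (the "split batch" strategy from AS9).
    """
    if mode not in ("batch_optimized", "record_limit"):
        raise ValueError(f"unknown mode: {mode}")
    returned: List[Batch] = []
    acquired: List[int] = []
    for first, last in batches:
        if len(acquired) >= max_records:
            break  # limit reached before this batch, stop fetching
        returned.append((first, last))
        offsets = list(range(first, last + 1))
        if mode == "batch_optimized":
            acquired.extend(offsets)  # may overshoot the soft limit
        else:
            remaining = max_records - len(acquired)
            acquired.extend(offsets[:remaining])  # never exceeds the limit
    return returned, acquired


if __name__ == "__main__":
    batches = make_batches(num_batches=3, batch_size=3)
    for mode in ("batch_optimized", "record_limit"):
        returned, acquired = share_fetch(batches, max_records=5, mode=mode)
        print(mode, "returned:", returned, "acquired:", acquired)
```

With these inputs, the batch-optimised run acquires offsets 0-5 (6 records), while the record-limit run returns the same two batches but acquires only offsets 0-4, leaving offset 5 returned but not acquired.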
