Hi Andrew,

Thanks for the review and suggestions. I have updated the KIP accordingly. Here 
is a summary of the changes:

1. Term change
max.idempotence.batches.to.retain -> producer.state.batches.to.retain
MaxIdempotenceBatchesToRetain -> ProducerStateBatchesToRetain

2. Topic-level configuration
The configuration has been changed from a broker-level config to a topic-level 
config with a server default.
The ProducerStateBatchesToRetain field in ProduceResponse is now placed inside 
TopicProduceResponse rather than as a top-level tagged field, since different 
partitions on the same broker can have different values.
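As a rough illustration only, the field placement could look something like the
fragment below in the ProduceResponse schema JSON. The versions, tag number, and
"about" text here are placeholders I made up for the sketch, not the KIP's actual
schema:

```json
{ "name": "TopicProduceResponse", "versions": "0+", "fields": [
    { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The topic name." },
    { "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+",
      "about": "Each partition that we produced to within the topic." },
    { "name": "ProducerStateBatchesToRetain", "type": "int32", "versions": "14+",
      "taggedVersions": "14+", "tag": 0, "default": "5",
      "about": "The number of batches retained per producer for this topic's partitions." }
  ]
}
```

Since the field sits at the topic level, each topic in the response can report its
own retention limit.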

3. Two-level in-flight check
To support per-topic deduplication window sizes, the producer now enforces two 
independent in-flight checks:
- Per-partition check (new): the number of in-flight batches to a specific
  partition must not exceed that partition's discovered
  ProducerStateBatchesToRetain limit.
- Per-connection check (existing): the total number of in-flight requests to a
  broker node must not exceed max.in.flight.requests.per.connection.
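To make the interaction between the two checks concrete, here is a minimal sketch.
The class and method names (InFlightLimits, canSend, and so on) are hypothetical,
not the actual producer internals, and real code would need proper synchronization:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two-level in-flight check described above.
public class InFlightLimits {
    // Conservative default used before a partition's limit is discovered,
    // matching the historical NUM_BATCHES_TO_RETAIN of 5.
    static final int DEFAULT_BATCHES_TO_RETAIN = 5;

    private final int maxInFlightPerConnection;  // existing connection-level limit
    private final Map<String, Integer> partitionLimits = new HashMap<>();      // discovered limits
    private final Map<String, Integer> inFlightPerPartition = new HashMap<>(); // current counts
    private int inFlightOnConnection = 0;

    public InFlightLimits(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Record the limit discovered from a ProduceResponse for this partition's topic.
    public void updatePartitionLimit(String topicPartition, int limit) {
        partitionLimits.put(topicPartition, limit);
    }

    // Both checks must pass before another batch may be sent to the partition.
    public boolean canSend(String topicPartition) {
        int partitionLimit = partitionLimits.getOrDefault(topicPartition, DEFAULT_BATCHES_TO_RETAIN);
        boolean partitionOk = inFlightPerPartition.getOrDefault(topicPartition, 0) < partitionLimit;
        boolean connectionOk = inFlightOnConnection < maxInFlightPerConnection;
        return partitionOk && connectionOk;
    }

    public void onSend(String topicPartition) {
        inFlightPerPartition.merge(topicPartition, 1, Integer::sum);
        inFlightOnConnection++;
    }

    public void onComplete(String topicPartition) {
        inFlightPerPartition.merge(topicPartition, -1, Integer::sum);
        inFlightOnConnection--;
    }
}
```

Note that either check alone can be the one that blocks: a partition can be at its
discovered limit while the connection still has headroom, and vice versa.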

Kind regards,
PoAn

> On Mar 9, 2026, at 8:43 PM, Andrew Schofield <[email protected]> wrote:
> 
> Hi PoAn,
> Thanks for your response. I'm going to try again :)
> 
> AS2: ProducerStateEntry#NUM_BATCHES_TO_RETAIN is already a partition-level 
> concept. The new max.idempotence.batches.to.retain acts per-partition as far 
> as I understand. The fact that we have connection-level and partition-level 
> concepts colliding is weird to me.
> 
> If I created 20 partitions and set max.in.flight.requests.per.connection to 
> 100, and also changed the producer to limit its in-flight requests at the 
> partition level, couldn't I get the best of both worlds? I could use the 
> usual scaling mechanism of adding partitions to get higher throughput and I 
> could continue to use 5 requests per partition.
> 
> I don't mind adding a config to set the number of batches to retain, but I 
> think that's only half the problem.
> 
> Thanks,
> Andrew
> 
> On 2026/03/08 06:33:35 PoAn Yang wrote:
>> Hi Andrew,
>> 
>> Thanks for the review and sorry for the late reply. I took some time to 
>> think through how a partition-level approach could be implemented, what 
>> benefits it might bring, and to run additional experiments in low-latency 
>> environments.
>> 
>> AS1 & AS2: Since both comments converge on the idea of introducing 
>> partition-level configuration, I'll address them together.
>> 
>> 1. Partition-level configuration does not satisfy all use cases. While a 
>> partition-level in-flight limit offers finer-grained control per partition, 
>> it doesn't cover the case where a user wants to bound the total number of 
>> in-flight requests on a single broker connection. These are two distinct 
>> concerns: per-partition flow control vs. per-connection back-pressure. A 
>> partition-level configuration alone cannot replace the connection-level 
>> limit that max.in.flight.requests.per.connection currently provides.
>> 
>> 2. Having two levels of in-flight limits increases both user-facing and 
>> implementation complexity.
>> 
>> 3. A broker-level configuration benefits both high-latency and low-latency 
>> environments. I've added localhost benchmark results to the KIP. Even in a 
>> low-latency environment, setting max.in.flight.requests.per.connection=1 
>> causes each request to queue at the producer level, resulting in 
>> significantly higher latency compared to allowing more in-flight requests.
>> 
>> Thank you,
>> PoAn
>> 
>>> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote:
>>> 
>>> Hi PoAn,
>>> Thanks for your KIP. This seems like a good area to improve, not just 
>>> for the high-latency connections between clients and brokers that you 
>>> mentioned, but also because diskless is introducing topics which have high 
>>> write latency too.
>>> 
>>> AS1: In general, I'm nervous of having to set broker configurations based 
>>> on knowledge of the client latency. If you have an asymmetrical 
>>> configuration with a mixture of high and low latency clients, you end up 
>>> having to configure for the worst case. I'd prefer the client to behave 
>>> differently in the event that it is experiencing high latency, and also to 
>>> be responsive to the difference in latency for specific topics which have 
>>> higher latency, rather than to change the broker configuration for all 
>>> clients. wdyt?
>>> 
>>> AS2: If I understand the code correctly (and that's not guaranteed), 
>>> ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per 
>>> producer ID per topic-partition that the broker can retain. The Java 
>>> producer client uses max.in.flight.requests.per.connection (also 5) to limit 
>>> how many requests it is prepared to have in flight, but this is at the 
>>> level of the entire connection. Would an alternative change be to switch 
>>> the producer's limit from a connection-level limit to a partition-level 
>>> limit matching the broker implementation? You could get a lot of in-flight 
>>> requests by using more partitions. The key is the amount of data in flight, 
>>> not really the number of batches. I may have misunderstood how this area 
>>> works, but it doesn't seem optimal.
>>> 
>>> Thanks,
>>> Andrew
>>> 
>>> On 2026/02/28 12:21:10 PoAn Yang wrote:
>>>> Hi Luke,
>>>> 
>>>> Thanks for the review.
>>>> 
>>>> 2 & 4. I added more background to the Broker Configuration and
>>>> Dynamic Capacity Discovery paragraphs. In the initial state,
>>>> the producer can only send at most min(5,
>>>> max.in.flight.requests.per.connection) requests, so it doesn’t
>>>> exceed old brokers' capacity.
>>>> 
>>>> Thank you,
>>>> PoAn
>>>> 
>>>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
>>>>> 
>>>>> Hi PoAn,
>>>>> 
>>>>>> 1. KAFKA-18905 and KAFKA-9199 are about leader changes causing the
>>>>> OUT_OF_ORDER_SEQUENCE error. This KIP is about removing the
>>>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
>>>>> 
>>>>> OK, I see.
>>>>> 
>>>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>>> That is why we have an initial state to make sure the producer keeps the
>>>>> number of in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN.
>>>>> Only if it finds that a broker can retain more batches does it adjust its
>>>>> limit.
>>>>> 
>>>>> So, currently, when an idempotent/transactional producer is enabled, we
>>>>> throw an exception if max.in.flight.requests.per.connection > 5.
>>>>> When we allow users to configure NUM_BATCHES_TO_RETAIN, that validation
>>>>> can no longer be applied before sending the produce request.
>>>>> And that's why we need the produce response to tell the producer what the
>>>>> setting on the broker side is.
>>>>> Could you make this clearer in the KIP?
>>>>> 
>>>>> Also, if max.in.flight.requests.per.connection is set to 100 and
>>>>> NUM_BATCHES_TO_RETAIN is 5, then by the time the first produce response
>>>>> is received it is already too late if we have allowed the producer to send
>>>>> 100 requests in flight. If we want to adopt this solution, maybe we need to
>>>>> let the producer begin from max.in.flight.requests.per.connection = 1 and
>>>>> then adjust it to the expected value after the first produce response is
>>>>> received. Does that make sense?
>>>>> 
>>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>>> if a broker works with old producers, it may waste memory. Old
>>>>> producers can't send more in-flight requests because of a ConfigException.
>>>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0?
>>>>> 
>>>>> Sounds good to me.
>>>>> 
>>>>> Thank you,
>>>>> Luke
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote:
>>>>> 
>>>>>> Hi Luke,
>>>>>> 
>>>>>> Thanks for the review and suggestions.
>>>>>> 
>>>>>> 1. KAFKA-18905 and KAFKA-9199 are about leader changes causing the
>>>>>> OUT_OF_ORDER_SEQUENCE error. This KIP is about removing the
>>>>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
>>>>>> 
>>>>>> 2. Agree, transactional producers are based on idempotent producers.
>>>>>> Updated it.
>>>>>> 
>>>>>> 3.
>>>>>>> So, I'd like to know why we have to adjust the
>>>>>>> `max.in.flight.requests.per.connection` value in the producer side?
>>>>>> 
>>>>>> 
>>>>>> The user doesn’t need to update max.in.flight.requests.per.connection in
>>>>>> this case. The producer will automatically adjust its internal limit on
>>>>>> in-flight requests.
>>>>>> 
>>>>>>> Using the example above, after this KIP,
>>>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>>>> 
>>>>>> 
>>>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>>>> That is why we have an initial state to make sure the producer keeps the
>>>>>> number of in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN.
>>>>>> Only if it finds that a broker can retain more batches does it adjust its
>>>>>> limit.
>>>>>> 
>>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>>>> if a broker works with old producers, it may waste memory. Old
>>>>>> producers can't send more in-flight requests because of a ConfigException.
>>>>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0?
>>>>>> 
>>>>>> Thank you,
>>>>>> PoAn
>>>>>> 
>>>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote:
>>>>>>> 
>>>>>>> Hi PoAn,
>>>>>>> 
>>>>>>> Thanks for the KIP!
>>>>>>> I agree the number of batches to retain should be configurable to improve
>>>>>>> the throughput.
>>>>>>> 
>>>>>>> Comments:
>>>>>>> 1. Could you add the issue: KAFKA-18905
>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-18905> into the
>>>>>>> motivation section? I think this is the issue we want to address, right?
>>>>>>> 
>>>>>>> 2. > Introduce a new config on the broker, as the broker must know how much
>>>>>>> memory to allocate. Operators can set a limitation on the broker side to
>>>>>>> prevent malicious producers. This configuration only takes effect for
>>>>>>> idempotent producers.
>>>>>>> I think not only the idempotent producers, but also the
>>>>>>> transactional producers, as long as they have the PID.
>>>>>>> 
>>>>>>> 3. About the producer response update, I'm wondering whether it is necessary.
>>>>>>> Currently, when a producer runs with `max.in.flight.requests.per.connection=10`
>>>>>>> and NUM_BATCHES_TO_RETAIN=5, we don't adjust the producer config to 5.
>>>>>>> Of course it is possible that the duplication cannot be detected, but that
>>>>>>> might be the user's choice to improve throughput (though it might be rare).
>>>>>>> So, I'd like to know why we have to adjust the
>>>>>>> `max.in.flight.requests.per.connection` value in the producer side?
>>>>>>> Using the example above, after this KIP,
>>>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>>>>> 
>>>>>>> 4. The default value of `max.idempotence.batches.to.retain`
>>>>>>> The performance test you showed makes it clear that a
>>>>>>> larger `max.idempotence.batches.to.retain` gets better throughput.
>>>>>>> Also, since the memory usage is small, do we have any reason to keep the
>>>>>>> default value at 5?
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> Luke
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote:
>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> I would like to start a discussion thread on KIP-1269. In this KIP, we aim
>>>>>>>> to remove the limitation on the maximum number of batches to retain for an
>>>>>>>> idempotent producer. In our tests, it can improve throughput and reduce
>>>>>>>> latency.
>>>>>>>> 
>>>>>>>> https://cwiki.apache.org/confluence/x/loI8G
>>>>>>>> 
>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>> 
>>>>>>>> Thanks.
>>>>>>>> PoAn
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
