Hi, Lianet,

Thanks for the updated KIP. Just a minor comment. "but the key different"
should be "but the key difference".

Jun

On Fri, May 22, 2026 at 5:22 AM Lianet Magrans <[email protected]> wrote:

> Hi Jun, thanks for the feedback!  (sorry for the delay, Current/travelling)
>
> JR1: Agreed, I updated the example (aligned with the same mixed workload
> case mentioned at the beginning of the motivation)
>
> JR3: The "scale down" was referring to the reservation only (memory held by
> open batches, less during low-traffic vs high traffic period). Clarified in
> the KIP to make clear that it's not about pool memory scaling down, just
> about reservation in open batches (pool memory free for other partitions if
> needed).
>
> JR4: Yes, it was confusing indeed. The intention was just to refer to the
> producer thread marking the batch for closing (not the actual close). This
> will all be the same as today when the batch fills up, as you described
> (producer just "marks for close", sender does the actual close and frees
> memory up). I clarified it all in the KIP to be accurate.
>
> Thanks!
> Lianet
>
>
> On Fri, May 15, 2026 at 9:39 PM Jun Rao via dev <[email protected]>
> wrote:
>
> > Hi, Lianet,
> >
> > Thanks for the reply.
> >
> > JR1. "Memory usage: under the current static strategy, a producer writing
> > 10 MiB/s of aggregate throughput to a 1000-partition topic with
> > RoundRobinPartitioner struggles to achieve a meaningful fraction of that
> at
> > the default 16384 bytes "batch.size". Each partition only sends 16384
> bytes
> > at a time over a high-latency link, so per-partition throughput is
> bounded
> > by "16384 bytes / RTT". Increasing "batch.size" to 4 MiB unblocks
> > throughput but the producer would need 4 MiB × 1000 partitions = 4 GiB of
> > pool memory (regardless of actual volume of data flowing per partition).
> > Under the dynamic strategy and the same batch.size = 4 MiB, target
> > throughput of ~10 KiB/s and linger.ms = 100ms, per-partition memory
> > becomes
> > ≈ ~1 KiB (10 KiB throughput × 100 ms linger), so total memory ≈ 1000 × 1
> > KiB = ~1 MiB (orders of magnitude less than the 4 GiB used under the
> static
> > allocation). Similar savings apply to any workload where per-batch data
> > falls short of batch.size: hot-cold partition distributions (skewed key
> > traffic), bursty workloads with quiet periods, and over-provisioned
> > batch.size settings."
> > This example is still not very convincing. It's true that one can set
> > batch.size=4MB without running out of memory, but it doesn't achieve the
> > batching benefit. So, why will a user bother setting a high batch size?
> One
> > possible example is a client that publishes to a high volume topic
> without
> > keys, and to a low-volume topic with keys, using the default partitioning
> > strategy. When a high batch size is set, the static approach may exhaust
> > the buffer pool, whereas the dynamic approach avoids exhausting the pool
> > and still achieves the batching benefit for the high volume topic.
> >
> >
> > JR3. "Dynamic uses aggregate_throughput × linger.ms, which operators
> > control. During lower-traffic periods, static still reserves 400 MiB
> until
> > batches close; dynamic scales down proportionally."
> > Hmm, if the dynamic approach ever allocates 400MB worth of chunks, it
> never
> > deallocates them right? Then, how will dynamic scale down?
> >
> >
> > JR4. "If the non-blocking acquire fails (pool exhausted), the producer
> will
> > close the current batch (making it eligible to drain), and blocks on the
> > pool to allocate the chunks for the new record (up to max.block.ms)."
> > To be precise, currently, when the buffer pool is exhausted, the producer
> > doesn't close the batch directly. The background sender thread drains and
> > closes the batch.
> >
> > Jun
> >
> > On Thu, May 14, 2026 at 2:30 PM Lianet Magrans <[email protected]>
> wrote:
> >
> > > Hi Jun,
> > >
> > > JR1: The example's point was about the case where flow remains under
> the
> > > batch limit (those are the cases where we would get significant memory
> > > improvement/differences). But I do get your point and agree: in
> scenarios
> > > where the full batch is used, the dynamic strategy would end up using
> the
> > > same amount of memory. Still, in those cases the value comes from the
> > > predictability/tuning of the buffer.memory (memory consumption depends
> on
> > > known factors, not workload-dependant ones). I clarified the first
> > example,
> > > and added a second one to showcase the case where it's not about memory
> > > gains but about predictability.
> > >
> > > JR2: The main concern with keeping the same close-and-block as trunk in
> > > this case was the change it would bring into the send() blocking
> pattern.
> > > On trunk, send only blocks for memory for the first record of a batch,
> > but
> > > never mid-batch. Applying this close-and-block to the dynamic strategy
> > > would change this (send() could block on any record regardless of an
> open
> > > batch). I leaned initially toward avoiding changing the blocking
> > behaviour
> > > (and pay the extra direct allocation with visibility), but on second
> > > thoughts I agree it's cleaner to surface the situation to the API
> > (blocking
> > > on send, aligned with what trunk does on new batch only, and dynamic
> > would
> > > do at the record level). It's no change to the send or max.bloc.ms
> > > <
> >
> https://urldefense.com/v3/__http://max.bloc.ms__;!!Ayb5sqE7!tij1l2481b8WaW403I3sX_JjzjVmH3SFTImLH03m5t8v-95dzvTWsUQaSx4vXv1j93EM6pqXPd0mg4my$
> > >
> > > contract really, just a different pattern that seems sensible given the
> > > "on-demand" allocation. I updated the KIP with this, and left a
> rejected
> > > alternative for the record. Also, with this I opted for dropping the
> new
> > > metric I had (which was mainly to have visbility over this new
> > > direct-allocation path, now removed)
> > >
> > > Hi TengYao:
> > >
> > > TYC1: interesting point, agree that your suggested metric would give
> > > visibility on what's actually allocated from the pool (which is dynamic
> > > now, didn't make too much sense before because it was "static",
> > > ~batch.size). I believe that for some of the scenarios you shared, we
> > would
> > > be covered with the metrics that already exist in trunk (e.g.,
> > > bufferpool-wait-*, buffer-available/total-bytes, batch-size-avg),
> still,
> > > it's a fact that the new strategy allocates differently from the pool,
> > > dynamically, and only a metric like you suggest would let us see how
> that
> > > goes (batch-size-avg is the closest but is post-compression so not the
> > > same). I just wonder if it would make more sense to represent it in
> > bytes,
> > > rather than in chunks?? (e.g, "batch-pool-bytes-avg"). It would align
> > > better with existing metrics in this space, all in bytes. Also I expect
> > > operators probably think in bytes (not a new "chunk" concept, which is
> > just
> > > an internal implementation grouping bytes), and maybe better not to
> > expose
> > > chunk as a unit of measure to make sure the metric ages well even if
> the
> > > internal chunk details move). What do you think? Will wait to hear back
> > and
> > > align before updating the KIP
> > >
> > > Thanks both!
> > > Cheers,
> > > Lianet
> > >
> > >
> > > On Wed, May 13, 2026 at 4:40 PM Jun Rao via dev <[email protected]>
> > > wrote:
> > >
> > >> Hi, Lianet,
> > >>
> > >> Thanks for the reply.
> > >>
> > >> JR1. "As an example: a producer writing 10 MiB/s of aggregate
> throughput
> > >> to
> > >> a 1000-partition topic with RoundRobinPartitioner struggles to
> achieve a
> > >> meaningful fraction of that at the default 16384 bytes "batch.size".
> > Each
> > >> partition only sends 16384 bytes at a time over a high-latency link,
> so
> > >> per-partition throughput is bounded by "16384 bytes / RTT". Increasing
> > >> "batch.size" to 4 MiB unblocks throughput but the producer would need
> 4
> > >> MiB
> > >> × 1000 partitions = 4 GiB of pool memory to accommodate all partitions
> > >> simultaneously (regardless of actual volume of data flowing per
> > >> partition)."
> > >> This example does not seem strong. In this case, the producer still
> > >> requires 4GB of memory even with the proposed KIP to achieve high
> > >> throughput because all 1000 partitions are active.
> > >>
> > >> JR2. "When a new record arrives mid-batch and the pool is exhausted,
> it
> > >> will perform direct heap allocation to allocate all the chunks
> estimated
> > >> needed for the record uncompressed size."
> > >> Why do we need to introduce this new case for direct allocation? This
> > case
> > >> exists in the static allocation approach. If the buffer pool is
> > exhausted,
> > >> the send() call blocks but all pending batches become drainable to
> > prevent
> > >> deadlock. Is there any issue with using the same mechanism for dynamic
> > >> allocation?
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Wed, May 13, 2026 at 8:53 AM Lianet Magrans <[email protected]>
> > >> wrote:
> > >>
> > >> > Hi Jun,
> > >> >
> > >> > JR1: Agreed, I updated the motivation section to clarify the
> different
> > >> > scenarios based on keys and partitioner, and under which situations
> it
> > >> > becomes problematic.
> > >> >
> > >> > JR2: The KIP preserves the 2 existing direct allocation triggers you
> > >> > mentioned (compressed data exceeding allocation and batch split),
> and
> > >> also
> > >> > introduces a new one (on new record mid-batch when pool exhausted,
> > >> > basically due to the per-record reservation approach). To mitigate,
> > >> direct
> > >> > allocation is limited to one record's worth of growth per batch
> (batch
> > >> > closed right after it), and we're also introducing the new metric to
> > >> have
> > >> > visblity and allow to tune buffer.memory. Under normal pool
> > conditions,
> > >> > direct allocations with the new strategy should happen less often
> than
> > >> with
> > >> > the current behaviour, mainly because of the proposed improvement to
> > try
> > >> > the pool first, non-blocking before falling back to heap
> allocation. I
> > >> > clarified it all in the Internal allocation strategy section
> > (extending
> > >> on
> > >> > new sections "Blocking behaviour" and "Direct heap allocation").
> > Please
> > >> > take a look and let me know.
> > >> >
> > >> > Thanks for the review!
> > >> > Lianet
> > >> >
> > >> > PS: addressing TengYao's feedback shortly, thanks!
> > >> >
> > >> > On Tue, May 12, 2026 at 11:31 AM TengYao Chi <[email protected]
> >
> > >> > wrote:
> > >> >
> > >> > > Hi Lianet,
> > >> > >
> > >> > > Thanks for this great KIP.
> > >> > >
> > >> > > TYC1. I have one consideration regarding observability: Do we
> need a
> > >> new
> > >> > > metric for average-chunks-per-batch? With the introduction of the
> > >> > > chunked-buffer strategy, memory usage per partition is no longer a
> > >> fixed
> > >> > > batch.size. While this significantly improves memory efficiency,
> it
> > >> might
> > >> > > be beneficial for operators to understand the actual "chunk
> > >> utilization"
> > >> > or
> > >> > > fragmentation under different workloads. Specifically, I think
> this
> > >> > metric
> > >> > > would be valuable when combined with the proposed
> > bufferpool-overflow
> > >> > > metrics: it would help operators distinguish whether memory
> pressure
> > >> is
> > >> > > being driven by a large number of active partitions (many small
> > >> batches)
> > >> > or
> > >> > > by individual batches becoming unexpectedly large (many chunks per
> > >> batch,
> > >> > > perhaps due to large records or low compression ratios). What do
> you
> > >> > think?
> > >> > >
> > >> > > Best,
> > >> > > TengYao Chi
> > >> > >
> > >> > > On 2026/05/11 23:03:53 Jun Rao via dev wrote:
> > >> > > > Hi, Lianet,
> > >> > > >
> > >> > > > Thanks for the KIP.
> > >> > > >
> > >> > > > JR1. It would be useful to provide a bit more motivation for the
> > >> KIP.
> > >> > The
> > >> > > > batches allocated from the buffer pool are proportional to the
> > >> number
> > >> > of
> > >> > > > active partitions. For publishing records without keys, the
> active
> > >> > > > partition is 1 by default, independent of the number of
> partitions
> > >> in a
> > >> > > > topic. It's only when publishing records with keys that the
> active
> > >> > > > partition can be the total number of partitions in a topic. So,
> a
> > >> > > possible
> > >> > > > scenario is that a client publishes records without keys to one
> > >> topic
> > >> > > while
> > >> > > > publishing records with keys to another.
> > >> > > >
> > >> > > > JR2. "Following records appended to the batch do not block or
> > throw.
> > >> > They
> > >> > > > attempt non-blocking pool allocation and fall back to direct
> heap
> > if
> > >> > the
> > >> > > > pool is exhausted.
> > >> > > > Ensures not blocking on pool memory while already holding some
> > for a
> > >> > > batch".
> > >> > > >
> > >> > > > Currently, the producer only allocates memory exceeding the
> > >> configured
> > >> > > > buffer pool size in two cases.
> > >> > > > (1) Compressed data exceeding the estimated size
> > >> > > > (2) When a batch is too large for the broker's max.message.bytes
> > and
> > >> > gets
> > >> > > > split, each sub-batch is allocated via
> > >> ByteBuffer.allocate(initialSize)
> > >> > > > directly.
> > >> > > >
> > >> > > > With the KIP, are we introducing new cases in addition to the
> > above
> > >> > two?
> > >> > > >
> > >> > > > Jun
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Fri, May 1, 2026 at 6:03 AM Lianet Magrans <
> [email protected]
> > >
> > >> > > wrote:
> > >> > > >
> > >> > > > > Thanks for the feedback Jaisen! I like your proposed "static"
> > for
> > >> the
> > >> > > > > current behaviour, it aligns nicely. All updated.
> > >> > > > >
> > >> > > > > Best!
> > >> > > > > Lianet
> > >> > > > >
> > >> > > > > On Thu, Apr 30, 2026 at 4:27 PM Jaisen Mathai via dev <
> > >> > > > > [email protected]>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Thanks Lianet.
> > >> > > > > >
> > >> > > > > > I like the proposal.
> > >> > > > > >
> > >> > > > > > I suggest a descriptive name such as static or fixed instead
> > of
> > >> > > legacy
> > >> > > > > for
> > >> > > > > > the default configuration value. I think these will age
> better
> > >> > while
> > >> > > > > still
> > >> > > > > > communicating that users should strongly consider using the
> > >> > > non-default
> > >> > > > > > value of dynamic.
> > >> > > > > >
> > >> > > > > > Jaisen
> > >> > > > > >
> > >> > > > > > On Thu, Apr 30, 2026 at 8:02 AM Lianet Magrans <
> > >> [email protected]
> > >> > >
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi all,
> > >> > > > > > >
> > >> > > > > > > I would like to start a discussion on KIP-1332 that
> > proposes a
> > >> > > dynamic
> > >> > > > > > > memory allocation strategy for the Kafka producer, to
> unlock
> > >> > > > > high-latency
> > >> > > > > > > scenarios increasingly common as Kafka moves toward object
> > >> > storage.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >> >
> > >>
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1332*3A*Dynamic*memory*allocation*for*the*Kafka*producer__;JSsrKysrKys!!Ayb5sqE7!t4yI-C5BwMxJ6dMJC7tuQhu94KuolbgKXyEnl4GChJGLYY2eS4NXk-GZYlnVPnuw3ESrGwKjyPDr5Bjp0Gk$
> > >> > > > > > >
> > >> > > > > > > Thanks!
> > >> > > > > > > Lianet
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>

Reply via email to