Hi, Lucas,

Thanks for the reply. It would be useful to summarize the benefits of a
separate batch.max.size. To me, it's not clear why a user would want two
different batch sizes. In your example, I can understand why a user would
want to form a batch with a 5ms linger. But why would a user prefer 16K
batches with 5ms linger, if say 256K is deemed best for throughput?

Thanks,

Jun

On Fri, Dec 10, 2021 at 4:35 PM Lucas Bradstreet <lu...@confluent.io.invalid>
wrote:

> Hi Jun,
>
> One difference compared to increasing the default batch size is that users
> may actually prefer smaller batches but it makes much less sense to
> accumulate many small batches if a batch is already sending.
>
> For example, imagine a user that prefers 16K batches with 5ms linger.
> Everything is functioning normally and 16KB batches are being sent. Then
> there's a 500ms blip for that broker. Do we want to continue to accumulate
> 16KB batches, each of which requires a round trip, or would we prefer to
> accumulate larger batches while sending is blocked?
>
> I'm not hugely against increasing the default batch.size in general, but
> batch.max.size does seem to have some nice properties.
>
> Thanks,
>
> Lucas
>
> On Fri, Dec 10, 2021 at 9:42 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem, Luke,
> >
> > Thanks for the reply.
> >
> > 11. If we get rid of batch.max.size and increase the default batch.size,
> > it's true the behavior is slightly different than before. However, does
> > that difference matter to most of our users? In your example, if a user
> > sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
> > matter to deliver any batch smaller than 256KB before 100ms? I also find it
> > a bit hard to explain to our users these 3 different settings related to
> > batch size.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Dec 9, 2021 at 5:47 AM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > 11. In addition to Artem's comment, I think the reason to have an additional
> > > "batch.max.size" is to give users more flexibility.
> > > For example:
> > > With linger.ms=100ms and batch.size=16KB, suppose 20KB of data arrives for
> > > a partition within 50ms, and the sender is then ready to pick up the batch.
> > > In the current design, we send 16KB of data to the broker and keep the
> > > remaining 4KB in the producer to keep accumulating data.
> > > But after this KIP, the user can send the whole 20KB of data together. That
> > > is, the user can decide whether to accumulate more data before the sender
> > > is ready and send it together for higher throughput. The "batch.size=16KB"
> > > in the proposal is more like a soft limit (and "batch.max.size" is like a
> > > hard limit), or a switch that makes the batch ready. Before the sender is
> > > ready, we can still accumulate more data and send it to the broker together.
> > >
> > > Users can of course increase "batch.size" to 20KB to achieve the same goal
> > > in the current design. But if the data arriving within 100ms is just 18KB,
> > > the batch will wait the full 100ms before being sent. The "batch.max.size"
> > > config gives users more flexible configuration.
> > >
> > > Does that make sense?
> > >
> > > Thank you.
> > > Luke
> > >
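Luke's 20KB example above can be sketched as follows. This is only an
illustrative model, not producer code: the function names are hypothetical,
draining is simplified to a single step, and "batch.max.size" is the config
proposed in the KIP rather than an existing one.

```python
KB = 1024

def drain_current(pending_bytes, batch_size):
    """Current design (simplified): a batch holds at most batch.size bytes."""
    sent = min(pending_bytes, batch_size)
    return sent, pending_bytes - sent

def drain_proposed(pending_bytes, batch_size, batch_max_size):
    """Proposed design (hypothetical model): batch.size only marks the batch
    as ready; the batch may keep growing up to batch.max.size until drained."""
    if pending_bytes < batch_size:
        return 0, pending_bytes  # not ready yet; linger.ms would still apply
    sent = min(pending_bytes, batch_max_size)
    return sent, pending_bytes - sent

# 20KB accumulated, batch.size=16KB: (bytes sent, bytes left in the producer)
print(drain_current(20 * KB, 16 * KB))
print(drain_proposed(20 * KB, 16 * KB, 256 * KB))
```

In this model the current design sends 16KB and leaves 4KB behind, while the
proposed design sends all 20KB once the 16KB threshold has been crossed.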
> > > On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 11. That was my initial thinking as well, but in a discussion some people
> > > > pointed out the change of behavior in some scenarios.  E.g. suppose someone
> > > > for some reason really wants batches to be at least 16KB and sets a large
> > > > linger.ms, most of the time the batches are filled quickly enough, and
> > > > they observe a certain latency.  Then they upgrade their client to one with
> > > > a default size of 256KB and the latency increases.  This could be seen as a
> > > > regression.  It could be fixed by just reducing linger.ms to specify the
> > > > expected latency, but it could still be seen as a disruption by some users.
> > > > The other reason to have 2 sizes is to avoid allocating large buffers
> > > > upfront.
> > > >
> > > > -Artem
> > > >
> > > > On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 11. Got it. To me, batch.size is really used for throughput and not for
> > > > > latency guarantees. There is no guarantee when 16KB will be accumulated.
> > > > > So, if users want any latency guarantee, they will need to specify
> > > > > linger.ms accordingly.
> > > > > Then, batch.size can just be used to tune for throughput.
> > > > >
> > > > > 20. Could we also describe the unit of compression? Is
> > > > > it batch.initial.size, batch.size or batch.max.size?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > 10. My understanding is that MemoryRecords would under the covers be
> > > > > > allocated in chunks, so logically it would still be one MemoryRecords
> > > > > > object; it's just that instead of allocating one large chunk upfront,
> > > > > > smaller chunks are allocated as needed to grow the batch and linked
> > > > > > into a list.
> > > > > >
> > > > > > 11. The reason for 2 sizes is to avoid a change of behavior when
> > > > > > triggering batch send with a large linger.ms.  Currently, a batch send is
> > > > > > triggered once the batch reaches 16KB by default; if we just raise the
> > > > > > default to 256KB, then the batch send will be delayed.  Using a separate
> > > > > > value would allow keeping the current behavior when sending the batch
> > > > > > out, but provide better throughput with high latency + high bandwidth
> > > > > > channels.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi, Luke,
> > > > > > >
> > > > > > > Thanks for the KIP.  A few comments below.
> > > > > > >
> > > > > > > 10. Accumulating small batches could improve memory usage. Will that
> > > > > > > introduce extra copying when generating a produce request? Currently, a
> > > > > > > produce request takes a single MemoryRecords per partition.
> > > > > > > 11. Do we need to introduce a new config batch.max.size? Could we just
> > > > > > > increase the default of batch.size? We probably need to have KIP-794
> > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner>
> > > > > > > resolved before increasing the default batch size since the larger the
> > > > > > > batch size, the worse the problem in KIP-794.
> > > > > > > 12. As for max.request.size, currently it's used for both the max record
> > > > > > > size and the max request size, which is unintuitive. Perhaps we could
> > > > > > > introduce a new config max.record.size that defaults to 1MB. We could
> > > > > > > then increase max.request.size to something like 10MB.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Luke,
> > > > > > > >
> > > > > > > > I don't mind increasing the max.request.size to a higher number, e.g. 2MB
> > > > > > > > could be good.  I think we should also run some benchmarks to see the
> > > > > > > > effects of different sizes.
> > > > > > > >
> > > > > > > > I agree that changing round robin to random solves an independent existing
> > > > > > > > issue, however the logic in this KIP exacerbates the issue, so there is
> > > > > > > > some dependency.
> > > > > > > >
> > > > > > > > -Artem
> > > > > > > >
> > > > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Artem,
> > > > > > > > > Yes, I agree that if we go with random selection instead of round-robin
> > > > > > > > > selection, the latency will be distributed more fairly. That is, if there
> > > > > > > > > are 10 partitions, the 10th partition will always be the last choice in
> > > > > > > > > each round in the current design, but with random selection, the chance
> > > > > > > > > of being selected is more even.
> > > > > > > > >
> > > > > > > > > However, I think that's out of scope for this KIP. This is an existing
> > > > > > > > > issue, and it might need further discussion to decide whether this
> > > > > > > > > change is necessary.
> > > > > > > > >
> > > > > > > > > I agree the default 32KB for "batch.max.size" might not be a huge
> > > > > > > > > improvement compared with 256KB. I'm thinking of defaulting
> > > > > > > > > "batch.max.size" to 64KB, and making the documentation clear that if
> > > > > > > > > "batch.max.size" is increased, "ready" partitions may need to wait for
> > > > > > > > > the next request to be sent to the broker because of the
> > > > > > > > > "max.request.size" (default 1MB) limit. Increasing "max.request.size"
> > > > > > > > > can also be considered to avoid this issue. What do you think?
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > > > > > > > >
> > > > > > > > > > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > > > > > > > > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> > > > > > > > > > partition.  We should probably set up some testing and see if 256KB has
> > > > > > > > > > problems.
> > > > > > > > > >
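Artem's per-partition figures above follow from simple arithmetic. A minimal
check, assuming one batch per partition per request and that every request
takes the full latency (the function name is made up for illustration):

```python
def max_throughput_kb_per_s(batch_kb, in_flight, latency_ms):
    """Upper bound on per-partition throughput when each in-flight request
    carries one batch and completes after latency_ms."""
    return batch_kb * in_flight * 1000 / latency_ms

print(max_throughput_kb_per_s(32, 5, 100))   # 32KB batches
print(max_throughput_kb_per_s(256, 5, 100))  # 256KB batches
```

With these inputs the bound is 1600 KB/s (1.6MB/s) for 32KB batches and
12800 KB/s (12.8MB/s) for 256KB batches.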
> > > > > > > > > > To illustrate latency dynamics, let's consider a simplified model: 1
> > > > > > > > > > in-flight request per broker, produce latency 125ms, 256KB max request
> > > > > > > > > > size, 16 partitions assigned to the same broker, every second 128KB is
> > > > > > > > > > produced to each partition (total production rate is 2MB/sec).
> > > > > > > > > >
> > > > > > > > > > If the batch size is 16KB, then the pattern would be the following:
> > > > > > > > > >
> > > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > > 0ms - take 16KB from each partition and send (total 256KB)
> > > > > > > > > > 125ms - complete first 16KB from each partition, send next 16KB
> > > > > > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > > > > > ...
> > > > > > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > > > > > >
> > > > > > > > > > From this model it's easy to see that there are 256KB that are sent
> > > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > > > > > > > > 875ms.
> > > > > > > > > >
> > > > > > > > > > If the batch size is 256KB, then the pattern would be the following:
> > > > > > > > > >
> > > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > > > > > > > > > 125ms - complete 2 first partitions, send data from next 2 partitions
> > > > > > > > > > ...
> > > > > > > > > > 1000ms - complete last 2 partitions
> > > > > > > > > >
> > > > > > > > > > Even though the pattern is different, there are still 256KB that are sent
> > > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > > > > > > > > 875ms.
> > > > > > > > > >
> > > > > > > > > > Now, in this example if we do strictly round-robin (current implementation)
> > > > > > > > > > and we have this exact pattern (not sure how often such a regular pattern
> > > > > > > > > > would happen in practice -- I would expect that it would be a bit more
> > > > > > > > > > random), some partitions would experience higher latency than others (not
> > > > > > > > > > sure how much it would matter in practice -- at the end of the day some
> > > > > > > > > > bytes produced to a topic would have higher latency and some bytes would
> > > > > > > > > > have lower latency).  This pattern is easily fixed by choosing the next
> > > > > > > > > > partition randomly instead of using round-robin.
> > > > > > > > > >
> > > > > > > > > > -Artem
> > > > > > > > > >
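The simplified model in Artem's message above can be replayed with a small
simulation. This is a sketch under the stated assumptions only (1 in-flight
request, 125ms latency, 256KB max request, 16 partitions with 128KB each); the
function name is hypothetical, and draining always starts at partition 0,
which is an extra simplification not taken from the producer code.

```python
def simulate(batch_kb, partitions=16, per_partition_kb=128,
             max_request_kb=256, latency_ms=125):
    """Return (request send times, per-partition completion times) in ms."""
    remaining = [per_partition_kb] * partitions
    done_at_ms = [None] * partitions
    send_times_ms = []
    now_ms = 0
    while any(remaining):
        budget = max_request_kb
        for p in range(partitions):
            # At most one batch per partition, limited by the request budget.
            take = min(batch_kb, remaining[p], budget)
            remaining[p] -= take
            budget -= take
            if take and remaining[p] == 0:
                done_at_ms[p] = now_ms + latency_ms
        send_times_ms.append(now_ms)
        now_ms += latency_ms  # single in-flight request: wait for completion
    return send_times_ms, done_at_ms

for size in (16, 256):
    sends, done = simulate(size)
    print(size, sends, done)
```

Both batch sizes produce the same request times (0ms, 125ms, ... 875ms), but
with 256KB batches and a fixed drain order the first two partitions complete
at 125ms while the last two complete at 1000ms, which is the per-partition
skew that a random choice of the next partition is meant to even out.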
> > > > > > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Tom,
> > > > > > > > > > > Thanks for your comments. And thanks for Artem's
> > > explanation.
> > > > > > > > > > > Below is my response:
> > > > > > > > > > >
> > > > > > > > > > > > Currently because buffers are allocated using batch.size it means we can
> > > > > > > > > > > > handle records that are that large (e.g. one big record per batch). Doesn't
> > > > > > > > > > > > the introduction of smaller buffer sizes (batch.initial.size) mean a
> > > > > > > > > > > > corresponding decrease in the maximum record size that the producer can
> > > > > > > > > > > > handle?
> > > > > > > > > > >
> > > > > > > > > > > Actually, "batch.size" is only a threshold for deciding whether the batch
> > > > > > > > > > > is "ready to be sent". That is, even if you set "batch.size=16KB"
> > > > > > > > > > > (the default value), users can still send a single 20KB record, as long
> > > > > > > > > > > as its size is less than the producer's "max.request.size" (default 1MB).
> > > > > > > > > > > Therefore, the introduction of "batch.initial.size" won't decrease the
> > > > > > > > > > > maximum record size that the producer can handle.
> > > > > > > > > > >
> > > > > > > > > > > > But isn't there the risk that drainBatchesForOneNode would end up not
> > > > > > > > > > > > sending ready batches well past when they ought to be sent (according to
> > > > > > > > > > > > their linger.ms), because it's sending buffers for earlier partitions too
> > > > > > > > > > > > aggressively?
> > > > > > > > > > >
> > > > > > > > > > > Did you mean that we have a "max.request.size" per request (default is
> > > > > > > > > > > 1MB), and before this KIP a single request can include 64 batches
> > > > > > > > > > > ["batch.size"(16KB) * 64 = 1MB], but now we might only be able to
> > > > > > > > > > > include 32 batches or fewer, because we aggressively send more records in
> > > > > > > > > > > one batch? That's a really good point that I had never thought about. I
> > > > > > > > > > > think your suggestion to first go through the other partitions that just
> > > > > > > > > > > fit "batch.size", or whose "linger.ms" has expired, before handling the
> > > > > > > > > > > ones that exceed "batch.size" is not a good approach, because it might
> > > > > > > > > > > leave the batches larger than "batch.size" always at the lowest priority
> > > > > > > > > > > and cause starvation, where those batches never get a chance to be sent.
> > > > > > > > > > >
> > > > > > > > > > > I don't have a better solution for it, but maybe I can first decrease
> > > > > > > > > > > "batch.max.size" to 32KB, instead of the aggressive 256KB in the KIP. That
> > > > > > > > > > > should alleviate the problem and still improve the throughput. What do you
> > > > > > > > > > > think?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > I think this KIP would change the behaviour of producers when there are
> > > > > > > > > > > > > multiple partitions ready to be sent
> > > > > > > > > > > >
> > > > > > > > > > > > This is correct, the pattern changes and becomes more coarse-grained.  But
> > > > > > > > > > > > I don't think it changes fairness over the long run.  I think it's a good
> > > > > > > > > > > > idea to change drainIndex to be random rather than round robin to avoid
> > > > > > > > > > > > forming patterns where some partitions would consistently get higher
> > > > > > > > > > > > latencies than others because they wait longer for their turn.
> > > > > > > > > > > >
> > > > > > > > > > > > If we really wanted to preserve the exact patterns, we could either try to
> > > > > > > > > > > > support multiple 16KB batches from one partition per request (probably
> > > > > > > > > > > > would require protocol change to change logic on the broker for duplicate
> > > > > > > > > > > > detection) or try to re-batch 16KB batches from accumulator into larger
> > > > > > > > > > > > batches during send (additional computations) or try to consider all
> > > > > > > > > > > > partitions assigned to a broker to check if a new batch needs to be
> > > > > > > > > > > > created (i.e. compare cumulative batch size from all partitions assigned
> > > > > > > > > > > > to a broker and create new batch when cumulative size is 1MB, more
> > > > > > > > > > > > complex).
> > > > > > > > > > > >
> > > > > > > > > > > > Overall, it seems like just increasing the max batch size is a simpler
> > > > > > > > > > > > solution and it does favor larger batch sizes, which is beneficial not just
> > > > > > > > > > > > for production.
> > > > > > > > > > > >
> > > > > > > > > > > > > ready batches well past when they ought to be sent (according to their
> > > > > > > > > > > > > linger.ms)
> > > > > > > > > > > >
> > > > > > > > > > > > The trigger for marking batches ready to be sent isn't changed - a batch is
> > > > > > > > > > > > ready to be sent once it reaches 16KB, so by the time larger batches start
> > > > > > > > > > > > forming, linger.ms wouldn't matter much because the batching goal is met
> > > > > > > > > > > > and the batch can be sent immediately.  Larger batches start forming once
> > > > > > > > > > > > the client starts waiting for the server, in which case some data will wait
> > > > > > > > > > > > its turn to be sent.  This will happen for some data regardless of how we
> > > > > > > > > > > > pick data to send, the question is just whether we'd have some scenarios
> > > > > > > > > > > > where some partitions would consistently experience higher latency than
> > > > > > > > > > > > others.  I think picking drainIndex randomly would prevent such scenarios.
> > > > > > > > > > > >
> > > > > > > > > > > > -Artem
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Luke,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Currently because buffers are allocated using batch.size it means we can
> > > > > > > > > > > > > handle records that are that large (e.g. one big record per batch). Doesn't
> > > > > > > > > > > > > the introduction of smaller buffer sizes (batch.initial.size) mean a
> > > > > > > > > > > > > corresponding decrease in the maximum record size that the producer can
> > > > > > > > > > > > > handle? That might not be a problem if the user knows their maximum record
> > > > > > > > > > > > > size and has tuned batch.initial.size accordingly, but if the default for
> > > > > > > > > > > > > batch.initial.size < batch.size it could cause regressions for existing
> > > > > > > > > > > > > users with a large record size, I think. It should be enough for
> > > > > > > > > > > > > batch.initial.size to default to batch.size, allowing users who care about
> > > > > > > > > > > > > the memory saving in the off-peak throughput case to do the tuning, but not
> > > > > > > > > > > > > causing a regression for existing users.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think this KIP would change the behaviour of producers when there are
> > > > > > > > > > > > > multiple partitions ready to be sent: By sending all the ready buffers
> > > > > > > > > > > > > (which may now be > batch.size) for the first partition, we could end up
> > > > > > > > > > > > > excluding ready buffers for other partitions from the current send. In
> > > > > > > > > > > > > other words, as I understand the KIP currently, there's a change in
> > > > > > > > > > > > > fairness. I think the code in RecordAccumulator#drainBatchesForOneNode will
> > > > > > > > > > > > > ensure fairness in the long run, because the drainIndex will ensure that
> > > > > > > > > > > > > those other partitions each get their turn at being the first. But isn't
> > > > > > > > > > > > > there the risk that drainBatchesForOneNode would end up not sending ready
> > > > > > > > > > > > > batches well past when they ought to be sent (according to their linger.ms),
> > > > > > > > > > > > > because it's sending buffers for earlier partitions too aggressively? Or,
> > > > > > > > > > > > > to put it another way, perhaps the RecordAccumulator should round-robin the
> > > > > > > > > > > > > ready buffers for _all_ the partitions before trying to fill the remaining
> > > > > > > > > > > > > space with the extra buffers (beyond the batch.size limit) for the first
> > > > > > > > > > > > > partitions?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Kind regards,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Tom
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > > > > > Are there any comments/suggestions on this KIP?
> > > > > > > > > > > > > > If not, I'm going to update the KIP based on my previous mail, and start a
> > > > > > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. Why do we have to reallocate the buffer? We can keep a list of buffers
> > > > > > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > > > > > -> Do you mean we allocate multiple buffers with "batch.initial.size",
> > > > > > > > > > > > > > > and link them together (with a linked list)?
> > > > > > > > > > > > > > > ex:
> > > > > > > > > > > > > > > a. We allocate a 4KB initial buffer
> > > > > > > > > > > > > > > | 4KB |
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > b. When new records arrive and the remaining buffer space is not enough for
> > > > > > > > > > > > > > > the records, we create another buffer of size "batch.initial.size"
> > > > > > > > > > > > > > > ex: we already have 3KB of data in the 1st buffer, and here comes a 2KB
> > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > > > > > We fill the first 1KB into the 1st buffer, create a new buffer, link them
> > > > > > > > > > > > > > > together, and fill the rest of the data into it
> > > > > > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > > > > > If not, please explain it in more detail.
> > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > >
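The linked-buffer idea in point 1 above can be sketched as below. This is a
toy model only: the class and method names are invented for illustration, a
Python list stands in for the linked list, and bytearray does not preallocate
the way a real ByteBuffer would.

```python
KB = 1024

class ChunkedBuffer:
    """Grows by adding fixed-size chunks instead of reallocating one buffer."""

    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self.chunks = [bytearray()]  # each chunk holds at most chunk_size bytes

    def append(self, data):
        view = memoryview(data)
        while len(view) > 0:
            last = self.chunks[-1]
            room = self.chunk_size - len(last)
            if room == 0:
                # Current chunk is full: link a new chunk rather than copying.
                last = bytearray()
                self.chunks.append(last)
                room = self.chunk_size
            last.extend(view[:room])
            view = view[room:]

    def remaining_in_last(self):
        return self.chunk_size - len(self.chunks[-1])

buf = ChunkedBuffer(4 * KB)
buf.append(bytes(3 * KB))  # | 4KB (1KB remaining) |
buf.append(bytes(2 * KB))  # | 4KB (full) | ---> | 4KB (3KB remaining) |
print(len(buf.chunks), buf.remaining_in_last() // KB)
```

Appending a 2KB record to a 4KB chunk that already holds 3KB fills the first
chunk and spills 1KB into a second one, matching the diagram in the message.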
> > > > > > > > > > > > > > > 2. I think we should also consider tweaking the semantics of batch.size so
> > > > > > > > > > > > > > > that the sent batches can be larger if the batch is not ready to be sent
> > > > > > > > > > > > > > > (while still respecting max.request.size and perhaps a new
> > > > > > > > > > > > > > > max.batch.size).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > --> In the KIP, I was trying to make "batch.size" the upper bound
> > > > > > > > > > > > > > > of the batch size, and introduce "batch.initial.size" as the initial batch
> > > > > > > > > > > > > > > size.
> > > > > > > > > > > > > > > So are you saying that we can keep "batch.size" as the initial batch size
> > > > > > > > > > > > > > > and introduce "max.batch.size" as the upper bound?
> > > > > > > > > > > > > > > That's a good suggestion, but it would change the semantics of
> > > > > > > > > > > > > > > "batch.size", which might surprise some users. I think my original proposal
> > > > > > > > > > > > > > > ("batch.initial.size") is safer for users. What do you think?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> I think we should also consider tweaking the semantics of batch.size so
> > > > > > > > > > > > > > > >> that the sent batches can be larger if the batch is not ready to be sent
> > > > > > > > > > > > > > > >> (while still respecting max.request.size and perhaps a new
> > > > > > > > > > > > > > > >> max.batch.size).
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Ismael
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > Thanks for the KIP. Why do we have to reallocate the buffer? We can keep a
> > > > > > > > > > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Ismael
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > > > > > > >> >> I'd like to start the discussion for the proposal: KIP-782: Expandable
> > > > > > > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> The main purpose of this KIP is to improve memory usage in the producer,
> > > > > > > > > > > > > > > >> >> and also to save users from the dilemma of setting the batch size
> > > > > > > > > > > > > > > >> >> configuration. After this KIP, users can set a higher batch.size without
> > > > > > > > > > > > > > > >> >> worries, and of course, with an appropriate "batch.initial.size" and
> > > > > > > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> A detailed description can be found here:
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >> Thank you.
> > > > > > > > > > > > > > >> >> Luke
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
