Hi, Luke,

Thanks for the KIP.  A few comments below.

10. Accumulating small batches could improve memory usage. Will that
introduce extra copying when generating a produce request? Currently, a
produce request takes a single MemoryRecords per partition.
11. Do we need to introduce a new config batch.max.size? Could we just
increase the default of batch.size? We probably need to have KIP-794
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner>
resolved
before increasing the default batch size since the larger the batch size,
the worse the problem in KIP-794.
12. As for max.request.size, currently it's used for both the max record
size and the max request size, which is unintuitive. Perhaps we could
introduce a new config max.record.size that defaults to 1MB. We could then
increase max.request.size to sth like 10MB.

Thanks,

Jun


On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hi Luke,
>
> I don't mind increasing the max.request.size to a higher number, e.g. 2MB
> could be good.  I think we should also run some benchmarks to see the
> effects of different sizes.
>
> I agree that changing round robin to random solves an independent existing
> issue, however the logic in this KIP exacerbates the issue, so there is
> some dependency.
>
> -Artem
>
> On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Artem,
> > Yes, I agree if we go with random selection instead of round-robin
> > selection, the latency issue will be more fair. That is, if there are 10
> > partitions, the 10th partition will always be the last choice in each
> round
> > in current design, but with random selection, the chance to be selected
> is
> > more fair.
> >
> > However, I think that's kind of out of scope with this KIP. This is an
> > existing issue, and it might need further discussion to decide if this
> > change is necessary.
> >
> > I agree the default 32KB for "batch.max.size" might be not huge
> improvement
> > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > "batch.max.size", and make the documentation clear that if the
> > "batch.max.size"
> > is increased, there might be chances that the "ready" partitions need to
> > wait for next request to send to broker, because of the
> "max.request.size"
> > (default 1MB) limitation. "max.request.size" can also be considered to
> > increase to avoid this issue. What do you think?
> >
> > Thank you.
> > Luke
> >
> > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > >
> > > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> > > partition.  We should probably set up some testing and see if 256KB has
> > > problems.
> > >
> > > To illustrate latency dynamics, let's consider a simplified model: 1
> > > in-flight request per broker, produce latency 125ms, 256KB max request
> > > size, 16 partitions assigned to the same broker, every second 128KB is
> > > produced to each partition (total production rate is 2MB/sec).
> > >
> > > If the batch size is 16KB, then the pattern would be the following:
> > >
> > > 0ms - produce 128KB into each partition
> > > 0ms - take 16KB from each partition send (total 256KB)
> > > 125ms - complete first 16KB from each partition, send next 16KB
> > > 250ms - complete second 16KB, send next 16KB
> > > ...
> > > 1000ms - complete 8th 16KB from each partition
> > >
> > > from this model it's easy to see that there are 256KB that are sent
> > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > 875ms.
> > >
> > > If the batch size is 256KB, then the pattern would be the following:
> > >
> > > 0ms - produce 128KB into each partition
> > > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > > 125ms - complete 2 first partitions, send data from next 2 partitions
> > > ...
> > > 1000ms - complete last 2 partitions
> > >
> > > even though the pattern is different, there are still 256KB that are
> sent
> > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > 875ms.
> > >
> > > Now, in this example if we do strictly round-robin (current
> > implementation)
> > > and we have this exact pattern (not sure how often such regular pattern
> > > would happen in practice -- I would expect that it would be a bit more
> > > random), some partitions would experience higher latency than others
> (not
> > > sure how much it would matter in practice -- in the end of the day some
> > > bytes produced to a topic would have higher latency and some bytes
> would
> > > have lower latency).  This pattern is easily fixed by choosing the next
> > > partition randomly instead of using round-robin.
> > >
> > > -Artem
> > >
> > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Tom,
> > > > Thanks for your comments. And thanks for Artem's explanation.
> > > > Below is my response:
> > > >
> > > > > Currently because buffers are allocated using batch.size it means
> we
> > > can
> > > > handle records that are that large (e.g. one big record per batch).
> > > Doesn't
> > > > the introduction of smaller buffer sizes (batch.initial.size) mean a
> > > > corresponding decrease in the maximum record size that the producer
> can
> > > > handle?
> > > >
> > > > Actually, the "batch.size" is only like a threshold to decide if the
> > > batch
> > > > is "ready to be sent". That is, even if you set the "batch.size=16KB"
> > > > (default value), users can still send one record sized with 20KB, as
> > long
> > > > as the size is less than "max.request.size" in producer (default
> 1MB).
> > > > Therefore, the introduction of "batch.initial.size" won't decrease
> the
> > > > maximum record size that the producer can handle.
> > > >
> > > > > But isn't there the risk that drainBatchesForOneNode would end up
> not
> > > > sending ready
> > > > batches well past when they ought to be sent (according to their
> > > linger.ms
> > > > ),
> > > > because it's sending buffers for earlier partitions too aggressively?
> > > >
> > > > Did you mean that we have a "max.request.size" per request (default
> is
> > > > 1MB), and before this KIP, the request can include 64 batches in
> single
> > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be able to
> > > > include 32 batches or less, because we aggressively sent more records
> > in
> > > > one batch, is that what you meant? That's a really good point that
> I've
> > > > never thought about. I think your suggestion to go through other
> > > partitions
> > > > that just fit "batch.size", or expire "linger.ms" first, before
> > handling
> > > > the one that is > "batch.size" limit is not a good way, because it
> > might
> > > > cause the one with size > "batch.size" always in the lowest priority,
> > and
> > > > cause starving issue that the batch won't have chance to get sent.
> > > >
> > > > I don't have better solution for it, but maybe I can firstly decrease
> > the
> > > > "batch.max.size" to 32KB, instead of aggressively 256KB in the KIP.
> > That
> > > > should alleviate the problem. And still improve the throughput. What
> do
> > > you
> > > > think?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > > I think this KIP would change the behaviour of producers when
> there
> > > are
> > > > > multiple partitions ready to be sent
> > > > >
> > > > > This is correct, the pattern changes and becomes more
> coarse-grained.
> > > > But
> > > > > I don't think it changes fairness over the long run.  I think it's
> a
> > > good
> > > > > idea to change drainIndex to be random rather than round robin to
> > avoid
> > > > > forming patterns where some partitions would consistently get
> higher
> > > > > latencies than others because they wait longer for their turn.
> > > > >
> > > > > If we really wanted to preserve the exact patterns, we could either
> > try
> > > > to
> > > > > support multiple 16KB batches from one partition per request
> > (probably
> > > > > would require protocol change to change logic on the broker for
> > > duplicate
> > > > > detection) or try to re-batch 16KB batches from accumulator into
> > larger
> > > > > batches during send (additional computations) or try to consider
> all
> > > > > partitions assigned to a broker to check if a new batch needs to be
> > > > created
> > > > > (i.e. compare cumulative batch size from all partitions assigned
> to a
> > > > > broker and create new batch when cumulative size is 1MB, more
> > complex).
> > > > >
> > > > > Overall, it seems like just increasing the max batch size is a
> > simpler
> > > > > solution and it does favor larger batch sizes, which is beneficial
> > not
> > > > just
> > > > > for production.
> > > > >
> > > > > > ready batches well past when they ought to be sent (according to
> > > their
> > > > > linger.ms)
> > > > >
> > > > > The trigger for marking batches ready to be sent isn't changed - a
> > > batch
> > > > is
> > > > > ready to be sent once it reaches 16KB, so by the time larger
> batches
> > > > start
> > > > > forming, linger.ms wouldn't matter much because the batching goal
> is
> > > met
> > > > > and the batch can be sent immediately.  Larger batches start
> forming
> > > once
> > > > > the client starts waiting for the server, in which case some data
> > will
> > > > wait
> > > > > its turn to be sent.  This will happen for some data regardless of
> > how
> > > we
> > > > > pick data to send, the question is just whether we'd have some
> > > scenarios
> > > > > where some partitions would consistently experience higher latency
> > than
> > > > > others.  I think picking drainIndex randomly would prevent such
> > > > scenarios.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com>
> > > wrote:
> > > > >
> > > > > > Hi Luke,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > Currently because buffers are allocated using batch.size it means
> > we
> > > > can
> > > > > > handle records that are that large (e.g. one big record per
> batch).
> > > > > Doesn't
> > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> mean
> > a
> > > > > > corresponding decrease in the maximum record size that the
> producer
> > > can
> > > > > > handle? That might not be a problem if the user knows their
> maximum
> > > > > record
> > > > > > size and has tuned batch.initial.size accordingly, but if the
> > default
> > > > for
> > > > > > batch.initial.size < batch.size it could cause regressions for
> > > existing
> > > > > > users with a large record size, I think. It should be enough for
> > > > > > batch.initial.size to default to batch.size, allowing users who
> > care
> > > > > about
> > > > > > the memory saving in the off-peak throughput case to do the
> tuning,
> > > but
> > > > > not
> > > > > > causing a regression for existing users.
> > > > > >
> > > > > > I think this KIP would change the behaviour of producers when
> there
> > > are
> > > > > > multiple partitions ready to be sent: By sending all the ready
> > > buffers
> > > > > > (which may now be > batch.size) for the first partition, we could
> > end
> > > > up
> > > > > > excluding ready buffers for other partitions from the current
> send.
> > > In
> > > > > > other words, as I understand the KIP currently, there's a change
> in
> > > > > > fairness. I think the code in
> > > RecordAccumulator#drainBatchesForOneNode
> > > > > will
> > > > > > ensure fairness in the long run, because the drainIndex will
> ensure
> > > > that
> > > > > > those other partitions each get their turn at being the first.
> But
> > > > isn't
> > > > > > there the risk that drainBatchesForOneNode would end up not
> sending
> > > > ready
> > > > > > batches well past when they ought to be sent (according to their
> > > > > linger.ms
> > > > > > ),
> > > > > > because it's sending buffers for earlier partitions too
> > aggressively?
> > > > Or,
> > > > > > to put it another way, perhaps the RecordAccumulator should
> > > round-robin
> > > > > the
> > > > > > ready buffers for _all_ the partitions before trying to fill the
> > > > > remaining
> > > > > > space with the extra buffers (beyond the batch.size limit) for
> the
> > > > first
> > > > > > partitions?
> > > > > >
> > > > > > Kind regards,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Hi Ismael and all devs,
> > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > If no, I'm going to update the KIP based on my previous mail,
> and
> > > > > start a
> > > > > > > vote tomorrow or next week.
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > Hi Ismael,
> > > > > > > > Thanks for your comments.
> > > > > > > >
> > > > > > > > 1. Why do we have to reallocate the buffer? We can keep a
> list
> > of
> > > > > > buffers
> > > > > > > > instead and avoid reallocation.
> > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > "buffer.initial.size",
> > > > > > > > and link them together (with linked list)?
> > > > > > > > ex:
> > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > | 4KB |
> > > > > > > >
> > > > > > > > b. when new records reached and the remaining buffer is not
> > > enough
> > > > > for
> > > > > > > the
> > > > > > > > records, we create another batch with "batch.initial.size"
> > buffer
> > > > > > > > ex: we already have 3KB of data in the 1st buffer, and here
> > comes
> > > > the
> > > > > > 2KB
> > > > > > > > record
> > > > > > > >
> > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > now, record: 2KB coming
> > > > > > > > We fill the 1st 1KB into 1st buffer, and create new buffer,
> and
> > > > > linked
> > > > > > > > together, and fill the rest of data into it
> > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > >
> > > > > > > > Is that what you mean?
> > > > > > > > If so, I think I like this idea!
> > > > > > > > If not, please explain more detail about it.
> > > > > > > > Thank you.
> > > > > > > >
> > > > > > > > 2. I think we should also consider tweaking the semantics of
> > > > > batch.size
> > > > > > > so
> > > > > > > > that the sent batches can be larger if the batch is not ready
> > to
> > > be
> > > > > > sent
> > > > > > > > (while still respecting max.request.size and perhaps a new
> > > > > > > max.batch.size).
> > > > > > > >
> > > > > > > > --> In the KIP, I was trying to make the "batch.size" as the
> > > upper
> > > > > > bound
> > > > > > > > of the batch size, and introduce a "batch.initial.size" as
> > > initial
> > > > > > batch
> > > > > > > > size.
> > > > > > > > So are you saying that we can let "batch.size" as initial
> batch
> > > > size
> > > > > > and
> > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > That's a good suggestion, but that would change the semantics
> > of
> > > > > > > > "batch.size", which might surprise some users. I think my
> > > original
> > > > > > > proposal
> > > > > > > > ("batch.initial.size") is safer for users. What do you think?
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> ism...@juma.me.uk
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > >> I think we should also consider tweaking the semantics of
> > > > batch.size
> > > > > > so
> > > > > > > >> that the sent batches can be larger if the batch is not
> ready
> > to
> > > > be
> > > > > > sent
> > > > > > > >> (while still respecting max.request.size and perhaps a new
> > > > > > > >> max.batch.size).
> > > > > > > >>
> > > > > > > >> Ismael
> > > > > > > >>
> > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> ism...@juma.me.uk
> > >
> > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hi Luke,
> > > > > > > >> >
> > > > > > > >> > Thanks for the KIP. Why do we have to reallocate the
> buffer?
> > > We
> > > > > can
> > > > > > > >> keep a
> > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > >> >
> > > > > > > >> > Ismael
> > > > > > > >> >
> > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> show...@gmail.com>
> > > > > wrote:
> > > > > > > >> >
> > > > > > > >> >> Hi Kafka dev,
> > > > > > > >> >> I'd like to start the discussion for the proposal:
> KIP-782:
> > > > > > > Expandable
> > > > > > > >> >> batch size in producer.
> > > > > > > >> >>
> > > > > > > >> >> The main purpose for this KIP is to have better memory
> > usage
> > > in
> > > > > > > >> producer,
> > > > > > > >> >> and also save users from the dilemma while setting the
> > batch
> > > > size
> > > > > > > >> >> configuration. After this KIP, users can set a higher
> > > > batch.size
> > > > > > > >> without
> > > > > > > >> >> worries, and of course, with an appropriate
> > > > "batch.initial.size"
> > > > > > and
> > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > >> >>
> > > > > > > >> >> Derailed description can be found here:
> > > > > > > >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > >> >>
> > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > >> >>
> > > > > > > >> >> Thank you.
> > > > > > > >> >> Luke
> > > > > > > >> >>
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to