Hi, Luke,

Thanks for the KIP. A few comments below.
10. Accumulating small batches could improve memory usage. Will that introduce extra copying when generating a produce request? Currently, a produce request takes a single MemoryRecords per partition.

11. Do we need to introduce a new config batch.max.size? Could we just increase the default of batch.size? We probably need to have KIP-794 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner> resolved before increasing the default batch size, since the larger the batch size, the worse the problem in KIP-794.

12. As for max.request.size, currently it's used for both the max record size and the max request size, which is unintuitive. Perhaps we could introduce a new config max.record.size that defaults to 1MB. We could then increase max.request.size to something like 10MB.

Thanks,
Jun

On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Luke,

I don't mind increasing the max.request.size to a higher number, e.g. 2MB could be good. I think we should also run some benchmarks to see the effects of different sizes.

I agree that changing round robin to random solves an independent existing issue; however, the logic in this KIP exacerbates the issue, so there is some dependency.

-Artem

On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <show...@gmail.com> wrote:

Hi Artem,

Yes, I agree that if we go with random selection instead of round-robin selection, the latency issue will be more fair. That is, if there are 10 partitions, the 10th partition will always be the last choice in each round in the current design, but with random selection, the chance of being selected is more fair.

However, I think that's kind of out of scope for this KIP. This is an existing issue, and it might need further discussion to decide if this change is necessary.

I agree the default 32KB for "batch.max.size" might not be a huge improvement compared with 256KB. I'm thinking, maybe default to "64KB" for "batch.max.size", and make the documentation clear that if "batch.max.size" is increased, there might be chances that the "ready" partitions need to wait for the next request to send to the broker, because of the "max.request.size" (default 1MB) limitation. "max.request.size" can also be considered for an increase to avoid this issue. What do you think?

Thank you.
Luke

On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

> maybe I can firstly decrease the "batch.max.size" to 32KB

I think 32KB is too small. With 5 in-flight and 100ms latency we can produce 1.6MB/s per partition. With 256KB we can produce 12.8MB/s per partition. We should probably set up some testing and see if 256KB has problems.
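For reference, a back-of-the-envelope derivation of those per-partition numbers (assuming the in-flight requests are always kept full and each request carries one batch for the partition):

    per-partition throughput ≈ in-flight requests × batch size / round-trip latency
    5 × 32KB  / 100ms ≈ 1.6 MB/s
    5 × 256KB / 100ms ≈ 12.8 MB/s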
To illustrate latency dynamics, let's consider a simplified model: 1 in-flight request per broker, produce latency 125ms, 256KB max request size, 16 partitions assigned to the same broker, and every second 128KB is produced to each partition (total production rate is 2MB/sec).

If the batch size is 16KB, then the pattern would be the following:

0ms - produce 128KB into each partition
0ms - take 16KB from each partition and send (total 256KB)
125ms - complete the first 16KB from each partition, send the next 16KB
250ms - complete the second 16KB, send the next 16KB
...
1000ms - complete the 8th 16KB from each partition

From this model it's easy to see that there are 256KB that are sent immediately, 256KB that are sent in 125ms, ..., 256KB that are sent in 875ms.

If the batch size is 256KB, then the pattern would be the following:

0ms - produce 128KB into each partition
0ms - take 128KB each from the first 2 partitions and send (total 256KB)
125ms - complete the first 2 partitions, send data from the next 2 partitions
...
1000ms - complete the last 2 partitions

Even though the pattern is different, there are still 256KB that are sent immediately, 256KB that are sent in 125ms, ..., 256KB that are sent in 875ms.

Now, in this example, if we do strictly round-robin (the current implementation) and we have this exact pattern (not sure how often such a regular pattern would happen in practice -- I would expect it to be a bit more random), some partitions would experience higher latency than others (not sure how much it would matter in practice -- at the end of the day some bytes produced to a topic would have higher latency and some bytes would have lower latency). This pattern is easily fixed by choosing the next partition randomly instead of using round-robin.

-Artem

On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:

Hi Tom,

Thanks for your comments. And thanks for Artem's explanation. Below is my response:

> Currently because buffers are allocated using batch.size it means we can handle records that are that large (e.g. one big record per batch). Doesn't the introduction of smaller buffer sizes (batch.initial.size) mean a corresponding decrease in the maximum record size that the producer can handle?

Actually, the "batch.size" is only like a threshold to decide if the batch is "ready to be sent". That is, even if you set "batch.size=16KB" (the default value), users can still send one record sized at 20KB, as long as the size is less than "max.request.size" in the producer (default 1MB). Therefore, the introduction of "batch.initial.size" won't decrease the maximum record size that the producer can handle.

> But isn't there the risk that drainBatchesForOneNode would end up not sending ready batches well past when they ought to be sent (according to their linger.ms), because it's sending buffers for earlier partitions too aggressively?

Did you mean that we have a "max.request.size" per request (default is 1MB), and before this KIP, the request could include 64 batches in a single request ["batch.size"(16KB) * 64 = 1MB], but now, we might be able to include only 32 batches or fewer, because we aggressively send more records in one batch -- is that what you meant? That's a really good point that I've never thought about. I think your suggestion to go through the other partitions that just fit "batch.size", or that have expired "linger.ms", first, before handling the one that exceeds the "batch.size" limit, is not a good way, because it might make the one with size > "batch.size" always the lowest priority, and cause a starving issue where that batch never gets a chance to be sent.
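To make the max.request.size accounting above concrete, here is a rough, self-contained sketch of the per-node drain limit being discussed. It is not the actual RecordAccumulator#drainBatchesForOneNode code; Batch is just a stand-in type with a size, and the numbers in the comments come from this thread.

    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    class DrainSketch {
        record Batch(int sizeInBytes) { }   // stand-in for a ready producer batch

        // Drain at most one ready batch per partition, stopping once the request is full.
        static List<Batch> drainForNode(List<Deque<Batch>> partitionQueues,
                                        int drainIndex, int maxRequestSize) {
            List<Batch> drained = new ArrayList<>();
            int size = 0;
            for (int i = 0; i < partitionQueues.size(); i++) {
                Deque<Batch> queue =
                    partitionQueues.get((drainIndex + i) % partitionQueues.size());
                Batch first = queue.peekFirst();
                if (first == null)
                    continue;                // nothing ready for this partition
                // Once the next batch would overflow max.request.size, stop:
                // the remaining ready partitions wait for the next request.
                if (!drained.isEmpty() && size + first.sizeInBytes() > maxRequestSize)
                    break;
                drained.add(queue.pollFirst());
                size += first.sizeInBytes();
            }
            return drained;  // ~64 16KB batches fit in a 1MB request, but only ~32 32KB ones
        }
    }

The larger the individual batches are allowed to grow, the fewer partitions fit into one 1MB request, which is where the fairness and linger.ms concerns above come from.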
I don't have a better solution for it, but maybe I can firstly decrease the "batch.max.size" to 32KB, instead of the aggressive 256KB in the KIP. That should alleviate the problem and still improve the throughput. What do you think?

Thank you.
Luke

On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

> I think this KIP would change the behaviour of producers when there are multiple partitions ready to be sent

This is correct, the pattern changes and becomes more coarse-grained. But I don't think it changes fairness over the long run. I think it's a good idea to change drainIndex to be random rather than round robin, to avoid forming patterns where some partitions would consistently get higher latencies than others because they wait longer for their turn.

If we really wanted to preserve the exact patterns, we could either try to support multiple 16KB batches from one partition per request (probably would require a protocol change to change the logic on the broker for duplicate detection), or try to re-batch 16KB batches from the accumulator into larger batches during send (additional computations), or try to consider all partitions assigned to a broker to check if a new batch needs to be created (i.e. compare the cumulative batch size from all partitions assigned to a broker and create a new batch when the cumulative size is 1MB -- more complex).

Overall, it seems like just increasing the max batch size is a simpler solution, and it does favor larger batch sizes, which is beneficial not just for production.

> ready batches well past when they ought to be sent (according to their linger.ms)

The trigger for marking batches ready to be sent isn't changed - a batch is ready to be sent once it reaches 16KB, so by the time larger batches start forming, linger.ms wouldn't matter much because the batching goal is met and the batch can be sent immediately. Larger batches start forming once the client starts waiting for the server, in which case some data will wait its turn to be sent. This will happen for some data regardless of how we pick data to send; the question is just whether we'd have some scenarios where some partitions would consistently experience higher latency than others. I think picking drainIndex randomly would prevent such scenarios.

-Artem
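For illustration only, a hypothetical sketch of the two ways of picking where draining starts that are being compared here (again, not the actual producer code):

    import java.util.concurrent.ThreadLocalRandom;

    class DrainIndexSketch {
        private int drainIndex = 0;

        // Current behaviour: strict round-robin. With a regular traffic pattern the
        // same partitions can keep ending up at the back of every request.
        int nextStartRoundRobin(int numPartitions) {
            int start = drainIndex % numPartitions;
            drainIndex = start + 1;
            return start;
        }

        // Suggested change: pick the starting partition randomly, so no partition is
        // consistently the last one to be drained.
        int nextStartRandom(int numPartitions) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
    }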
On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:

Hi Luke,

Thanks for the KIP!

Currently, because buffers are allocated using batch.size, it means we can handle records that are that large (e.g. one big record per batch). Doesn't the introduction of smaller buffer sizes (batch.initial.size) mean a corresponding decrease in the maximum record size that the producer can handle? That might not be a problem if the user knows their maximum record size and has tuned batch.initial.size accordingly, but if the default for batch.initial.size < batch.size it could cause regressions for existing users with a large record size, I think. It should be enough for batch.initial.size to default to batch.size, allowing users who care about the memory saving in the off-peak throughput case to do the tuning, but not causing a regression for existing users.

I think this KIP would change the behaviour of producers when there are multiple partitions ready to be sent: by sending all the ready buffers (which may now be > batch.size) for the first partition, we could end up excluding ready buffers for other partitions from the current send. In other words, as I understand the KIP currently, there's a change in fairness. I think the code in RecordAccumulator#drainBatchesForOneNode will ensure fairness in the long run, because the drainIndex will ensure that those other partitions each get their turn at being first. But isn't there the risk that drainBatchesForOneNode would end up not sending ready batches well past when they ought to be sent (according to their linger.ms), because it's sending buffers for earlier partitions too aggressively? Or, to put it another way, perhaps the RecordAccumulator should round-robin the ready buffers for _all_ the partitions before trying to fill the remaining space with the extra buffers (beyond the batch.size limit) for the first partitions?

Kind regards,

Tom

On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:

Hi Ismael and all devs,

Are there any comments/suggestions on this KIP? If not, I'm going to update the KIP based on my previous mail, and start a vote tomorrow or next week.

Thank you.
Luke

On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com> wrote:

Hi Ismael,

Thanks for your comments.

1. Why do we have to reallocate the buffer? We can keep a list of buffers instead and avoid reallocation.

-> Do you mean we allocate multiple buffers with "buffer.initial.size", and link them together (with a linked list)?

ex:

a. We allocate a 4KB initial buffer

| 4KB |

b. When a new record arrives and the remaining buffer is not enough for it, we allocate another "batch.initial.size" buffer

ex: we already have 3KB of data in the 1st buffer, and here comes a 2KB record

| 4KB (1KB remaining) |
now, record: 2KB coming

We fill the first 1KB into the 1st buffer, create a new buffer, link them together, and fill the rest of the data into it:

| 4KB (full) | ---> | 4KB (3KB remaining) |

Is that what you mean? If so, I think I like this idea! If not, please explain it in more detail.

Thank you.
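A minimal, hypothetical sketch of that linked-buffer idea (not actual KIP or producer code): the batch holds a list of fixed-size segments and appends a new one when the current one fills up, instead of reallocating and copying.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    class SegmentedBatch {
        private final int segmentSize;                       // e.g. batch.initial.size = 4KB
        private final List<ByteBuffer> segments = new ArrayList<>();

        SegmentedBatch(int segmentSize) {
            this.segmentSize = segmentSize;
            segments.add(ByteBuffer.allocate(segmentSize));  // the initial 4KB buffer
        }

        void append(byte[] record) {
            int offset = 0;
            while (offset < record.length) {
                ByteBuffer current = segments.get(segments.size() - 1);
                if (!current.hasRemaining()) {
                    current = ByteBuffer.allocate(segmentSize);  // link another segment
                    segments.add(current);
                }
                int n = Math.min(current.remaining(), record.length - offset);
                current.put(record, offset, n);   // fill the tail of the current segment
                offset += n;                      // any remainder spills into the next one
            }
        }
    }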
2. I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size).

--> In the KIP, I was trying to make "batch.size" the upper bound of the batch size, and introduce a "batch.initial.size" as the initial batch size. So are you saying that we can let "batch.size" be the initial batch size and introduce a "max.batch.size" as the upper bound? That's a good suggestion, but it would change the semantics of "batch.size", which might surprise some users. I think my original proposal ("batch.initial.size") is safer for users. What do you think?

Thank you.
Luke

On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk> wrote:

I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size).

Ismael

On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk> wrote:

Hi Luke,

Thanks for the KIP. Why do we have to reallocate the buffer? We can keep a list of buffers instead and avoid reallocation.

Ismael

On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com> wrote:

Hi Kafka dev,

I'd like to start the discussion for the proposal: KIP-782: Expandable batch size in producer.

The main purpose of this KIP is to have better memory usage in the producer, and also to save users from the dilemma of setting the batch size configuration.
After this KIP, users can set a higher batch.size without worries, and of course, with an appropriate "batch.initial.size" and "batch.reallocation.factor".

Detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Any comments and feedback are welcome.

Thank you.
Luke
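For readers skimming the thread, here is a hypothetical example of how the settings discussed above could fit together if the KIP lands as proposed. batch.size and max.request.size are existing producer configs; batch.initial.size and batch.reallocation.factor are only proposed in the KIP, and the values below are purely illustrative.

    import java.util.Properties;

    class Kip782ConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("batch.size", "262144");            // 256KB upper bound discussed in the thread
            props.put("batch.initial.size", "4096");      // proposed: start each batch at 4KB, grow on demand
            props.put("batch.reallocation.factor", "2");  // proposed: growth factor (illustrative value)
            props.put("max.request.size", "2097152");     // e.g. 2MB, as suggested earlier in the thread
            // new KafkaProducer<>(props) would then expand batches up to batch.size as needed.
        }
    }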