Hi, Lucas,

Thanks for the reply. It would be useful to summarize the benefits of a separate batch.max.size. To me, it's not clear why a user would want two different batch sizes. In your example, I can understand why a user would want to form a batch with a 5ms linger. But why would a user prefer 16K batches with a 5ms linger, if say 256K is deemed best for throughput?

Thanks,

Jun

On Fri, Dec 10, 2021 at 4:35 PM Lucas Bradstreet <lu...@confluent.io.invalid> wrote:

Hi Jun,

One difference compared to increasing the default batch size is that users may actually prefer smaller batches, but it makes much less sense to accumulate many small batches if a batch is already sending.

For example, imagine a user that prefers 16KB batches with a 5ms linger. Everything is functioning normally and 16KB batches are being sent. Then there's a 500ms blip for that broker. Do we want to continue to accumulate 16KB batches, each of which requires a round trip, or would we prefer to accumulate larger batches while sending is blocked?

I'm not hugely against increasing the default batch.size in general, but batch.max.size does seem to have some nice properties.

Thanks,

Lucas

On Fri, Dec 10, 2021 at 9:42 AM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Artem, Luke,

Thanks for the reply.

11. If we get rid of batch.max.size and increase the default batch.size, it's true that the behavior is slightly different than before. However, does that difference matter to most of our users? In your example, if a user sets linger.ms to 100ms and thinks 256KB is good for throughput, does it matter to deliver any batch smaller than 256KB before 100ms? I also find it a bit hard to explain to our users these 3 different settings related to batch size.

Thanks,

Jun

On Thu, Dec 9, 2021 at 5:47 AM Luke Chen <show...@gmail.com> wrote:

Hi Jun,

11. In addition to Artem's comment, I think the reason to have an additional "batch.max.size" is to give users more flexibility.

For example: with linger.ms=100ms and batch.size=16KB, suppose 20KB of data arrives at a partition within 50ms, and the sender is now ready to pick up the batch to send. In the current design, we send 16KB of data to the broker and keep the remaining 4KB in the producer, where it keeps accumulating. But after this KIP, the user can send the whole 20KB together. That is, the user can decide whether to accumulate more data before the sender is ready, and send it all together for higher throughput. The "batch.size=16KB" in the proposal is more like a soft limit (and "batch.max.size" is like a hard limit), or a switch that makes the batch become "ready". Before the sender is ready, we can still accumulate more data and wrap it together into one send to the broker.

The user can of course increase "batch.size" to 20KB to achieve the same goal in the current design. But you can imagine: if the data arriving within 100ms is just 18KB, that batch would wait the full 100ms before being sent out. The "batch.max.size" config allows more flexibility here.
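To make the example concrete, the settings would look something like this (batch.max.size is the new config proposed in this KIP, so the name is tentative; values are only for illustration):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("linger.ms", "100");
    // soft limit: the batch becomes "ready" once it reaches 16KB
    props.put("batch.size", "16384");
    // hard limit (proposed in this KIP): a ready batch may keep growing
    // up to 256KB while the sender has not yet picked it up
    props.put("batch.max.size", "262144");
    Producer<String, String> producer = new KafkaProducer<>(props);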
Does that make sense?

Thank you.
Luke

On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Jun,

11. That was my initial thinking as well, but in a discussion some people pointed out a change of behavior in some scenarios. E.g. someone for some reason really wants batches to be at least 16KB, sets a large linger.ms, and most of the time the batches are filled quickly enough, so they observe a certain latency. Then they upgrade their client with a default size of 256KB and the latency increases. This could be seen as a regression. It could be fixed by just reducing linger.ms to specify the expected latency, but it could still be seen as a disruption by some users. The other reason to have 2 sizes is to avoid allocating large buffers upfront.

-Artem

On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Artem,

Thanks for the reply.

11. Got it. To me, batch.size is really used for throughput and not for latency guarantees. There is no guarantee when 16KB will be accumulated. So, if users want any latency guarantee, they will need to specify linger.ms accordingly. Then, batch.size can just be used to tune for throughput.

20. Could we also describe the unit of compression? Is it batch.initial.size, batch.size or batch.max.size?

Thanks,

Jun

On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Jun,

10. My understanding is that MemoryRecords would under the covers be allocated in chunks, so logically it would still be one MemoryRecords object; it's just that instead of allocating one large chunk upfront, smaller chunks are allocated as needed to grow the batch and are linked into a list.

11. The reason for 2 sizes is to avoid a change of behavior when triggering a batch send with a large linger.ms. Currently, a batch send is triggered once the batch reaches 16KB by default; if we just raised the default to 256KB, the batch send would be delayed. Using a separate value allows keeping the current behavior when sending the batch out, while providing better throughput on high-latency, high-bandwidth channels.

-Artem

On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Luke,

Thanks for the KIP. A few comments below.

10. Accumulating small batches could improve memory usage. Will that introduce extra copying when generating a produce request? Currently, a produce request takes a single MemoryRecords per partition.

11. Do we need to introduce a new config batch.max.size? Could we just increase the default of batch.size? We probably need to have KIP-794 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner> resolved before increasing the default batch size, since the larger the batch size, the worse the problem in KIP-794.

12. As for max.request.size, currently it's used for both the max record size and the max request size, which is unintuitive. Perhaps we could introduce a new config max.record.size that defaults to 1MB.
We could then increase max.request.size to something like 10MB.

Thanks,

Jun

On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Luke,

I don't mind increasing max.request.size to a higher number, e.g. 2MB could be good. I think we should also run some benchmarks to see the effects of different sizes.

I agree that changing round robin to random solves an independent existing issue; however, the logic in this KIP exacerbates that issue, so there is some dependency.

-Artem

On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <show...@gmail.com> wrote:

Hi Artem,

Yes, I agree that if we go with random selection instead of round-robin selection, the latency will be more fair. That is, if there are 10 partitions, the 10th partition will always be the last choice in each round in the current design, but with random selection the chance of being selected is more fair.

However, I think that's kind of out of scope for this KIP. This is an existing issue, and it might need further discussion to decide whether this change is necessary.

I agree the default of 32KB for "batch.max.size" might not be a huge improvement compared with 256KB. I'm thinking, maybe default to 64KB for "batch.max.size", and make the documentation clear that if "batch.max.size" is increased, there are chances that "ready" partitions will need to wait for the next request to the broker, because of the "max.request.size" (default 1MB) limitation. Increasing "max.request.size" can also be considered to avoid this issue. What do you think?

Thank you.
Luke

On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

> maybe I can firstly decrease the "batch.max.size" to 32KB

I think 32KB is too small. With 5 in-flight and 100ms latency we can produce 1.6MB/s per partition. With 256KB we can produce 12.8MB/s per partition. We should probably set up some testing and see if 256KB has problems.
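For reference, the arithmetic behind those numbers (per-partition throughput is roughly in-flight requests * batch size / round-trip latency, assuming one batch per partition per request):

    5 * 32KB  / 100ms = 1.6MB/s  per partition
    5 * 256KB / 100ms = 12.8MB/s per partition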
To illustrate the latency dynamics, let's consider a simplified model: 1 in-flight request per broker, produce latency 125ms, 256KB max request size, 16 partitions assigned to the same broker, and every second 128KB is produced to each partition (total production rate 2MB/sec).

If the batch size is 16KB, the pattern would be the following:

0ms - produce 128KB into each partition
0ms - take 16KB from each partition and send (total 256KB)
125ms - complete the first 16KB from each partition, send the next 16KB
250ms - complete the second 16KB, send the next 16KB
...
1000ms - complete the 8th 16KB from each partition

From this model it's easy to see that there are 256KB that are sent immediately, 256KB that are sent in 125ms, ... and 256KB that are sent in 875ms.

If the batch size is 256KB, the pattern would be the following:

0ms - produce 128KB into each partition
0ms - take 128KB each from the first 2 partitions and send (total 256KB)
125ms - complete the first 2 partitions, send data from the next 2 partitions
...
1000ms - complete the last 2 partitions

Even though the pattern is different, there are still 256KB that are sent immediately, 256KB that are sent in 125ms, ... and 256KB that are sent in 875ms.

Now, in this example, if we do strict round-robin (current implementation) and we have this exact pattern (not sure how often such a regular pattern would happen in practice -- I would expect it to be a bit more random), some partitions would experience higher latency than others (not sure how much it would matter in practice -- at the end of the day, some bytes produced to a topic would have higher latency and some bytes would have lower latency). This pattern is easily fixed by choosing the next partition randomly instead of using round-robin.

-Artem

On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:

Hi Tom,

Thanks for your comments. And thanks for Artem's explanation.
Below is my response:

> Currently because buffers are allocated using batch.size it means we can handle records that are that large (e.g. one big record per batch). Doesn't the introduction of smaller buffer sizes (batch.initial.size) mean a corresponding decrease in the maximum record size that the producer can handle?

Actually, "batch.size" is only a threshold that decides whether the batch is "ready to be sent". That is, even if you set "batch.size=16KB" (the default value), users can still send one record sized at 20KB, as long as the size is less than "max.request.size" in the producer (default 1MB). Therefore, the introduction of "batch.initial.size" won't decrease the maximum record size that the producer can handle.
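For example, something like this works today with the default batch.size (just a sketch; the topic name is made up):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("batch.size", "16384"); // default; a readiness threshold, not a record size cap

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    // A 20KB record: larger than batch.size, but under max.request.size
    // (1MB default), so it is accepted and sent as a batch of one record.
    producer.send(new ProducerRecord<>("test-topic", new byte[20 * 1024]));
    producer.close();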
> But isn't there the risk that drainBatchesForOneNode would end up not sending ready batches well past when they ought to be sent (according to their linger.ms), because it's sending buffers for earlier partitions too aggressively?

Did you mean that we have a "max.request.size" per request (default 1MB), and before this KIP a request could include 64 batches ["batch.size"(16KB) * 64 = 1MB], but now we might only be able to include 32 batches or fewer, because we aggressively send more records in one batch? That's a really good point that I had never thought about. I think your suggestion to first go through the other partitions that just fit "batch.size", or whose "linger.ms" has expired, before handling the one that exceeds "batch.size", is not a good way, because it would keep the oversized batch at the lowest priority and cause a starvation issue where that batch never gets the chance to be sent.

I don't have a better solution for it, but maybe I can first decrease "batch.max.size" to 32KB, instead of the aggressive 256KB in the KIP. That should alleviate the problem and still improve throughput. What do you think?

Thank you.
Luke

On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

> I think this KIP would change the behaviour of producers when there are multiple partitions ready to be sent

This is correct: the pattern changes and becomes more coarse-grained. But I don't think it changes fairness over the long run. I think it's a good idea to change the drainIndex to be random rather than round robin, to avoid forming patterns where some partitions consistently get higher latencies than others because they wait longer for their turn.

If we really wanted to preserve the exact patterns, we could either try to support multiple 16KB batches from one partition per request (which would probably require a protocol change, to adjust the broker logic for duplicate detection), or try to re-batch 16KB batches from the accumulator into larger batches during send (additional computation), or try to consider all partitions assigned to a broker when checking whether a new batch needs to be created (i.e. compare the cumulative batch size from all partitions assigned to a broker and create a new batch once the cumulative size is 1MB; more complex).

Overall, it seems like just increasing the max batch size is a simpler solution, and it favors larger batch sizes, which is beneficial not just for production.

> ready batches well past when they ought to be sent (according to their linger.ms)

The trigger for marking batches ready to be sent isn't changed: a batch is ready to be sent once it reaches 16KB, so by the time larger batches start forming, linger.ms wouldn't matter much, because the batching goal is already met and the batch can be sent immediately.
Larger batches start forming once the client starts waiting for the server, in which case some data will wait its turn to be sent. This will happen for some data regardless of how we pick the data to send; the question is just whether we'd have scenarios where some partitions consistently experience higher latency than others. I think picking the drainIndex randomly would prevent such scenarios.
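Roughly, the change would be something like this (a simplified sketch, not the actual RecordAccumulator code; names are made up):

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.kafka.common.TopicPartition;

    // Today (simplified) the start position advances round-robin:
    //     drainIndex = (drainIndex + 1) % partitions.size();
    // so with a regular traffic pattern the same partitions always drain
    // last. Starting at a random index instead breaks the pattern:
    void drainForNode(List<TopicPartition> partitions) {
        int start = ThreadLocalRandom.current().nextInt(partitions.size());
        for (int i = 0; i < partitions.size(); i++) {
            TopicPartition tp = partitions.get((start + i) % partitions.size());
            // ... drain ready batches from tp until max.request.size is filled ...
        }
    }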
-Artem

On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:

Hi Luke,

Thanks for the KIP!

Currently, because buffers are allocated using batch.size, it means we can handle records that are that large (e.g. one big record per batch). Doesn't the introduction of smaller buffer sizes (batch.initial.size) mean a corresponding decrease in the maximum record size that the producer can handle? That might not be a problem if the user knows their maximum record size and has tuned batch.initial.size accordingly, but if the default for batch.initial.size < batch.size, it could cause regressions for existing users with a large record size, I think. It should be enough for batch.initial.size to default to batch.size, allowing users who care about the memory saving in the off-peak throughput case to do the tuning, but not causing a regression for existing users.

I think this KIP would change the behaviour of producers when there are multiple partitions ready to be sent: by sending all the ready buffers (which may now be > batch.size) for the first partition, we could end up excluding ready buffers for other partitions from the current send. In other words, as I understand the KIP currently, there's a change in fairness. I think the code in RecordAccumulator#drainBatchesForOneNode will ensure fairness in the long run, because the drainIndex will ensure that those other partitions each get their turn at being first. But isn't there the risk that drainBatchesForOneNode would end up not sending ready batches well past when they ought to be sent (according to their linger.ms), because it's sending buffers for earlier partitions too aggressively? Or, to put it another way, perhaps the RecordAccumulator should round-robin the ready buffers for _all_ the partitions before trying to fill the remaining space with the extra buffers (beyond the batch.size limit) for the first partitions?

Kind regards,

Tom

On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:

Hi Ismael and all devs,

Are there any comments/suggestions on this KIP? If not, I'm going to update the KIP based on my previous mail, and start a vote tomorrow or next week.

Thank you.
Luke

On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com> wrote:

Hi Ismael,

Thanks for your comments.

> 1. Why do we have to reallocate the buffer? We can keep a list of buffers instead and avoid reallocation.

Do you mean we allocate multiple buffers of "batch.initial.size" and link them together (as a linked list)?

ex:
a. We allocate a 4KB initial buffer

| 4KB |
b. When new records arrive and the remaining buffer is not enough for them, we allocate another "batch.initial.size" buffer.

ex: we already have 3KB of data in the 1st buffer, and here comes a 2KB record:

| 4KB (1KB remaining) |   <- record: 2KB coming

We fill the first 1KB into the 1st buffer, create a new buffer, link them together, and fill the rest of the data into it:

| 4KB (full) | ---> | 4KB (3KB remaining) |

Is that what you mean? If so, I think I like this idea! If not, please explain it in more detail.

Thank you.
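In code, I'd imagine something like this (a hypothetical sketch to illustrate the idea, not a real implementation; all names are made up):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Grow a batch by chaining fixed-size chunks instead of copying
    // into ever-larger reallocated buffers.
    class ChunkedBatchBuffer {
        private static final int CHUNK_SIZE = 4 * 1024; // batch.initial.size
        private final List<ByteBuffer> chunks = new ArrayList<>();

        void append(byte[] record) {
            int written = 0;
            while (written < record.length) {
                ByteBuffer last = chunks.isEmpty() ? null : chunks.get(chunks.size() - 1);
                if (last == null || !last.hasRemaining()) {
                    last = ByteBuffer.allocate(CHUNK_SIZE); // link a new 4KB chunk
                    chunks.add(last);
                }
                int n = Math.min(last.remaining(), record.length - written);
                last.put(record, written, n); // fill the current chunk
                written += n;
            }
        }
    }

Appending the 2KB record above would fill the last 1KB of the first chunk and spill 1KB into a fresh chunk, which is exactly the | 4KB (full) | ---> | 4KB (3KB remaining) | picture.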
> 2. I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size).

In the KIP, I was trying to make "batch.size" the upper bound of the batch size, and introduce "batch.initial.size" as the initial batch size. So are you saying that we could let "batch.size" be the initial batch size and introduce a "max.batch.size" as the upper bound? That's a good suggestion, but it would change the semantics of "batch.size", which might surprise some users. I think my original proposal ("batch.initial.size") is safer for users. What do you think?

Thank you.
Luke

On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk> wrote:

I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size).

Ismael

On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk> wrote:

Hi Luke,

Thanks for the KIP. Why do we have to reallocate the buffer? We can keep a list of buffers instead and avoid reallocation.

Ismael

On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com> wrote:

Hi Kafka dev,

I'd like to start the discussion for the proposal KIP-782: Expandable batch size in producer.

The main purpose of this KIP is to get better memory usage in the producer, and also to save users from the dilemma of setting the batch size configuration. After this KIP, users can set a higher batch.size without worries, and of course with appropriate "batch.initial.size" and "batch.reallocation.factor" settings.
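For example, a user might then configure (the two new config names are the ones proposed in the KIP; values are only illustrative):

    import java.util.Properties;

    Properties props = new Properties();
    // batch.size can now be set generously, since its full memory is no
    // longer allocated up front for every batch...
    props.put("batch.size", "262144");
    // ...because each batch starts small and grows on demand
    props.put("batch.initial.size", "4096");
    props.put("batch.reallocation.factor", "2");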
Detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Any comments and feedback are welcome.

Thank you.
Luke