1) I agree that we should just distribute the bytes evenly, at least for now. It's simpler to understand and we can always change it later, plus it makes sense to keep this aligned with how the cache works today
2) +1 to being conservative in the generous sense, it's just not something we can predict with any degree of accuracy and even if we could, the appropriate value is going to differ wildly across applications and use cases. We might want to just pick some multiple of the default cache size, and maybe do some research on other relevant defaults or sizes (default JVM heap, size of available memory in common hosts eg EC2 instances, etc). We don't need to worry as much about erring on the side of too big, since other configs like the max.poll.records will help somewhat to keep it from exploding. 4) 100%, I always found the *cache.max.bytes.buffering* config name to be incredibly confusing. Deprecating this in favor of "*statestore.cache.max.bytes*" and aligning it to the new input buffer config sounds good to me to include here. 5) The KIP should list all relevant public-facing changes, including metadata like the config's "Importance". Personally I would recommend Medium, or even High if we're really worried about the default being wrong for a lot of users Thanks for the KIP, besides those few things that Guozhang brought up and the config importance, everything SGTM -Sophie On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com> wrote: > 1) I meant for your proposed solution. I.e. to distribute the configured > bytes among threads evenly. > > 2) I was actually thinking about making the default a large enough value so > that we would not introduce performance regression: thinking about a use > case with many partitions and each record may be large, then effectively we > would only start pausing when the total bytes buffered is pretty large. If > we set the default value to small, we would be "more aggressive" on pausing > which may impact throughput. > > 3) Yes exactly, this would naturally be at the "partition-group" class > since that represents the task's all input partitions. > > 4) This is just a bold thought, I'm interested to see other's thoughts. > > > Guozhang > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com> wrote: > > > Thanks Guozhang. > > > > 1) Just for my confirmation, when you say we should proceed with the even > > distribution of bytes, are you referring to the Proposed Solution in the > > KIP or the option you had considered in the JIRA? > > 2) Default value for the config is something that I missed. I agree we > > can't have really large values as it might be detrimental to the > > performance. Maybe, as a starting point, we assume that only 1 Stream > Task > > is running so what could be the ideal value in such a scenario? Somewhere > > around 10MB similar to the caching config? > > 3) When you say, *a task level metric indicating the current totally > > aggregated metrics, * you mean the bytes aggregated at a task level? > > 4) I am ok with the name change, but would like to know others' thoughts. > > > > Thanks! > > Sagar. > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > Thanks Sagar for writing this PR. > > > > > > I think twice about the options that have been proposed in > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel that at > the > > > moment it's simpler to just do the even distribution of the configured > > > total bytes. My rationale is that right now we have a static tasks -> > > > threads mapping, and hence each partition would only be fetched by a > > single > > > thread / consumer at a given time. If in the future we break that > static > > > mapping into dynamic mapping, then we would not be able to do this even > > > distribution. Instead we would have other threads polling from consumer > > > only, and those threads would be responsible for checking the config > and > > > pause non-empty partitions if it goes beyond the threshold. But since > at > > > that time we would not change the config but just how it would be > > > implemented behind the scenes we would not need another KIP to change > it. > > > > > > Some more comments: > > > > > > 1. We need to discuss a bit about the default value of this new config. > > > Personally I think we need to be a bit conservative with large values > so > > > that it would not have any perf regression compared with old configs > > > especially with large topology and large number of partitions. > > > 2. I looked at the existing metrics, and do not have corresponding > > sensors. > > > How about also adding a task level metric indicating the current > totally > > > aggregated metrics. The reason I do not suggest this metric on the > > > per-thread level is that in the future we may break the static mapping > of > > > tasks -> threads. > > > > > > [optional] As an orthogonal thought, I'm thinking maybe we can rename > the > > > other "*cache.max.bytes.buffering*" as "statestore.cache.max.bytes" > (via > > > deprecation of course), piggy-backed in this KIP? Would like to hear > > > others' thoughts. > > > > > > > > > Guozhang > > > > > > > > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com> > wrote: > > > > > > > Hi All, > > > > > > > > I would like to start a discussion on the following KIP: > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390 > > > > > > > > Thanks! > > > > Sagar. > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > -- > -- Guozhang >