Hi Guozhang,

Thanks, I have updated the KIP with the mentioned changes.
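For anyone skimming the thread, here is a rough sketch (in Java) of how an application would set the two configs being proposed, once they exist. The literal keys below are only the names proposed in the KIP -- there are no StreamsConfig constants for them yet -- and the values are placeholders, not recommended defaults:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class BufferConfigExample {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "buffer-config-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // Proposed in the KIP: cap the total bytes buffered across all input
            // partitions of this Kafka Streams instance (placeholder value: 512 MB).
            props.put("input.buffer.max.bytes", Long.toString(512L * 1024 * 1024));

            // Proposed rename of cache.max.bytes.buffering (placeholder value: 10 MB,
            // the current default of the old config).
            props.put("statestore.cache.max.bytes", Long.toString(10L * 1024 * 1024));

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");
            final Topology topology = builder.build();

            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }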
Thanks!
Sagar.

On Tue, Sep 21, 2021 at 3:45 AM Guozhang Wang <[email protected]> wrote:

> Hi Sagar,
>
> Thanks for the added metrics, about its name, if it is proposed as a
> task-level config, then we do not need to prefix its name as `task-`. But
> on the other hand, it's better to give the full description of the metrics,
> like its type name / tag maps / recording levels etc, an example is here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB
>
> Guozhang
>
> On Mon, Sep 20, 2021 at 10:04 AM Sagar <[email protected]> wrote:
>
> > Hi All,
> >
> > Bumping this thread again.
> >
> > Thanks!
> > Sagar.
> >
> > On Sat, Sep 11, 2021 at 2:04 PM Sagar <[email protected]> wrote:
> >
> > > Hi Matthias,
> > >
> > > I missed out on the metrics part.
> > >
> > > I have added the new metric in the proposed changes section along with
> > > the small re-wording that you talked about.
> > >
> > > Let me know if that makes sense.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <[email protected]> wrote:
> > >
> > >> Thanks for the KIP.
> > >>
> > >> There was some discussion about adding a metric on the thread, but the
> > >> KIP does not contain anything about it. Did we drop this suggestion or
> > >> was the KIP not updated accordingly?
> > >>
> > >> Nit:
> > >>
> > >> > This would be a global config applicable per processing topology
> > >>
> > >> Can we change this to `per Kafka Streams instance.`
> > >>
> > >> Atm, a Stream instance executes a single topology, so it does not make
> > >> any effective difference right now. However, it seems better (more
> > >> logical) to bind the config to the instance (not the topology the
> > >> instance executes).
> > >>
> > >> -Matthias
> > >>
> > >> On 9/2/21 6:08 AM, Sagar wrote:
> > >> > Thanks Guozhang and Luke.
> > >> >
> > >> > I have updated the KIP with all the suggested changes.
> > >> >
> > >> > Do you think we could start voting for this?
> > >> >
> > >> > Thanks!
> > >> > Sagar.
> > >> >
> > >> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <[email protected]> wrote:
> > >> >
> > >> >> Thanks for the KIP. Overall LGTM.
> > >> >>
> > >> >> Just one thought, if we "rename" the config directly as mentioned in
> > >> >> the KIP, would that break existing applications?
> > >> >> Should we deprecate the old one first, and make the old/new names
> > >> >> co-exist for some period of time?
> > >> >>
> > >> >> Public Interfaces
> > >> >>
> > >> >>    - Adding a new config *input.buffer.max.bytes* applicable at a
> > >> >>      topology level. The importance of this config would be *Medium*.
> > >> >>    - Renaming *cache.max.bytes.buffering* to
> > >> >>      *statestore.cache.max.bytes*.
> > >> >>
> > >> >> Thank you.
> > >> >> Luke
> > >> >>
> > >> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <[email protected]> wrote:
> > >> >>
> > >> >>> Currently the state store cache size default value is 10MB today,
> > >> >>> which arguably is rather small. So I'm thinking maybe for this
> > >> >>> config default to 512MB.
> > >> >>>
> > >> >>> Other than that, LGTM.
> > >> >>>
> > >> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <[email protected]> wrote:
> > >> >>>
> > >> >>>> Thanks Guozhang and Sophie.
> > >> >>>>
> > >> >>>> Yeah a small default value would lower the throughput. I didn't
> > >> >>>> quite realise it earlier. It's slightly hard to predict this value
> > >> >>>> so I would guess around 1/2 GB to 1 GB? WDYT?
> > >> >>>>
> > >> >>>> Regarding the renaming of the config and the new metric, sure
> > >> >>>> would include it in the KIP.
> > >> >>>>
> > >> >>>> Lastly, importance would also be added. I guess Medium should be ok.
> > >> >>>>
> > >> >>>> Thanks!
> > >> >>>> Sagar.
> > >> >>>>
> > >> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> > >> >>>> <[email protected]> wrote:
> > >> >>>>
> > >> >>>>> 1) I agree that we should just distribute the bytes evenly, at
> > >> >>>>> least for now. It's simpler to understand and we can always
> > >> >>>>> change it later, plus it makes sense to keep this aligned with
> > >> >>>>> how the cache works today
> > >> >>>>>
> > >> >>>>> 2) +1 to being conservative in the generous sense, it's just not
> > >> >>>>> something we can predict with any degree of accuracy and even if
> > >> >>>>> we could, the appropriate value is going to differ wildly across
> > >> >>>>> applications and use cases. We might want to just pick some
> > >> >>>>> multiple of the default cache size, and maybe do some research on
> > >> >>>>> other relevant defaults or sizes (default JVM heap, size of
> > >> >>>>> available memory in common hosts eg EC2 instances, etc). We don't
> > >> >>>>> need to worry as much about erring on the side of too big, since
> > >> >>>>> other configs like the max.poll.records will help somewhat to
> > >> >>>>> keep it from exploding.
> > >> >>>>>
> > >> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config
> > >> >>>>> name to be incredibly confusing. Deprecating this in favor of
> > >> >>>>> "*statestore.cache.max.bytes*" and aligning it to the new input
> > >> >>>>> buffer config sounds good to me to include here.
> > >> >>>>>
> > >> >>>>> 5) The KIP should list all relevant public-facing changes,
> > >> >>>>> including metadata like the config's "Importance". Personally
> > >> >>>>> I would recommend Medium, or even High if we're really worried
> > >> >>>>> about the default being wrong for a lot of users
> > >> >>>>>
> > >> >>>>> Thanks for the KIP, besides those few things that Guozhang
> > >> >>>>> brought up and the config importance, everything SGTM
> > >> >>>>>
> > >> >>>>> -Sophie
> > >> >>>>>
> > >> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <[email protected]> wrote:
> > >> >>>>>
> > >> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
> > >> >>>>>> configured bytes among threads evenly.
> > >> >>>>>>
> > >> >>>>>> 2) I was actually thinking about making the default a large
> > >> >>>>>> enough value so that we would not introduce performance
> > >> >>>>>> regression: thinking about a use case with many partitions and
> > >> >>>>>> each record may be large, then effectively we would only start
> > >> >>>>>> pausing when the total bytes buffered is pretty large. If we set
> > >> >>>>>> the default value to small, we would be "more aggressive" on
> > >> >>>>>> pausing which may impact throughput.
> > >> >>>>>>
> > >> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
> > >> >>>>>> class since that represents the task's all input partitions.
> > >> >>>>>>
> > >> >>>>>> 4) This is just a bold thought, I'm interested to see other's
> > >> >>>>>> thoughts.
> > >> >>>>>>
> > >> >>>>>> Guozhang
> > >> >>>>>>
> > >> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <[email protected]> wrote:
> > >> >>>>>>
> > >> >>>>>>> Thanks Guozhang.
> > >> >>>>>>>
> > >> >>>>>>> 1) Just for my confirmation, when you say we should proceed
> > >> >>>>>>> with the even distribution of bytes, are you referring to the
> > >> >>>>>>> Proposed Solution in the KIP or the option you had considered
> > >> >>>>>>> in the JIRA?
> > >> >>>>>>> 2) Default value for the config is something that I missed. I
> > >> >>>>>>> agree we can't have really large values as it might be
> > >> >>>>>>> detrimental to the performance. Maybe, as a starting point, we
> > >> >>>>>>> assume that only 1 Stream Task is running so what could be the
> > >> >>>>>>> ideal value in such a scenario? Somewhere around 10MB similar
> > >> >>>>>>> to the caching config?
> > >> >>>>>>> 3) When you say, *a task level metric indicating the current
> > >> >>>>>>> totally aggregated metrics*, you mean the bytes aggregated at a
> > >> >>>>>>> task level?
> > >> >>>>>>> 4) I am ok with the name change, but would like to know others'
> > >> >>>>>>> thoughts.
> > >> >>>>>>>
> > >> >>>>>>> Thanks!
> > >> >>>>>>> Sagar.
> > >> >>>>>>>
> > >> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <[email protected]> wrote:
> > >> >>>>>>>
> > >> >>>>>>>> Thanks Sagar for writing this PR.
> > >> >>>>>>>>
> > >> >>>>>>>> I think twice about the options that have been proposed in
> > >> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> > >> >>>>>>>> that at the moment it's simpler to just do the even
> > >> >>>>>>>> distribution of the configured total bytes. My rationale is
> > >> >>>>>>>> that right now we have a static tasks -> threads mapping, and
> > >> >>>>>>>> hence each partition would only be fetched by a single thread
> > >> >>>>>>>> / consumer at a given time. If in the future we break that
> > >> >>>>>>>> static mapping into dynamic mapping, then we would not be able
> > >> >>>>>>>> to do this even distribution. Instead we would have other
> > >> >>>>>>>> threads polling from consumer only, and those threads would be
> > >> >>>>>>>> responsible for checking the config and pause non-empty
> > >> >>>>>>>> partitions if it goes beyond the threshold. But since at that
> > >> >>>>>>>> time we would not change the config but just how it would be
> > >> >>>>>>>> implemented behind the scenes we would not need another KIP to
> > >> >>>>>>>> change it.
> > >> >>>>>>>>
> > >> >>>>>>>> Some more comments:
> > >> >>>>>>>>
> > >> >>>>>>>> 1. We need to discuss a bit about the default value of this
> > >> >>>>>>>> new config. Personally I think we need to be a bit
> > >> >>>>>>>> conservative with large values so that it would not have any
> > >> >>>>>>>> perf regression compared with old configs especially with
> > >> >>>>>>>> large topology and large number of partitions.
> > >> >>>>>>>> 2. I looked at the existing metrics, and do not have
> > >> >>>>>>>> corresponding sensors. How about also adding a task level
> > >> >>>>>>>> metric indicating the current totally aggregated metrics. The
> > >> >>>>>>>> reason I do not suggest this metric on the per-thread level is
> > >> >>>>>>>> that in the future we may break the static mapping of tasks ->
> > >> >>>>>>>> threads.
> > >> >>>>>>>>
> > >> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
> > >> >>>>>>>> rename the other "*cache.max.bytes.buffering*" as
> > >> >>>>>>>> "statestore.cache.max.bytes" (via deprecation of course),
> > >> >>>>>>>> piggy-backed in this KIP? Would like to hear others' thoughts.
> > >> >>>>>>>>
> > >> >>>>>>>> Guozhang
> > >> >>>>>>>>
> > >> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <[email protected]> wrote:
> > >> >>>>>>>>
> > >> >>>>>>>>> Hi All,
> > >> >>>>>>>>>
> > >> >>>>>>>>> I would like to start a discussion on the following KIP:
> > >> >>>>>>>>>
> > >> >>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > >> >>>>>>>>>
> > >> >>>>>>>>> Thanks!
> > >> >>>>>>>>> Sagar.
> > >> >>>>>>>>>
> > >> >>>>>>>> --
> > >> >>>>>>>> -- Guozhang
> > >> >>>>>>>>
> > >> >>>>>> --
> > >> >>>>>> -- Guozhang
> > >> >>>>>>
> > >> >>> --
> > >> >>> -- Guozhang
> > >> >>>
>
> --
> -- Guozhang
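P.S. For completeness, below is a rough, illustrative sketch (in Java) of the pausing behaviour discussed above: stop fetching from non-empty partitions once the total buffered bytes cross the configured threshold, and resume them once the buffer drains. This is not the actual Streams implementation -- the class, the byte accounting, and the threshold handling are all simplified assumptions meant only to make the idea concrete:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: bound the bytes buffered across a consumer's input
    // partitions by pausing fetching once a threshold is exceeded.
    public class BoundedInputBuffer {

        private final Consumer<byte[], byte[]> consumer;
        private final long maxBufferedBytes; // e.g. the proposed input.buffer.max.bytes
        private final Map<TopicPartition, Long> bufferedBytes = new HashMap<>();
        private long totalBufferedBytes = 0L;

        public BoundedInputBuffer(final Consumer<byte[], byte[]> consumer,
                                  final long maxBufferedBytes) {
            this.consumer = consumer;
            this.maxBufferedBytes = maxBufferedBytes;
        }

        // Poll once and account for the bytes of everything that was fetched.
        public ConsumerRecords<byte[], byte[]> pollOnce(final Duration timeout) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                final long size = Math.max(record.serializedKeySize(), 0)
                    + Math.max(record.serializedValueSize(), 0);
                bufferedBytes.merge(tp, size, Long::sum);
                totalBufferedBytes += size;
            }
            // Over the threshold: pause every partition that has buffered data
            // so the next poll() does not fetch more for them.
            if (totalBufferedBytes > maxBufferedBytes) {
                consumer.pause(bufferedBytes.keySet());
            }
            return records;
        }

        // Call after a buffered record from the given partition has been processed.
        public void recordProcessed(final TopicPartition tp, final long size) {
            bufferedBytes.merge(tp, -size, Long::sum);
            totalBufferedBytes -= size;
            // Back under the threshold: resume everything we previously paused.
            if (totalBufferedBytes <= maxBufferedBytes) {
                consumer.resume(consumer.paused());
            }
        }
    }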
