Hi All, Bumping this thread again.
Thanks!
Sagar.

On Sat, Sep 11, 2021 at 2:04 PM Sagar <[email protected]> wrote:

> Hi Matthias,
>
> I missed out on the metrics part.
>
> I have added the new metric in the Proposed Changes section, along with the small re-wording that you talked about.
>
> Let me know if that makes sense.
>
> Thanks!
> Sagar.
>
> On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <[email protected]> wrote:
>
>> Thanks for the KIP.
>>
>> There was some discussion about adding a metric on the thread, but the KIP does not contain anything about it. Did we drop this suggestion, or was the KIP not updated accordingly?
>>
>> Nit:
>>
>>> This would be a global config applicable per processing topology
>>
>> Can we change this to `per Kafka Streams instance`?
>>
>> At the moment, a Streams instance executes a single topology, so it makes no effective difference right now. However, it seems better (more logical) to bind the config to the instance, not to the topology the instance executes.
>>
>> -Matthias
>>
>> On 9/2/21 6:08 AM, Sagar wrote:
>>
>>> Thanks Guozhang and Luke.
>>>
>>> I have updated the KIP with all the suggested changes.
>>>
>>> Do you think we could start voting for this?
>>>
>>> Thanks!
>>> Sagar.
>>>
>>> On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <[email protected]> wrote:
>>>
>>>> Thanks for the KIP. Overall LGTM.
>>>>
>>>> Just one thought: if we "rename" the config directly as mentioned in the KIP, would that break existing applications? Should we deprecate the old name first and let the old/new names co-exist for some period of time?
>>>>
>>>> Public Interfaces
>>>>
>>>> - Adding a new config *input.buffer.max.bytes*, applicable at a topology level. The importance of this config would be *Medium*.
>>>> - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.
>>>>
>>>> Thank you.
>>>> Luke
>>>>
>>>> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <[email protected]> wrote:
>>>>
>>>>> The state store cache size default value is 10MB today, which is arguably rather small. So I'm thinking maybe this config should default to 512MB.
>>>>>
>>>>> Other than that, LGTM.
>>>>>
>>>>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <[email protected]> wrote:
>>>>>
>>>>>> Thanks Guozhang and Sophie.
>>>>>>
>>>>>> Yeah, a small default value would lower the throughput; I didn't quite realise it earlier. It's slightly hard to predict this value, so I would guess around 1/2 GB to 1 GB? WDYT?
>>>>>>
>>>>>> Regarding the renaming of the config and the new metric, sure, I will include them in the KIP.
>>>>>>
>>>>>> Lastly, the importance would also be added. I guess Medium should be ok.
>>>>>>
>>>>>> Thanks!
>>>>>> Sagar.
>>>>>>
>>>>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman <[email protected]> wrote:
>>>>>>
>>>>>>> 1) I agree that we should just distribute the bytes evenly, at least for now. It's simpler to understand and we can always change it later, plus it makes sense to keep this aligned with how the cache works today.
>>>>>>>
>>>>>>> 2) +1 to being conservative in the generous sense; it's just not something we can predict with any degree of accuracy, and even if we could, the appropriate value is going to differ wildly across applications and use cases.
>>>>>>> We might want to just pick some multiple of the default cache size, and maybe do some research on other relevant defaults or sizes (default JVM heap, size of available memory in common hosts, e.g. EC2 instances, etc.). We don't need to worry as much about erring on the side of too big, since other configs like max.poll.records will help somewhat to keep it from exploding.
>>>>>>>
>>>>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name to be incredibly confusing. Deprecating it in favor of *statestore.cache.max.bytes* and aligning it with the new input buffer config sounds good to me to include here.
>>>>>>>
>>>>>>> 5) The KIP should list all relevant public-facing changes, including metadata like the config's "Importance". Personally I would recommend Medium, or even High if we're really worried about the default being wrong for a lot of users.
>>>>>>>
>>>>>>> Thanks for the KIP. Besides those few things that Guozhang brought up and the config importance, everything SGTM.
>>>>>>>
>>>>>>> -Sophie
>>>>>>>
>>>>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <[email protected]> wrote:
>>>>>>>
>>>>>>>> 1) I meant your proposed solution, i.e. to distribute the configured bytes among threads evenly.
>>>>>>>>
>>>>>>>> 2) I was actually thinking about making the default a large enough value so that we would not introduce a performance regression: thinking about a use case with many partitions where each record may be large, we would effectively only start pausing when the total bytes buffered is pretty large. If we set the default value too small, we would be "more aggressive" about pausing, which may impact throughput.
>>>>>>>>
>>>>>>>> 3) Yes, exactly; this would naturally live in the "partition-group" class, since that represents all of the task's input partitions.
>>>>>>>>
>>>>>>>> 4) This is just a bold thought; I'm interested to see others' thoughts.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks Guozhang.
>>>>>>>>>
>>>>>>>>> 1) Just for my confirmation, when you say we should proceed with the even distribution of bytes, are you referring to the Proposed Solution in the KIP or the option you had considered in the JIRA?
>>>>>>>>>
>>>>>>>>> 2) The default value for the config is something that I missed. I agree we can't have really large values, as that might be detrimental to performance. Maybe, as a starting point, we assume that only 1 stream task is running; what could be the ideal value in such a scenario? Somewhere around 10MB, similar to the caching config?
>>>>>>>>>
>>>>>>>>> 3) When you say *a task level metric indicating the current totally aggregated metrics*, do you mean the bytes aggregated at a task level?
>>>>>>>>>
>>>>>>>>> 4) I am ok with the name change, but would like to know others' thoughts.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
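To make the even-distribution idea discussed above a bit more concrete, here is a minimal sketch of how a per-thread byte budget and the pause behaviour could look. This is only an illustration of the proposal, not the actual Streams internals: the InputBufferBudget class and its method names are hypothetical, and only the Consumer#pause/resume/paused calls are real client API.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper illustrating the proposal; not part of Kafka Streams.
public class InputBufferBudget {

    private final long maxBytesPerThread;
    private final Map<TopicPartition, Long> bufferedBytes = new HashMap<>();

    public InputBufferBudget(long inputBufferMaxBytes, int numStreamThreads) {
        // Even split of the global budget across stream threads, as proposed.
        this.maxBytesPerThread = inputBufferMaxBytes / numStreamThreads;
    }

    // Called whenever a fetched record is added to a task's input buffer.
    public void onRecordBuffered(TopicPartition partition, long recordSizeInBytes) {
        bufferedBytes.merge(partition, recordSizeInBytes, Long::sum);
    }

    // Called whenever a buffered record is processed and removed from the buffer.
    public void onRecordConsumed(TopicPartition partition, long recordSizeInBytes) {
        bufferedBytes.merge(partition, -recordSizeInBytes, Long::sum);
    }

    // Pause non-empty partitions once the thread's share is exceeded; resume when under budget.
    public void maybePauseOrResume(Consumer<?, ?> consumer) {
        long totalBuffered = bufferedBytes.values().stream().mapToLong(Long::longValue).sum();
        Set<TopicPartition> nonEmpty = bufferedBytes.entrySet().stream()
                .filter(e -> e.getValue() > 0)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
        if (totalBuffered > maxBytesPerThread) {
            consumer.pause(nonEmpty);               // stop fetching more for buffered partitions
        } else {
            consumer.resume(consumer.paused());     // resume everything once under budget
        }
    }
}

With the current static tasks -> threads mapping, each thread owns a disjoint set of partitions, so an even split of the configured total bounds the buffered bytes for the whole instance.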
>>>>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Sagar for writing this KIP.
>>>>>>>>>>
>>>>>>>>>> I thought twice about the options that have been proposed in https://issues.apache.org/jira/browse/KAFKA-13152, and feel that at the moment it's simpler to just do the even distribution of the configured total bytes. My rationale is that right now we have a static tasks -> threads mapping, and hence each partition would only be fetched by a single thread / consumer at a given time. If in the future we break that static mapping into a dynamic mapping, then we would not be able to do this even distribution. Instead we would have other threads polling from the consumer only, and those threads would be responsible for checking the config and pausing non-empty partitions if the buffered bytes go beyond the threshold. But since at that time we would not change the config, only how it is implemented behind the scenes, we would not need another KIP to change it.
>>>>>>>>>>
>>>>>>>>>> Some more comments:
>>>>>>>>>>
>>>>>>>>>> 1. We need to discuss a bit about the default value of this new config. Personally I think we need to be a bit conservative with large values so that it would not cause any perf regression compared with the old configs, especially with a large topology and a large number of partitions.
>>>>>>>>>> 2. I looked at the existing metrics, and we do not have corresponding sensors. How about also adding a task level metric indicating the current totally aggregated metrics? The reason I do not suggest this metric at the per-thread level is that in the future we may break the static mapping of tasks -> threads.
>>>>>>>>>>
>>>>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can rename the other *cache.max.bytes.buffering* to *statestore.cache.max.bytes* (via deprecation, of course), piggy-backed in this KIP? Would like to hear others' thoughts.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start a discussion on the following KIP:
>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Sagar.
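For reference, a minimal sketch of how the two configs from the Public Interfaces section would be set on an application, assuming the names land as proposed (input.buffer.max.bytes and statestore.cache.max.bytes). Neither has a StreamsConfig constant at the time of this discussion, so plain strings are used, and the 512 MB value is just the default floated in this thread.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ProposedConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "input-buffer-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Proposed new config: total bytes buffered across all input partitions
        // of the Kafka Streams instance.
        props.put("input.buffer.max.bytes", 512L * 1024 * 1024);

        // Proposed rename of cache.max.bytes.buffering
        // (StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 MB default today).
        props.put("statestore.cache.max.bytes", 10L * 1024 * 1024);

        // props would then be passed to new KafkaStreams(topology, props).
    }
}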

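Finally, on the task-level metric discussed above: a rough sketch of what such a sensor could look like using Kafka's metrics API. The metric name, group, and tags below are purely illustrative, since the KIP had not yet fixed them.

import java.util.Collections;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;

public class InputBufferMetricExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // Hypothetical task-level sensor tracking total buffered input bytes.
        Sensor sensor = metrics.sensor("task-0_1-input-buffer-bytes");
        MetricName name = metrics.metricName(
                "input-buffer-bytes-total",            // metric name (illustrative)
                "stream-task-metrics",                 // group (illustrative)
                "Total bytes buffered for the task's input partitions",
                Collections.singletonMap("task-id", "0_1"));
        sensor.add(name, new Value());                 // report the latest recorded value

        // Each time the task's buffered bytes change, record the new total.
        sensor.record(42_000_000d);

        metrics.close();
    }
}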