Hi Bruno,

Thanks for raising this point. I think the main motivation behind this proposal is, as Matthias said, to shift the burden of understanding from our users onto our own shoulders. Testing-wise, I do not think we need to explode the testing matrix; we only need to test the last version before each metrics refactoring (again, hopefully this is the only time), so I think the extra effort is worth it for the user experience. WDYT?
Hi Matthias,

Thanks for your feedback, I will update the wiki page accordingly. I will also close the other voting thread with your vote.

Guozhang

On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <matth...@confluent.io> wrote:

> I share Bruno's concern about future releases; however, I would make a slightly different proposal.
>
> Instead of using "latest", could we just make the config optional and, if it is not set, use the new metrics code? This way we don't need to add a new version number each time we do a new release (note that it would be weird to keep the default value "2.4" in future releases).
>
> For enabling backward compatibility: I don't have a strong opinion on whether we should have a single value "0.10.0-2.3" or list each version individually.
>
> In KIP-268 (fixing metadata upgrade) we decided to list each version individually as it seemed simpler for users. Also, we wanted to hide which release uses which metadata version (v0 in 0.10.0, and v1 in 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single value, but it did not seem to give the best user experience.
>
> I think this KIP is a little different, though, and both options seem to be valid. However, I would like to emphasize that we should optimize for user experience (and not for whether it's harder or easier to test, etc.; when in doubt, we should always take on the burden if it helps to lift the burden from users).
>
> Overall, I am +1.
>
> Some nits:
>
> (1) I think the motivation section for updating the `StreamsMetrics` interface does not make it clear why we need the change. What is the issue with the current interface, and how do the new methods address the issue?
>
> (2) The metric name `put | put-if-absent .. | get-latency (avg | max)` is hard to read because it suggests that there is a `get-latency` method call on stores -- can we update it to
>
> `(put | put-if-absent .. | get)-latency (avg | max)`
>
> (3) Typo: `When users override it to "2.2" or below,` this should be "2.3" -- or maybe even different if Bruno's concern gets addressed.
>
> -Matthias
>
> On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > Hi,
> >
> > I am sorry to restart the discussion here, but I came across a small issue in the KIP.
> >
> > I started to implement KIP-444 and I am a bit concerned about the values for the config `built.in.metrics.version`. In the KIP, the possible values are specified as all Kafka Streams versions. I think that this set of values is really hard to maintain in the code, and it also blows up the testing burden unnecessarily because all versions need to be tested. My proposal (backed by John) is to use the following values:
> > - `latest` for the latest version of the metrics
> > - `0.10.0-2.3` for the version before `latest`
> > If in the future, let's say in version 4.1, we need to change the metrics again, we would add `2.4-4.0` to the values of the config. With major versions, we could also get rid of some values.
> >
> > WDYT?
> >
> > You can also have a look at the PR https://github.com/apache/kafka/pull/7279 to see this in code.
> >
> > Best,
> > Bruno
> >
> > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Hello Bruno,
> >>
> >> I've updated the wiki page again per your comments; here's a brief summary:
> >>
> >> 1. Added the list of removed metrics.
> >> 2. Added a task-level INFO metric "dropped-records" that covers all scenarios and merges in the existing "late-records-drop", "skipped-records", and "expired-window-records-drop".
> >> 3. Renamed the sensor util functions of StreamsMetrics to `addLatencyRateTotal` and `addRateTotal`.
> >>
> >> Since I feel it has incorporated all of your comments, I'm going to start the vote thread for this KIP now.
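A minimal sketch of how the two config values proposed above (`latest` and `0.10.0-2.3`) could be resolved, including Matthias's variant where an unset config selects the new metrics. The enum `MetricsVersion`, its methods, and the assumption that the only metrics refactoring so far ships in 2.4 are illustrative, not the code in the linked PR:

```java
import java.util.Arrays;

// Hypothetical model of the proposed values for `built.in.metrics.version`.
// Illustrative only; not the code in https://github.com/apache/kafka/pull/7279.
enum MetricsVersion {
    LATEST("latest"),
    V_0_10_0_TO_2_3("0.10.0-2.3");

    final String configValue;

    MetricsVersion(String configValue) {
        this.configValue = configValue;
    }

    // Matthias's variant: an unset (null) config selects the new metrics.
    static MetricsVersion fromConfig(String value) {
        if (value == null) {
            return LATEST;
        }
        for (MetricsVersion v : values()) {
            if (v.configValue.equals(value)) {
                return v;
            }
        }
        throw new IllegalArgumentException("Unknown built.in.metrics.version: " + value);
    }

    // Config value whose metrics match those of `release`,
    // assuming the only metrics refactoring so far ships in 2.4.
    static MetricsVersion forRelease(String release) {
        return compare(release, "2.4") < 0 ? V_0_10_0_TO_2_3 : LATEST;
    }

    // Numeric comparison of dotted versions, e.g. "0.10.1" < "2.3".
    static int compare(String a, String b) {
        int[] x = Arrays.stream(a.split("\\.")).mapToInt(Integer::parseInt).toArray();
        int[] y = Arrays.stream(b.split("\\.")).mapToInt(Integer::parseInt).toArray();
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }
}
```

For example, `MetricsVersion.forRelease("2.1")` yields `V_0_10_0_TO_2_3`, so a user upgrading from 2.1 who wants to keep the old metrics would set `0.10.0-2.3`. Keeping one value per metrics refactoring, rather than one per release, means each value maps to exactly one code path to test.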
> >>
> >> Guozhang
> >>
> >> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Hi Bruno,
> >>>
> >>> No, it was not intentional, and we can definitely add the total amount sensor as well -- they are just util functions to save users a few lines of code anyway, and should be straightforward.
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>
> >>>> Hi Guozhang,
> >>>>
> >>>> I totally missed the total invocation count metric in the javadoc, which brings me to a follow-up question: should the names of the methods reflect the included total invocation count? We have to rename them anyway. One option would be to simply add `Total` to the method names, i.e., `addLatencyAndRateAndTotalSensor` and `addRateAndTotalSensor` (alternatively without the `And`s). Since those sensors exclusively record invocations, another option would be `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> >>>>
> >>>> As far as I can see, we have sensors to record invocations but none to record amounts. Is that intentional? No need to add it to this KIP, I am just curious.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Bruno,
> >>>>>
> >>>>> I just realized that for `addRateSensor` and `addLatencyAndRateSensor` we've actually added the total invocation metric already.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Bruno,
> >>>>>>
> >>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Hi Guozhang,
> >>>>>>>
> >>>>>>> I left my comments inline.
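To make the sensor naming question concrete, here is a hypothetical, self-contained sketch of what a sensor created by a util function such as `addLatencyRateTotal` (or `addRateTotal`, without latency) would track: a cumulative invocation total, a rate, and optionally average/max latency. `InvocationSensor` is not the Kafka Streams API, and computing the rate as total count over a caller-supplied window is a deliberate simplification:

```java
// Hypothetical stand-in for a sensor returned by a util function such as
// `addLatencyRateTotal` (latency enabled) or `addRateTotal` (latency disabled).
// Not the Kafka Streams API; rate is a simplified count / window.
final class InvocationSensor {
    private final boolean recordLatency;
    private long total;        // cumulative invocation count
    private long latencySumMs; // only updated when latency is recorded
    private long latencyMaxMs;

    InvocationSensor(boolean recordLatency) {
        this.recordLatency = recordLatency;
    }

    // Records one invocation; the latency argument is ignored for the
    // rate+total variant.
    void record(long latencyMs) {
        total++;
        if (recordLatency) {
            latencySumMs += latencyMs;
            latencyMaxMs = Math.max(latencyMaxMs, latencyMs);
        }
    }

    long total() {
        return total;
    }

    // Invocations per second, assuming all recorded invocations fell
    // within the given window.
    double rate(double windowSeconds) {
        return total / windowSeconds;
    }

    double latencyAvgMs() {
        return total == 0 ? 0.0 : (double) latencySumMs / total;
    }

    long latencyMaxMs() {
        return latencyMaxMs;
    }
}
```

Since every sensor of this kind records invocations, the total comes for free; an "amount" sensor (Bruno's question) would instead need `record` to accept a value to sum.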
> >>>>>>>
> >>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hello Bruno,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback, replied inline.
> >>>>>>>>
> >>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Guozhang,
> >>>>>>>>>
> >>>>>>>>> Thank you for the KIP.
> >>>>>>>>>
> >>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there for user-defined processors. Would it make sense to also add a method to the interface to specify a sensor that records skipped records?
> >>>>>>>>
> >>>>>>>> Not sure I follow... if users want to add a specific skipped-records sensor, they can still do that as a "throughput" sensor via "addThroughputSensor" and then call "record", right?
> >>>>>>>>
> >>>>>>>> As an afterthought, maybe it's better to rename `throughput` to `rate` in the public APIs since it is really meant for the latter semantics. I did not change it just to make fewer API changes / deprecate fewer functions. But if we feel it is important, we can change it as well.
> >>>>>>>>
> >>>>>>>
> >>>>>>> I see now that a user can record the rate of skipped records. However, I was referring to the total number of skipped records. Maybe my question should be more general: should we allow the user to also specify sensors for totals or combinations of rate and totals?
> >>>>>>
> >>>>>> Sounds good to me, I will add it to the wiki page as well for StreamsMetrics.
> >>>>>>
> >>>>>>> Regarding the naming, I like `rate` more than `throughput`, but I would not fight for it.
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> 2) What are the semantics of active-task-process and standby-task-process?
> >>>>>>>>
> >>>>>>>> Ah, good catch, I think I put it in the wrong column. Some explanation here: within a thread's loop iterations, it will first try to process some records from the active tasks, and then see if there are any standby tasks that can be processed as well (i.e., just reading from the restore consumer and applying the records to the local stores). The ratio metrics indicate 1) which tasks (active or standby) this thread owns so far, and 2) what percentage of its time it spends on each of them.
> >>>>>>>>
> >>>>>>>> But this metric should really be a task-level one that includes both the thread-id and the task-id, and upon task migration they will be dynamically deleted / (re-)created. Each task-id may be owned by multiple threads, one as active and the others as standby, and hence the separation of active / standby still seems necessary.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Makes sense.
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop relate to each other? I guess the former is for records that fall outside the grace period and the latter is for records that are processed after the retention period of the window. Is this correct?
> >>>>>>>>
> >>>>>>>> Yes, that's correct. The names are indeed a bit confusing since they were added in different releases historically.
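The task-level ratio metrics described in the reply to point 2 could be backed by simple per-thread bookkeeping: accumulate the time spent on each active or standby task and divide by the thread's total loop time. The class name, the `taskId-active` / `taskId-standby` keys, and how time is measured are assumptions for illustration only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-thread bookkeeping for the ratio metrics: the fraction
// of a thread's loop time spent on each task, split by active / standby.
final class TaskTimeRatios {
    private final Map<String, Long> timeMsByTask = new LinkedHashMap<>();
    private long loopTimeMs;

    // Time spent processing records of an active task.
    void recordActive(String taskId, long ms) {
        timeMsByTask.merge(taskId + "-active", ms, Long::sum);
    }

    // Time spent applying restore-consumer records for a standby task.
    void recordStandby(String taskId, long ms) {
        timeMsByTask.merge(taskId + "-standby", ms, Long::sum);
    }

    // Whole iteration time, which may include time not spent on any task.
    void recordLoop(long ms) {
        loopTimeMs += ms;
    }

    // Ratio in [0, 1] of loop time spent on the task in the given role.
    double ratio(String taskId, boolean active) {
        if (loopTimeMs == 0) {
            return 0.0;
        }
        String key = taskId + (active ? "-active" : "-standby");
        return (double) timeMsByTask.getOrDefault(key, 0L) / loopTimeMs;
    }

    // On task migration the per-task metric is removed (and may be re-created).
    void removeTask(String taskId) {
        timeMsByTask.remove(taskId + "-active");
        timeMsByTask.remove(taskId + "-standby");
    }
}
```

Keying by role as well as task-id mirrors the reply's point that a task-id may be active on one thread and standby on others.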
> >>>>>>>>
> >>>>>>>> More precisely, the `grace period` is a notion of the operator (hence the metric is node-level, though it would only be used for DSL operators), while the `retention` is a notion of the store (hence the metric is store-level). Usually, the grace period will be smaller than the store retention, though.
> >>>>>>>>
> >>>>>>>> The processor node is aware of the `grace period`, and when it receives a record that is older than the grace deadline, the record is dropped immediately; otherwise it is still processed, and maybe a new update is "put" into the store. The store is aware of its `retention period`, and upon a "put" call, if it realizes the record is older than the retention deadline, that put call is ignored and the metric is recorded.
> >>>>>>>>
> >>>>>>>> We have to separate them here since the window store can be used in both the DSL and the PAPI. In the former case, the record would likely already be ignored at the processor node level due to the grace period, which is usually smaller than the retention; but for the PAPI there's no grace period, and hence the processor would likely still process the record and call "put" on the store.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Alright! Got it!
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> 4) Is there an actual difference between skipped and dropped records? If not, shall we unify the terminology?
> >>>>>>>>
> >>>>>>>> There is. Dropped records are only due to lateness, whereas skipped records can be due to serde errors (when the user's error handling indicates "skip and continue"), timestamp errors, etc.
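The two drop points described in the answer to point 3 (grace period at the processor node, retention period at the window store) can be modeled as follows. This is a simplified, hypothetical sketch: each deadline is computed as the observed stream time minus the respective period, and a `null` grace period stands in for the PAPI case where no grace period exists:

```java
// Hypothetical model of the node-level and store-level drop points.
// The deadline rule (stream time - period) is a simplifying assumption,
// not the Kafka Streams implementation.
final class GraceVsRetention {
    long droppedLateRecords;      // node-level: arrived after the grace deadline
    long expiredWindowRecordDrop; // store-level: put() after the retention deadline

    private final Long graceMs;   // null models the PAPI case: no grace period
    private final long retentionMs;
    private long streamTimeMs = Long.MIN_VALUE;

    GraceVsRetention(Long graceMs, long retentionMs) {
        this.graceMs = graceMs;
        this.retentionMs = retentionMs;
    }

    // Processor node: drop records older than the grace deadline,
    // otherwise forward them to the store.
    void process(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        if (graceMs != null && timestampMs < streamTimeMs - graceMs) {
            droppedLateRecords++;
            return;
        }
        put(timestampMs);
    }

    // Window store: ignore puts older than the retention deadline.
    private void put(long timestampMs) {
        if (timestampMs < streamTimeMs - retentionMs) {
            expiredWindowRecordDrop++;
        }
        // otherwise the update would be written to the store
    }
}
```

With a DSL-style grace period shorter than the retention, late records are counted at the node and never reach the store; with no grace period (PAPI), the same records reach `put` and only the store-level counter moves.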
> >>>>>>>>
> >>>>>>>> I've considered that a better (more extensible) way might be to define a single metric name, say skipped-records, but use different tags to indicate the skipping reason (errors, windowing semantics, etc.). But there's still a tricky difference: records skipped due to serde errors, for example, are skipped at the very beginning, and no effects have been applied at all. For others, e.g., a null key / value at the reduce operator, the record is only skipped in the middle of processing, i.e., some effects may already have been applied in upstream sub-topologies. That's why I've defined skipped-records at both the task level and the node level, and the aggregate of the latter may still be smaller than the former, whereas dropped-records is node-level only.
> >>>>>>>>
> >>>>>>>> So how about an even more significant change: we broaden `dropped-late-records` into `dropped-records`, which is node-level only but includes reasons ranging from lateness to semantics (like a null key); and then we have a task-level-only `skipped-records`, which only records those dropped at the very beginning that never made it into the processing topology at all. I feel this is a clearer distinction, but also a bigger change for users.
> >>>>>>>>
> >>>>>>>
> >>>>>>> I like the way dropped-records and skipped-records are now defined.
> >>>>>>> My follow-up question is whether we should give those metrics names that better describe their semantics, like:
> >>>>>>>
> >>>>>>> dropped-records-at-source and dropped-records-at-processor
> >>>>>>>
> >>>>>>> or
> >>>>>>>
> >>>>>>> records-dropped-at-source and records-dropped-at-processor
> >>>>>>>
> >>>>>>> or
> >>>>>>>
> >>>>>>> source-dropped-records and processor-dropped-records
> >>>>>>>
> >>>>>>> or alternatively with "skipped". However, I would use the same term as in expired-window-record-drop.
> >>>>>>>
> >>>>>>> Maybe we should also consider renaming expired-window-record-drop to expired-window-record-dropped to be consistent.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>
> >>>>>> I was not considering "expired-window-record-drop" before since it is a store-level metric, and I was only considering task-level (skipped-records) and processor-node-level (dropped-records) metrics; I'm using different terms deliberately to hint to users that they are metrics at different levels.
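The proposed split between a task-level `skipped-records` (records that never enter the topology) and a node-level `dropped-records` carrying a reason can be sketched as one counter per task plus one counter per node and reason tag. `DropSkipMetrics`, its `Reason` values, and the `{reason=...}` key format are hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: task-level "skipped-records" for records that never
// enter the topology, node-level "dropped-records" tagged with a reason.
final class DropSkipMetrics {
    enum Reason { TOO_LATE, NULL_KEY }

    private long skippedRecords; // task-level counter
    // node-level counters keyed by node name plus reason tag
    private final Map<String, Long> droppedRecords = new TreeMap<>();

    // e.g. a deserialization error handled with "skip and continue":
    // no processing effects have happened yet.
    void recordSkipped() {
        skippedRecords++;
    }

    // e.g. a null key at a reduce node: upstream effects may already exist.
    void recordDropped(String node, Reason reason) {
        droppedRecords.merge(key(node, reason), 1L, Long::sum);
    }

    long skipped() {
        return skippedRecords;
    }

    long dropped(String node, Reason reason) {
        return droppedRecords.getOrDefault(key(node, reason), 0L);
    }

    private static String key(String node, Reason reason) {
        return node + "{reason=" + reason + "}";
    }
}
```

Using a reason tag instead of separate metric names keeps the set of names fixed while still allowing finer-grained breakdowns such as the `dropped-records-null-field` / `dropped-records-too-late` split.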
> >>>>>>
> >>>>>> I still feel that using `skip` for task-level metrics, indicating that the record was not processed at all, and `drop` for processor-level metrics, indicating that the record was only dropped at this stage of the topology, is the better option; but I'm also okay with some finer-grained metrics so that we can align the processor level with the store level (they have the same granularity anyway), like:
> >>>>>>
> >>>>>> `dropped-records-null-field`: at processor nodes
> >>>>>>
> >>>>>> `dropped-records-too-late`: at processor nodes
> >>>>>>
> >>>>>> `dropped-records-expired-window`: at window stores
> >>>>>>
> >>>>>>>>
> >>>>>>>>> 5) What happens with removed metrics when the user sets "built.in.metrics.version" to 2.2-?
> >>>>>>>>
> >>>>>>>> I think the redundant ones like "forward-rate" and "destroy-rate" can still be removed with 2.2- as well; other ones that are removed / replaced, like the thread-level skipped-records, should still be maintained.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Could you add this comment about the removal of redundant metrics to the KIP so that it is documented somewhere?
> >>>>>>
> >>>>>> Yes, for sure.
> >>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>
> >>>>>> I've also decided to remove the rebalance-related metrics from the instance level and move them to the consumer itself as part of KIP-429.
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >> --
> >> -- Guozhang
> >
--
-- Guozhang
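The rule from the answer to point 5 (redundant metrics are removed even for the old version, while removed or replaced metrics such as the thread-level skipped-records are kept for it) could be expressed as a filter over the legacy metric names. The class and its two name sets are a hypothetical sketch, populated only with the names mentioned in this thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical filter deciding which legacy metrics to register depending
// on whether the old `built.in.metrics.version` is configured.
final class MetricsForVersion {
    // Redundant metrics: removed even when the old version is configured.
    static final Set<String> REDUNDANT = Set.of("forward-rate", "destroy-rate");
    // Removed / replaced metrics: kept only for the old version.
    static final Set<String> REPLACED = Set.of("skipped-records");

    // Returns the legacy metric names that should still be registered.
    static List<String> oldMetricsToRegister(List<String> legacy, boolean oldVersionConfigured) {
        List<String> out = new ArrayList<>();
        for (String name : legacy) {
            if (REDUNDANT.contains(name)) {
                continue; // dropped regardless of the configured version
            }
            if (REPLACED.contains(name) && !oldVersionConfigured) {
                continue; // superseded by the new metrics in the latest version
            }
            out.add(name);
        }
        return out;
    }
}
```

Metrics that are neither redundant nor replaced pass through unchanged in both modes, so only the two explicit sets need maintaining as the KIP evolves.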