Hi Bruno,

Thanks for raising this point. I think the main motivation behind this
proposal is, like Matthias said, to ease the understanding burden from
users to our own shoulders. Testing wise, I think we do not necessarily
need to explode the testing matrix but just test the last version before
each metrics refactoring (again, hopefully it is the only time) and hence I
think it worth benefiting user's experience. WDYT?

Hi Matthias,

Thanks for your feedback, I will update the wiki page accordingly.

Will also close the other voting thread with your vote.

Guozhang

On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> I share Bruno's concern about future releases, however, I would make
> slightly different proposal.
>
> Instead of using "latest" we can just make the config optional and if
> not set, we use the new metrics code? This way we don't need to add a
> new version number each time we do a new release (note, that it would be
> weird to keep default value "2.4" in future releases).
>
> For enabling backward compatibility: I don't have a strong opinion if we
> should have a single value "0.10.0-2.3" or list each version individually.
>
> In KIP-268 (fixing metadata upgrade) we decided to list each version
> individually as it seems simpler for users. Also, we wanted to hide
> which release uses which metadata version (v0 in 0.10.0, and v1 in
> 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
> value though but it seemed not to give best user experience.
>
> I think this KIP is a little different though and both options seems to
> be valid. However, I would like to emphasize that we should optimize for
> user experience (and not if it's harder/easier to test etc---in doubt,
> we should always take on the burden if is helps to lift the burden from
> users).
>
> Overall, I am +1
>
> Some nits:
>
> (1) I think the motivation section for updating `StreamsMetrics`
> interface does not make it clear why we need the change. What is the
> issue with the current interface and how do the new method address the
> issue
>
> (2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
> is hard to read because is indicate that there is a `get-latency` method
> call on stores -- can we update it to
>
> `(put | put-if-absent .. | get)-latency (avg | max)`
>
> (3) typo: `When users override it to "2.2" or below,` this should be
> "2.3" -- or maybe even different if Bruno's concern gets addressed.
>
>
>
>
> -Matthias
>
>
>
>
>
> On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > Hi,
> >
> > I am sorry to restart the discussion here, but I came across a small
> > issue in the KIP.
> >
> > I started to implement KIP-444 and I am bit concerned about the values
> > for the the config `built.in.metrics.version`. In the KIP the possible
> > values are specified as all Kafka Streams versions. I think that this
> > set of values is really hard to maintain in the code and it also blows
> > up the testing burden unnecessarily because all versions need to be
> > tested. My proposal (backed by John) is to use the following values:
> > - `latest` for the latest version of the metrics
> > - `0.10.0-2.3` for the version before `latest`
> > If in future, let's say in version 4.1, we need again to change the
> > metrics, we would add `2.4-4.0` to the values of the config. With
> > major versions, we could also get rid of some values.
> >
> > WDYT?
> >
> > You can also have a look at the PR
> > https://github.com/apache/kafka/pull/7279 to see this in code.
> >
> > Best,
> > Bruno
> >
> > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> Hello Bruno,
> >>
> >> I've updated the wiki page again per your comments, here's a brief
> summary:
> >>
> >> 1. added the list of removed metrics.
> >> 2. added a task-level INFO metric "dropped-records" that covers all
> >> scenarios and merges in the existing "late-records-drop",
> >> "skipped-records", and "expired-window-records-drop".
> >> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
> >> and `addRateTotal` sensors.
> >>
> >>
> >> Since I feel it has incorporated all of your comments I'm going to start
> >> the vote thread for this KIP now.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> Hi Bruno,
> >>>
> >>> No it was not intentional, and we can definitely add the total amount
> >>> sensor as well -- they are just util functions to save users some
> lines of
> >>> code anyways, and should be straightforward.
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> >>>
> >>>> Hi Guozhang,
> >>>>
> >>>> I totally missed the total invocation count metric in the javadoc.
> >>>> Which brings me to a follow-up question. Should the names of the
> >>>> methods reflect the included total invocation count? We have to rename
> >>>> them anyways. One option would be to simply add `Total` to the method
> >>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
> >>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> >>>> those sensors record exclusively invocations, another option would be
> >>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> >>>>
> >>>> As far as I can see, we have sensors to record invocations but none to
> >>>> record amounts. Is that intentional? No need to add it to this KIP, I
> >>>> am just curious.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>>>>
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
> >>>> we've
> >>>>> actually added the total invocation metric already.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Bruno,
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io>
> >>>> wrote:
> >>>>>>
> >>>>>>> Hi Guozhang,
> >>>>>>>
> >>>>>>> I left my comments inline.
> >>>>>>>
> >>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>> Hello Bruno,
> >>>>>>>>
> >>>>>>>> Thanks for the feedbacks, replied inline.
> >>>>>>>>
> >>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Guozhang,
> >>>>>>>>>
> >>>>>>>>> Thank you for the KIP.
> >>>>>>>>>
> >>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there
> >>>> for
> >>>>>>>>> user-defined processors. Would it make sense to also add a
> >>>> method to
> >>>>>>>>> the interface to specify a sensor that records skipped records?
> >>>>>>>>>
> >>>>>>>>> Not sure I follow.. if users want to add a specific skipped
> >>>> records
> >>>>>>>> sensor, she can still do that as a "throughput" sensor via "
> >>>>>>>> addThroughputSensor" and then "record" right?
> >>>>>>>>
> >>>>>>>> As an after-thought, maybe it's better to rename `throughput` to
> >>>> `rate`
> >>>>>>> in
> >>>>>>>> the public APIs since it is really meant for the latter semantics.
> >>>> I did
> >>>>>>>> not change it just to make less API changes / deprecate fewer
> >>>> functions.
> >>>>>>>> But if we feel it is important we can change it as well.
> >>>>>>>>
> >>>>>>>
> >>>>>>> I see now that a user can record the rate of skipped records.
> >>>> However,
> >>>>>>> I was referring to the total number of skipped records. Maybe my
> >>>>>>> question should be more general: should we allow the user to also
> >>>>>>> specify sensors for totals or combinations of rate and totals?
> >>>>>>>
> >>>>>>> Sounds good to me, I will add it to the wiki page as well for
> >>>>>> StreamsMetrics.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> Regarding the naming, I like `rate` more than `throughput`, but I
> >>>>>>> would not fight for it.
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> 2) What are the semantics of active-task-process and
> >>>>>>> standby-task-process
> >>>>>>>>>
> >>>>>>>>> Ah good catch, I think I made it in the wrong column. Just some
> >>>>>>>> explanations here: Within a thread's looped iterations, it will
> >>>> first
> >>>>>>> try
> >>>>>>>> to process some records from the active tasks, and then see if
> >>>> there are
> >>>>>>>> any standby-tasks that can be processed as well (i.e. just reading
> >>>> from
> >>>>>>> the
> >>>>>>>> restore consumer and apply to the local stores). The ratio metrics
> >>>> are
> >>>>>>> for
> >>>>>>>> indicating 1) what tasks (active or standby) does this thread own
> >>>> so
> >>>>>>> far,
> >>>>>>>> and 2) how much time in percentage does it spend on each of them.
> >>>>>>>>
> >>>>>>>> But this metric should really be a task-level one that includes
> >>>> both the
> >>>>>>>> thread-id and task-id, and upon task migrations they will be
> >>>> dynamically
> >>>>>>>> deleted / (re)-created. For each task-id it may be owned by
> >>>> multiple
> >>>>>>>> threads as one active and others standby, and hence the separation
> >>>> of
> >>>>>>>> active / standby seems still necessary.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Makes sense.
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop
> >>>> relate
> >>>>>>>>> to each other? I guess the former is for records that fall
> >>>> outside the
> >>>>>>>>> grace period and the latter is for records that are processed
> >>>> after
> >>>>>>>>> the retention period of the window. Is this correct?
> >>>>>>>>>
> >>>>>>>>> Yes, that's correct. The names are indeed a bit confusing since
> >>>> they
> >>>>>>> are
> >>>>>>>> added at different releases historically..
> >>>>>>>>
> >>>>>>>> More precisely, the `grace period` is a notion of the operator
> >>>> (hence
> >>>>>>> the
> >>>>>>>> metric is node-level, though it would only be used for DSL
> >>>> operators)
> >>>>>>> while
> >>>>>>>> the `retention` is a notion of the store (hence the metric is
> >>>>>>> store-level).
> >>>>>>>> Usually grace period will be smaller than store retention though.
> >>>>>>>>
> >>>>>>>> Processor node is aware of `grace period` and when received a
> >>>> record
> >>>>>>> that
> >>>>>>>> is older than grace deadline, it will be dropped immediately;
> >>>> otherwise
> >>>>>>> it
> >>>>>>>> will still be processed a maybe a new update is "put" into the
> >>>> store.
> >>>>>>> The
> >>>>>>>> store is aware of its `retention period` and then upon a "put"
> >>>> call if
> >>>>>>> it
> >>>>>>>> realized it is older than the retention deadline, that put call
> >>>> would be
> >>>>>>>> ignored and metric is recorded.
> >>>>>>>>
> >>>>>>>> We have to separate them here since the window store can be used
> >>>> in both
> >>>>>>>> DSL and PAPI, and for the former case it would likely to be
> already
> >>>>>>> ignored
> >>>>>>>> at the processor node level due to the grace period which is
> >>>> usually
> >>>>>>>> smaller than retention; but for PAPI there's no grace period and
> >>>> hence
> >>>>>>> the
> >>>>>>>> processor would likely still process and call "put" on the store.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Alright! Got it!
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> 4) Is there an actual difference between skipped and dropped
> >>>> records?
> >>>>>>>>> If not, shall we unify the terminology?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>> There is. Dropped records are only due to lateness; where as
> >>>> skipped
> >>>>>>>> records can be due to serde errors (and user's error handling
> >>>> indicate
> >>>>>>>> "skip and continue"), timestamp errors, etc.
> >>>>>>>>
> >>>>>>>> I've considered maybe a better (more extensible) way would be
> >>>> defining a
> >>>>>>>> single metric name, say skipped-records, but use different tags to
> >>>>>>> indicate
> >>>>>>>> if its skipping reason (errors, windowing semantics, etc). But
> >>>> there's
> >>>>>>>> still a tricky difference: for serde caused skipping for example,
> >>>> they
> >>>>>>> will
> >>>>>>>> be skipped at the very beginning and there's no effects taken at
> >>>> all.
> >>>>>>> For
> >>>>>>>> some others e.g. null-key / value at the reduce operator, it is
> >>>> only
> >>>>>>>> skipped at the middle of the processing, i.e. some effects may
> have
> >>>>>>> already
> >>>>>>>> been taken in up-stream sub-topologies. And that's why for
> >>>>>>> skipped-records
> >>>>>>>> I've defined it on both task-level and node-level and the
> >>>> aggregate of
> >>>>>>> the
> >>>>>>>> latter may still be smaller than the former, whereas for
> >>>>>>> dropped-records it
> >>>>>>>> is only for node-level.
> >>>>>>>>
> >>>>>>>> So how about an even more significant change then: we enlarge the
> >>>>>>>> `dropped-late-records` to `dropped-records` which is node-level
> >>>> only,
> >>>>>>> but
> >>>>>>>> includes reasons form lateness to semantics (like null-key) as
> >>>> well; and
> >>>>>>>> then we have a task-level-only `skipped-records` which only record
> >>>> those
> >>>>>>>> dropped at the very beginning and did not make it at all to the
> >>>>>>> processing
> >>>>>>>> topology. I feel this is a clearer distinguishment but also a
> >>>> bigger
> >>>>>>> change
> >>>>>>>> to users.
> >>>>>>>>
> >>>>>>>
> >>>>>>> I like the way you dropped-records and skipped-records are now
> >>>>>>> defined. My follow-up question is whether we should give names to
> >>>>>>> those metrics that better describe their semantics, like:
> >>>>>>>
> >>>>>>> dropped-records-at-source and dropped-records-at-processor
> >>>>>>>
> >>>>>>> or
> >>>>>>>
> >>>>>>> records-dropped-at-source and records-dropped-at-processor
> >>>>>>>
> >>>>>>> or
> >>>>>>>
> >>>>>>> source-dropped-records and processor-dropped-records
> >>>>>>>
> >>>>>>> or alternatively with skipped. However, I would use the same term
> as
> >>>>>>> in expired-window-record-drop
> >>>>>>>
> >>>>>>> Maybe, we should also consider to rename expired-window-record-drop
> >>>> to
> >>>>>>> expired-window-record-dropped to be consistent.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>>
> >>>>>>> I was not considering "expired-window-record-drop" before since it
> >>>> is a
> >>>>>> store-level metric, and I was only considering task-level
> >>>> (skipped-records)
> >>>>>> and processor-node-level (dropped-records) metrics, and I'm using
> >>>> different
> >>>>>> terms deliberately to hint users that they are different leveled
> >>>> metrics.
> >>>>>>
> >>>>>> I still feel that using `skip` for task-level metrics indicating
> that
> >>>> this
> >>>>>> record was not processed at all, and using `drop` for
> processor-level
> >>>>>> metrics that this record is only dropped at this stage of the
> >>>> topology is a
> >>>>>> better one; but I'm also okay with some finer grained metrics so
> that
> >>>> we
> >>>>>> can align the processor-level with store-level (they are on the same
> >>>>>> granularity any ways), like:
> >>>>>>
> >>>>>> `dropped-records-null-field`: at processor nodes
> >>>>>>
> >>>>>> `dropped-records-too-late`: at processor nodes
> >>>>>>
> >>>>>> `dropped-records-expired-window`: at window-stores
> >>>>>>
> >>>>>>
> >>>>>>>>
> >>>>>>>>> 5) What happens with removed metrics when the user sets the
> >>>> version of
> >>>>>>>>> "built.in.metrics.version" to 2.2-
> >>>>>>>>>
> >>>>>>>>> I think for those redundant ones like ""forward-rate" and
> >>>>>>> "destroy-rate"
> >>>>>>>> we can still remove them with 2.2- as well; for other ones that
> are
> >>>>>>> removed
> >>>>>>>> / replaced like thread-level skipped-records we should still
> >>>> maintain
> >>>>>>> them.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Could you add this comment about removal of redundant metrics to
> the
> >>>>>>> KIP such that is documented somewhere?
> >>>>>>>
> >>>>>>> Yes, for sure.
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>
> >>>>>> I've also decided to remove the rebalance-related metrics from the
> >>>>>> instance-level and move it to consumer itself as part of KIP-429.
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
>
>

-- 
-- Guozhang

Reply via email to