Hi Guozhang,

Your summary corresponds to my proposal.

A new value would only be added if in future we change the metrics in
a backward-incompatible way, i.e., 2.4-<the version before the new
breaking change>. "latest" will always stay the default.
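
To make the two values concrete, here is a minimal sketch of how such a
config could be resolved. The class, constant, and method names are made up
for illustration; this is an assumption on my side and not the code in the
PR or in Kafka Streams.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: names are illustrative, not taken from the Kafka code base.
public class BuiltInMetricsVersion {
    public static final String CONFIG_NAME = "built.in.metrics.version";
    public static final String LATEST = "latest";
    public static final String FROM_0100_TO_23 = "0.10.0-2.3";

    // The only accepted values for now. A further range would be appended
    // here only after a backward-incompatible change to the metrics.
    private static final List<String> VALID_VALUES = Arrays.asList(LATEST, FROM_0100_TO_23);

    // Returns the configured value, falling back to "latest" when the config is unset.
    public static String resolve(String configured) {
        if (configured == null) {
            return LATEST;
        }
        if (!VALID_VALUES.contains(configured)) {
            throw new IllegalArgumentException(
                "Invalid value " + configured + " for " + CONFIG_NAME
                    + "; valid values: " + VALID_VALUES);
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("0.10.0-2.3"));
    }
}
```

With only two values, the test matrix stays at two code paths, and a value
like "2.4" is rejected instead of silently mapped to something.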

Best,
Bruno

On Thu, Sep 5, 2019 at 10:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Bruno,
>
> I think your concern makes sense, let's adopt this suggestion in KIP-444
> instead. Just to clarify:
>
> 1. The default value would be "latest".
> 2. The only other valid value is "0.10.0-2.3".
>
> And moving forward this config may stay without any new values.
>
>
> Guozhang
>
>
> On Thu, Sep 5, 2019 at 12:16 PM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > I think user experience and code maintenance are tightly related. The
> > harder the code is to maintain, the worse the user experience will get.
> >
> > Making the config optional does not solve the issue. Wouldn't users be
> > puzzled when we release 2.5 and they cannot set
> > built.in.metrics.version to 2.4 to be sure to get the same metrics for
> > that version? It seems with that solution we would just move
> > maintenance to the next release.
> >
> > I cannot see how the values `latest` and `0.10.0-2.3` would lead to a
> > bad user experience.
> >
> > Regarding testing, at least on integration test level, we absolutely
> > need to test all versions. It is too easy to make a mistake with so
> > many versions. Remember that on integration test level we need to
> > start an embedded Kafka for each single test.
> >
> > Best,
> > Bruno
> >
> > On Thu, Sep 5, 2019 at 8:46 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > Hi Bruno,
> > >
> > > Thanks for raising this point. I think the main motivation behind this
> > > proposal is, like Matthias said, to shift the burden of understanding
> > > from users onto our own shoulders. Testing-wise, I think we do not
> > > necessarily need to explode the testing matrix but can just test the last
> > > version before each metrics refactoring (again, hopefully it is the only
> > > time), and hence I think it is worth it to benefit the user experience.
> > > WDYT?
> > >
> > > Hi Matthias,
> > >
> > > Thanks for your feedback, I will update the wiki page accordingly.
> > >
> > > Will also close the other voting thread with your vote.
> > >
> > > Guozhang
> > >
> > > On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > I share Bruno's concern about future releases; however, I would make
> > > > a slightly different proposal.
> > > >
> > > > Instead of using "latest" we can just make the config optional and if
> > > > not set, we use the new metrics code? This way we don't need to add a
> > > > new version number each time we do a new release (note that it would be
> > > > weird to keep the default value "2.4" in future releases).
> > > >
> > > > For enabling backward compatibility: I don't have a strong opinion on
> > > > whether we should have a single value "0.10.0-2.3" or list each version
> > > > individually.
> > > >
> > > > In KIP-268 (fixing metadata upgrade) we decided to list each version
> > > > individually as it seems simpler for users. Also, we wanted to hide
> > > > which release uses which metadata version (v0 in 0.10.0, and v1 in
> > > > 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
> > > > value, but that did not seem to give the best user experience.
> > > >
> > > > I think this KIP is a little different though and both options seem to
> > > > be valid. However, I would like to emphasize that we should optimize for
> > > > user experience (and not for whether it's harder or easier to test, etc.;
> > > > when in doubt, we should always take on the burden if it helps to lift
> > > > the burden from users).
> > > >
> > > > Overall, I am +1
> > > >
> > > > Some nits:
> > > >
> > > > (1) I think the motivation section for updating the `StreamsMetrics`
> > > > interface does not make it clear why we need the change. What is the
> > > > issue with the current interface and how do the new methods address
> > > > it?
> > > >
> > > > (2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
> > > > is hard to read because it suggests that there is a `get-latency`
> > > > method call on stores -- can we update it to
> > > >
> > > > `(put | put-if-absent .. | get)-latency (avg | max)`
> > > >
> > > > (3) typo: `When users override it to "2.2" or below,` this should be
> > > > "2.3" -- or maybe even different if Bruno's concern gets addressed.
> > > >
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > > > > Hi,
> > > > >
> > > > > I am sorry to restart the discussion here, but I came across a small
> > > > > issue in the KIP.
> > > > >
> > > > > I started to implement KIP-444 and I am a bit concerned about the
> > > > > values for the config `built.in.metrics.version`. In the KIP the
> > > > > possible values are specified as all Kafka Streams versions. I think
> > > > > that this set of values is really hard to maintain in the code and it
> > > > > also blows up the testing burden unnecessarily because all versions
> > > > > need to be tested. My proposal (backed by John) is to use the
> > > > > following values:
> > > > > - `latest` for the latest version of the metrics
> > > > > - `0.10.0-2.3` for the version before `latest`
> > > > > If in future, let's say in version 4.1, we need again to change the
> > > > > metrics, we would add `2.4-4.0` to the values of the config. With
> > > > > major versions, we could also get rid of some values.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > You can also have a look at the PR
> > > > > https://github.com/apache/kafka/pull/7279 to see this in code.
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >> Hello Bruno,
> > > > >>
> > > > >> I've updated the wiki page again per your comments; here's a brief
> > > > >> summary:
> > > > >>
> > > > >> 1. added the list of removed metrics.
> > > > >> 2. added a task-level INFO metric "dropped-records" that covers all
> > > > >> scenarios and merges in the existing "late-records-drop",
> > > > >> "skipped-records", and "expired-window-records-drop".
> > > > >> 3. renamed the util functions of StreamsMetrics to
> > > > >> `addLatencyRateTotal` and `addRateTotal` sensors.
> > > > >>
> > > > >>
> > > > >> Since I feel it has incorporated all of your comments, I'm going to
> > > > >> start the vote thread for this KIP now.
> > > > >>
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >>
> > > > >> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >>> Hi Bruno,
> > > > >>>
> > > > > >>> No, it was not intentional, and we can definitely add the total
> > > > > >>> amount sensor as well -- they are just util functions to save
> > > > > >>> users some lines of code anyway, and should be straightforward.
> > > > >>>
> > > > >>> Guozhang
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io>
> > > > wrote:
> > > > >>>
> > > > >>>> Hi Guozhang,
> > > > >>>>
> > > > >>>> I totally missed the total invocation count metric in the javadoc.
> > > > >>>> Which brings me to a follow-up question. Should the names of the
> > > > >>>> methods reflect the included total invocation count? We have to
> > rename
> > > > >>>> them anyways. One option would be to simply add `Total` to the
> > method
> > > > >>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
> > > > >>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> > > > >>>> those sensors record exclusively invocations, another option
> > would be
> > > > >>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> > > > >>>>
> > > > >>>> As far as I can see, we have sensors to record invocations but
> > none to
> > > > >>>> record amounts. Is that intentional? No need to add it to this
> > KIP, I
> > > > >>>> am just curious.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Bruno
> > > > >>>>
> > > > >>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wangg...@gmail.com
> > >
> > > > wrote:
> > > > >>>>>
> > > > >>>>> Hi Bruno,
> > > > >>>>>
> > > > >>>>> Just realized that for `addRateSensor` and
> > `addLatencyAndRateSensor`
> > > > >>>> we've
> > > > >>>>> actually added the total invocation metric already.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Guozhang
> > > > >>>>>
> > > > >>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <
> > wangg...@gmail.com>
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi Bruno,
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <
> > br...@confluent.io>
> > > > >>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Guozhang,
> > > > >>>>>>>
> > > > >>>>>>> I left my comments inline.
> > > > >>>>>>>
> > > > >>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <
> > wangg...@gmail.com>
> > > > >>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>> Hello Bruno,
> > > > >>>>>>>>
> > > > > >>>>>>>> Thanks for the feedback; I replied inline.
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <
> > br...@confluent.io>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Guozhang,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thank you for the KIP.
> > > > >>>>>>>>>
> > > > >>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is
> > there
> > > > >>>> for
> > > > >>>>>>>>> user-defined processors. Would it make sense to also add a
> > > > >>>> method to
> > > > >>>>>>>>> the interface to specify a sensor that records skipped
> > records?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Not sure I follow.. if users want to add a specific skipped
> > > > >>>> records
> > > > >>>>>>>> sensor, she can still do that as a "throughput" sensor via "
> > > > >>>>>>>> addThroughputSensor" and then "record" right?
> > > > >>>>>>>>
> > > > >>>>>>>> As an after-thought, maybe it's better to rename `throughput`
> > to
> > > > >>>> `rate`
> > > > >>>>>>> in
> > > > >>>>>>>> the public APIs since it is really meant for the latter
> > semantics.
> > > > >>>> I did
> > > > >>>>>>>> not change it just to make less API changes / deprecate fewer
> > > > >>>> functions.
> > > > >>>>>>>> But if we feel it is important we can change it as well.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I see now that a user can record the rate of skipped records.
> > > > >>>> However,
> > > > >>>>>>> I was referring to the total number of skipped records. Maybe
> > my
> > > > >>>>>>> question should be more general: should we allow the user to
> > also
> > > > >>>>>>> specify sensors for totals or combinations of rate and totals?
> > > > >>>>>>>
> > > > >>>>>>> Sounds good to me, I will add it to the wiki page as well for
> > > > >>>>>> StreamsMetrics.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>> Regarding the naming, I like `rate` more than `throughput`,
> > but I
> > > > >>>>>>> would not fight for it.
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> 2) What are the semantics of active-task-process and
> > > > >>>>>>> standby-task-process
> > > > >>>>>>>>>
> > > > >>>>>>>>> Ah good catch, I think I made it in the wrong column. Just
> > some
> > > > >>>>>>>> explanations here: Within a thread's looped iterations, it
> > will
> > > > >>>> first
> > > > >>>>>>> try
> > > > >>>>>>>> to process some records from the active tasks, and then see if
> > > > >>>> there are
> > > > >>>>>>>> any standby-tasks that can be processed as well (i.e. just
> > reading
> > > > >>>> from
> > > > >>>>>>> the
> > > > >>>>>>>> restore consumer and apply to the local stores). The ratio
> > metrics
> > > > >>>> are
> > > > >>>>>>> for
> > > > >>>>>>>> indicating 1) what tasks (active or standby) does this thread
> > own
> > > > >>>> so
> > > > >>>>>>> far,
> > > > >>>>>>>> and 2) how much time in percentage does it spend on each of
> > them.
> > > > >>>>>>>>
> > > > >>>>>>>> But this metric should really be a task-level one that
> > includes
> > > > >>>> both the
> > > > >>>>>>>> thread-id and task-id, and upon task migrations they will be
> > > > >>>> dynamically
> > > > >>>>>>>> deleted / (re)-created. For each task-id it may be owned by
> > > > >>>> multiple
> > > > >>>>>>>> threads as one active and others standby, and hence the
> > separation
> > > > >>>> of
> > > > >>>>>>>> active / standby seems still necessary.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Makes sense.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop
> > > > >>>> relate
> > > > >>>>>>>>> to each other? I guess the former is for records that fall
> > > > >>>> outside the
> > > > >>>>>>>>> grace period and the latter is for records that are processed
> > > > >>>> after
> > > > >>>>>>>>> the retention period of the window. Is this correct?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Yes, that's correct. The names are indeed a bit confusing
> > since
> > > > >>>> they
> > > > >>>>>>> are
> > > > >>>>>>>> added at different releases historically..
> > > > >>>>>>>>
> > > > >>>>>>>> More precisely, the `grace period` is a notion of the operator
> > > > >>>> (hence
> > > > >>>>>>> the
> > > > >>>>>>>> metric is node-level, though it would only be used for DSL
> > > > >>>> operators)
> > > > >>>>>>> while
> > > > >>>>>>>> the `retention` is a notion of the store (hence the metric is
> > > > >>>>>>> store-level).
> > > > >>>>>>>> Usually grace period will be smaller than store retention
> > though.
> > > > >>>>>>>>
> > > > > >>>>>>>> The processor node is aware of the `grace period`: when it
> > > > > >>>>>>>> receives a record that is older than the grace deadline, the
> > > > > >>>>>>>> record is dropped immediately; otherwise it is still processed
> > > > > >>>>>>>> and maybe a new update is "put" into the store. The store is
> > > > > >>>>>>>> aware of its `retention period`: upon a "put" call, if it
> > > > > >>>>>>>> realizes the record is older than the retention deadline, that
> > > > > >>>>>>>> put call is ignored and the metric is recorded.
> > > > >>>>>>>>
> > > > > >>>>>>>> We have to separate them here since the window store can be
> > > > > >>>>>>>> used in both DSL and PAPI. In the former case a late record
> > > > > >>>>>>>> would likely already be dropped at the processor node level
> > > > > >>>>>>>> due to the grace period, which is usually smaller than the
> > > > > >>>>>>>> retention; but for PAPI there's no grace period and hence the
> > > > > >>>>>>>> processor would likely still process it and call "put" on the
> > > > > >>>>>>>> store.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Alright! Got it!
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> 4) Is there an actual difference between skipped and dropped
> > > > >>>> records?
> > > > >>>>>>>>> If not, shall we unify the terminology?
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > > >>>>>>>> There is. Dropped records are only due to lateness, whereas
> > > > > >>>>>>>> skipped records can be due to serde errors (when the user's
> > > > > >>>>>>>> error handling indicates "skip and continue"), timestamp
> > > > > >>>>>>>> errors, etc.
> > > > >>>>>>>>
> > > > > >>>>>>>> I've considered that maybe a better (more extensible) way
> > > > > >>>>>>>> would be defining a single metric name, say skipped-records,
> > > > > >>>>>>>> but using different tags to indicate the skipping reason
> > > > > >>>>>>>> (errors, windowing semantics, etc.). But there's still a
> > > > > >>>>>>>> tricky difference: records skipped due to serde errors, for
> > > > > >>>>>>>> example, are skipped at the very beginning and have no effect
> > > > > >>>>>>>> at all. Others, e.g. records with a null key / value at the
> > > > > >>>>>>>> reduce operator, are only skipped in the middle of processing,
> > > > > >>>>>>>> i.e. some effects may have already taken place in upstream
> > > > > >>>>>>>> sub-topologies. That's why I've defined skipped-records on
> > > > > >>>>>>>> both task-level and node-level, and the aggregate of the
> > > > > >>>>>>>> latter may still be smaller than the former, whereas
> > > > > >>>>>>>> dropped-records is node-level only.
> > > > >>>>>>>>
> > > > > >>>>>>>> So how about an even more significant change then: we enlarge
> > > > > >>>>>>>> `dropped-late-records` to `dropped-records`, which is
> > > > > >>>>>>>> node-level only but includes reasons from lateness to
> > > > > >>>>>>>> semantics (like null-key) as well; and then we have a
> > > > > >>>>>>>> task-level-only `skipped-records`, which only records those
> > > > > >>>>>>>> dropped at the very beginning that did not make it into the
> > > > > >>>>>>>> processing topology at all. I feel this is a clearer
> > > > > >>>>>>>> distinction but also a bigger change for users.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > > >>>>>>> I like the way dropped-records and skipped-records are now
> > > > >>>>>>> defined. My follow-up question is whether we should give names
> > to
> > > > >>>>>>> those metrics that better describe their semantics, like:
> > > > >>>>>>>
> > > > >>>>>>> dropped-records-at-source and dropped-records-at-processor
> > > > >>>>>>>
> > > > >>>>>>> or
> > > > >>>>>>>
> > > > >>>>>>> records-dropped-at-source and records-dropped-at-processor
> > > > >>>>>>>
> > > > >>>>>>> or
> > > > >>>>>>>
> > > > >>>>>>> source-dropped-records and processor-dropped-records
> > > > >>>>>>>
> > > > >>>>>>> or alternatively with skipped. However, I would use the same
> > term
> > > > as
> > > > >>>>>>> in expired-window-record-drop
> > > > >>>>>>>
> > > > >>>>>>> Maybe, we should also consider to rename
> > expired-window-record-drop
> > > > >>>> to
> > > > >>>>>>> expired-window-record-dropped to be consistent.
> > > > >>>>>>>
> > > > >>>>>>> WDYT?
> > > > >>>>>>>
> > > > >>>>>>> I was not considering "expired-window-record-drop" before
> > since it
> > > > >>>> is a
> > > > >>>>>> store-level metric, and I was only considering task-level
> > > > >>>> (skipped-records)
> > > > >>>>>> and processor-node-level (dropped-records) metrics, and I'm
> > using
> > > > >>>> different
> > > > >>>>>> terms deliberately to hint users that they are different leveled
> > > > >>>> metrics.
> > > > >>>>>>
> > > > > >>>>>> I still feel that using `skip` for task-level metrics, indicating
> > > > > >>>>>> that the record was not processed at all, and `drop` for
> > > > > >>>>>> processor-level metrics, indicating that the record is only
> > > > > >>>>>> dropped at this stage of the topology, is the better choice; but
> > > > > >>>>>> I'm also okay with some finer-grained metrics so that we can
> > > > > >>>>>> align the processor-level with the store-level (they are on the
> > > > > >>>>>> same granularity anyway), like:
> > > > >>>>>>
> > > > >>>>>> `dropped-records-null-field`: at processor nodes
> > > > >>>>>>
> > > > >>>>>> `dropped-records-too-late`: at processor nodes
> > > > >>>>>>
> > > > >>>>>> `dropped-records-expired-window`: at window-stores
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> 5) What happens with removed metrics when the user sets the
> > > > >>>> version of
> > > > >>>>>>>>> "built.in.metrics.version" to 2.2-
> > > > >>>>>>>>>
> > > > > >>>>>>>> I think for those redundant ones like "forward-rate" and
> > > > > >>>>>>>> "destroy-rate" we can still remove them with 2.2- as well;
> > > > > >>>>>>>> for other ones that are removed / replaced, like thread-level
> > > > > >>>>>>>> skipped-records, we should still maintain them.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > > >>>>>>> Could you add this comment about the removal of redundant
> > > > > >>>>>>> metrics to the KIP such that it is documented somewhere?
> > > > >>>>>>>
> > > > >>>>>>> Yes, for sure.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Bruno
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>> I've also decided to remove the rebalance-related metrics from
> > the
> > > > >>>>>> instance-level and move it to consumer itself as part of
> > KIP-429.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> --
> > > > >>>>>> -- Guozhang
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> --
> > > > >>>>> -- Guozhang
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>> -- Guozhang
> > > > >>>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > >
> > > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
> --
> -- Guozhang
