I share Bruno's concern about future releases, however, I would make
slightly different proposal.

Instead of using "latest" we can just make the config optional and if
not set, we use the new metrics code? This way we don't need to add a
new version number each time we do a new release (note, that it would be
weird to keep default value "2.4" in future releases).

For enabling backward compatibility: I don't have a strong opinion if we
should have a single value "0.10.0-2.3" or list each version individually.

In KIP-268 (fixing metadata upgrade) we decided to list each version
individually as it seems simpler for users. Also, we wanted to hide
which release uses which metadata version (v0 in 0.10.0, and v1 in
0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
value though but it seemed not to give best user experience.

I think this KIP is a little different though and both options seems to
be valid. However, I would like to emphasize that we should optimize for
user experience (and not if it's harder/easier to test etc---in doubt,
we should always take on the burden if is helps to lift the burden from
users).

Overall, I am +1

Some nits:

(1) I think the motivation section for updating `StreamsMetrics`
interface does not make it clear why we need the change. What is the
issue with the current interface and how do the new method address the issue

(2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
is hard to read because is indicate that there is a `get-latency` method
call on stores -- can we update it to

`(put | put-if-absent .. | get)-latency (avg | max)`

(3) typo: `When users override it to "2.2" or below,` this should be
"2.3" -- or maybe even different if Bruno's concern gets addressed.




-Matthias





On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> Hi,
> 
> I am sorry to restart the discussion here, but I came across a small
> issue in the KIP.
> 
> I started to implement KIP-444 and I am bit concerned about the values
> for the the config `built.in.metrics.version`. In the KIP the possible
> values are specified as all Kafka Streams versions. I think that this
> set of values is really hard to maintain in the code and it also blows
> up the testing burden unnecessarily because all versions need to be
> tested. My proposal (backed by John) is to use the following values:
> - `latest` for the latest version of the metrics
> - `0.10.0-2.3` for the version before `latest`
> If in future, let's say in version 4.1, we need again to change the
> metrics, we would add `2.4-4.0` to the values of the config. With
> major versions, we could also get rid of some values.
> 
> WDYT?
> 
> You can also have a look at the PR
> https://github.com/apache/kafka/pull/7279 to see this in code.
> 
> Best,
> Bruno
> 
> On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hello Bruno,
>>
>> I've updated the wiki page again per your comments, here's a brief summary:
>>
>> 1. added the list of removed metrics.
>> 2. added a task-level INFO metric "dropped-records" that covers all
>> scenarios and merges in the existing "late-records-drop",
>> "skipped-records", and "expired-window-records-drop".
>> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
>> and `addRateTotal` sensors.
>>
>>
>> Since I feel it has incorporated all of your comments I'm going to start
>> the vote thread for this KIP now.
>>
>>
>> Guozhang
>>
>>
>> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Bruno,
>>>
>>> No it was not intentional, and we can definitely add the total amount
>>> sensor as well -- they are just util functions to save users some lines of
>>> code anyways, and should be straightforward.
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> I totally missed the total invocation count metric in the javadoc.
>>>> Which brings me to a follow-up question. Should the names of the
>>>> methods reflect the included total invocation count? We have to rename
>>>> them anyways. One option would be to simply add `Total` to the method
>>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
>>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
>>>> those sensors record exclusively invocations, another option would be
>>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
>>>>
>>>> As far as I can see, we have sensors to record invocations but none to
>>>> record amounts. Is that intentional? No need to add it to this KIP, I
>>>> am just curious.
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> Hi Bruno,
>>>>>
>>>>> Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
>>>> we've
>>>>> actually added the total invocation metric already.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hi Bruno,
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io>
>>>> wrote:
>>>>>>
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>> I left my comments inline.
>>>>>>>
>>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>>>>>
>>>>>>>> Hello Bruno,
>>>>>>>>
>>>>>>>> Thanks for the feedbacks, replied inline.
>>>>>>>>
>>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Guozhang,
>>>>>>>>>
>>>>>>>>> Thank you for the KIP.
>>>>>>>>>
>>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there
>>>> for
>>>>>>>>> user-defined processors. Would it make sense to also add a
>>>> method to
>>>>>>>>> the interface to specify a sensor that records skipped records?
>>>>>>>>>
>>>>>>>>> Not sure I follow.. if users want to add a specific skipped
>>>> records
>>>>>>>> sensor, she can still do that as a "throughput" sensor via "
>>>>>>>> addThroughputSensor" and then "record" right?
>>>>>>>>
>>>>>>>> As an after-thought, maybe it's better to rename `throughput` to
>>>> `rate`
>>>>>>> in
>>>>>>>> the public APIs since it is really meant for the latter semantics.
>>>> I did
>>>>>>>> not change it just to make less API changes / deprecate fewer
>>>> functions.
>>>>>>>> But if we feel it is important we can change it as well.
>>>>>>>>
>>>>>>>
>>>>>>> I see now that a user can record the rate of skipped records.
>>>> However,
>>>>>>> I was referring to the total number of skipped records. Maybe my
>>>>>>> question should be more general: should we allow the user to also
>>>>>>> specify sensors for totals or combinations of rate and totals?
>>>>>>>
>>>>>>> Sounds good to me, I will add it to the wiki page as well for
>>>>>> StreamsMetrics.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Regarding the naming, I like `rate` more than `throughput`, but I
>>>>>>> would not fight for it.
>>>>>>>
>>>>>>>>
>>>>>>>>> 2) What are the semantics of active-task-process and
>>>>>>> standby-task-process
>>>>>>>>>
>>>>>>>>> Ah good catch, I think I made it in the wrong column. Just some
>>>>>>>> explanations here: Within a thread's looped iterations, it will
>>>> first
>>>>>>> try
>>>>>>>> to process some records from the active tasks, and then see if
>>>> there are
>>>>>>>> any standby-tasks that can be processed as well (i.e. just reading
>>>> from
>>>>>>> the
>>>>>>>> restore consumer and apply to the local stores). The ratio metrics
>>>> are
>>>>>>> for
>>>>>>>> indicating 1) what tasks (active or standby) does this thread own
>>>> so
>>>>>>> far,
>>>>>>>> and 2) how much time in percentage does it spend on each of them.
>>>>>>>>
>>>>>>>> But this metric should really be a task-level one that includes
>>>> both the
>>>>>>>> thread-id and task-id, and upon task migrations they will be
>>>> dynamically
>>>>>>>> deleted / (re)-created. For each task-id it may be owned by
>>>> multiple
>>>>>>>> threads as one active and others standby, and hence the separation
>>>> of
>>>>>>>> active / standby seems still necessary.
>>>>>>>>
>>>>>>>
>>>>>>> Makes sense.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop
>>>> relate
>>>>>>>>> to each other? I guess the former is for records that fall
>>>> outside the
>>>>>>>>> grace period and the latter is for records that are processed
>>>> after
>>>>>>>>> the retention period of the window. Is this correct?
>>>>>>>>>
>>>>>>>>> Yes, that's correct. The names are indeed a bit confusing since
>>>> they
>>>>>>> are
>>>>>>>> added at different releases historically..
>>>>>>>>
>>>>>>>> More precisely, the `grace period` is a notion of the operator
>>>> (hence
>>>>>>> the
>>>>>>>> metric is node-level, though it would only be used for DSL
>>>> operators)
>>>>>>> while
>>>>>>>> the `retention` is a notion of the store (hence the metric is
>>>>>>> store-level).
>>>>>>>> Usually grace period will be smaller than store retention though.
>>>>>>>>
>>>>>>>> Processor node is aware of `grace period` and when received a
>>>> record
>>>>>>> that
>>>>>>>> is older than grace deadline, it will be dropped immediately;
>>>> otherwise
>>>>>>> it
>>>>>>>> will still be processed a maybe a new update is "put" into the
>>>> store.
>>>>>>> The
>>>>>>>> store is aware of its `retention period` and then upon a "put"
>>>> call if
>>>>>>> it
>>>>>>>> realized it is older than the retention deadline, that put call
>>>> would be
>>>>>>>> ignored and metric is recorded.
>>>>>>>>
>>>>>>>> We have to separate them here since the window store can be used
>>>> in both
>>>>>>>> DSL and PAPI, and for the former case it would likely to be already
>>>>>>> ignored
>>>>>>>> at the processor node level due to the grace period which is
>>>> usually
>>>>>>>> smaller than retention; but for PAPI there's no grace period and
>>>> hence
>>>>>>> the
>>>>>>>> processor would likely still process and call "put" on the store.
>>>>>>>>
>>>>>>>
>>>>>>> Alright! Got it!
>>>>>>>
>>>>>>>>
>>>>>>>>> 4) Is there an actual difference between skipped and dropped
>>>> records?
>>>>>>>>> If not, shall we unify the terminology?
>>>>>>>>>
>>>>>>>>>
>>>>>>>> There is. Dropped records are only due to lateness; where as
>>>> skipped
>>>>>>>> records can be due to serde errors (and user's error handling
>>>> indicate
>>>>>>>> "skip and continue"), timestamp errors, etc.
>>>>>>>>
>>>>>>>> I've considered maybe a better (more extensible) way would be
>>>> defining a
>>>>>>>> single metric name, say skipped-records, but use different tags to
>>>>>>> indicate
>>>>>>>> if its skipping reason (errors, windowing semantics, etc). But
>>>> there's
>>>>>>>> still a tricky difference: for serde caused skipping for example,
>>>> they
>>>>>>> will
>>>>>>>> be skipped at the very beginning and there's no effects taken at
>>>> all.
>>>>>>> For
>>>>>>>> some others e.g. null-key / value at the reduce operator, it is
>>>> only
>>>>>>>> skipped at the middle of the processing, i.e. some effects may have
>>>>>>> already
>>>>>>>> been taken in up-stream sub-topologies. And that's why for
>>>>>>> skipped-records
>>>>>>>> I've defined it on both task-level and node-level and the
>>>> aggregate of
>>>>>>> the
>>>>>>>> latter may still be smaller than the former, whereas for
>>>>>>> dropped-records it
>>>>>>>> is only for node-level.
>>>>>>>>
>>>>>>>> So how about an even more significant change then: we enlarge the
>>>>>>>> `dropped-late-records` to `dropped-records` which is node-level
>>>> only,
>>>>>>> but
>>>>>>>> includes reasons form lateness to semantics (like null-key) as
>>>> well; and
>>>>>>>> then we have a task-level-only `skipped-records` which only record
>>>> those
>>>>>>>> dropped at the very beginning and did not make it at all to the
>>>>>>> processing
>>>>>>>> topology. I feel this is a clearer distinguishment but also a
>>>> bigger
>>>>>>> change
>>>>>>>> to users.
>>>>>>>>
>>>>>>>
>>>>>>> I like the way you dropped-records and skipped-records are now
>>>>>>> defined. My follow-up question is whether we should give names to
>>>>>>> those metrics that better describe their semantics, like:
>>>>>>>
>>>>>>> dropped-records-at-source and dropped-records-at-processor
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> records-dropped-at-source and records-dropped-at-processor
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> source-dropped-records and processor-dropped-records
>>>>>>>
>>>>>>> or alternatively with skipped. However, I would use the same term as
>>>>>>> in expired-window-record-drop
>>>>>>>
>>>>>>> Maybe, we should also consider to rename expired-window-record-drop
>>>> to
>>>>>>> expired-window-record-dropped to be consistent.
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>>> I was not considering "expired-window-record-drop" before since it
>>>> is a
>>>>>> store-level metric, and I was only considering task-level
>>>> (skipped-records)
>>>>>> and processor-node-level (dropped-records) metrics, and I'm using
>>>> different
>>>>>> terms deliberately to hint users that they are different leveled
>>>> metrics.
>>>>>>
>>>>>> I still feel that using `skip` for task-level metrics indicating that
>>>> this
>>>>>> record was not processed at all, and using `drop` for processor-level
>>>>>> metrics that this record is only dropped at this stage of the
>>>> topology is a
>>>>>> better one; but I'm also okay with some finer grained metrics so that
>>>> we
>>>>>> can align the processor-level with store-level (they are on the same
>>>>>> granularity any ways), like:
>>>>>>
>>>>>> `dropped-records-null-field`: at processor nodes
>>>>>>
>>>>>> `dropped-records-too-late`: at processor nodes
>>>>>>
>>>>>> `dropped-records-expired-window`: at window-stores
>>>>>>
>>>>>>
>>>>>>>>
>>>>>>>>> 5) What happens with removed metrics when the user sets the
>>>> version of
>>>>>>>>> "built.in.metrics.version" to 2.2-
>>>>>>>>>
>>>>>>>>> I think for those redundant ones like ""forward-rate" and
>>>>>>> "destroy-rate"
>>>>>>>> we can still remove them with 2.2- as well; for other ones that are
>>>>>>> removed
>>>>>>>> / replaced like thread-level skipped-records we should still
>>>> maintain
>>>>>>> them.
>>>>>>>>
>>>>>>>
>>>>>>> Could you add this comment about removal of redundant metrics to the
>>>>>>> KIP such that is documented somewhere?
>>>>>>>
>>>>>>> Yes, for sure.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>
>>>>>> I've also decided to remove the rebalance-related metrics from the
>>>>>> instance-level and move it to consumer itself as part of KIP-429.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>> --
>> -- Guozhang

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to