Hi Bruno,

Just realized that for `addRateSensor` and `addLatencyAndRateSensor` we've
actually added the total invocation metric already.
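
To illustrate what "rate plus total invocation" means for a single sensor, here is a minimal sketch. It is a simplified stand-in and not Kafka's actual `Sensor` class: the class name `RateAndTotalSensor`, its methods, and the plain average-since-creation rate formula are all assumptions made for illustration.

```java
// Simplified, hypothetical model of a sensor that exposes both a rate and a total.
// It is not Kafka's Sensor class; names and the rate formula are illustrative only.
final class RateAndTotalSensor {
    private final long startMs;   // time the sensor was created
    private long total;           // cumulative number of recorded invocations

    RateAndTotalSensor(long startMs) {
        this.startMs = startMs;
    }

    // Record one invocation; both metrics are derived from the same counter.
    void record() {
        total++;
    }

    // Total invocations since creation.
    long total() {
        return total;
    }

    // Average invocations per second since creation (0 if no time has elapsed).
    double rate(long nowMs) {
        long elapsedMs = nowMs - startMs;
        return elapsedMs <= 0 ? 0.0 : total * 1000.0 / elapsedMs;
    }
}
```

The point is only that one `record()` call feeds both metrics, so a user who registers a rate sensor gets the total for free.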


Guozhang

On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Bruno,
>
>
> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Guozhang,
>>
>> I left my comments inline.
>>
>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > Hello Bruno,
>> >
>> > Thanks for the feedback; I have replied inline.
>> >
>> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
>> >
>> > > Hi Guozhang,
>> > >
>> > > Thank you for the KIP.
>> > >
>> > > 1) As far as I understand, the StreamsMetrics interface is there for
>> > > user-defined processors. Would it make sense to also add a method to
>> > > the interface to specify a sensor that records skipped records?
>> > >
>> > > Not sure I follow. If users want to add a specific skipped-records
>> > sensor, they can still do that as a "throughput" sensor via
>> > "addThroughputSensor" and then call "record", right?
>> >
>> > As an afterthought, maybe it's better to rename `throughput` to `rate` in
>> > the public APIs, since it is really meant for the latter semantics. I did
>> > not change it only to make fewer API changes / deprecate fewer functions.
>> > But if we feel it is important, we can change it as well.
>> >
>>
>> I see now that a user can record the rate of skipped records. However,
>> I was referring to the total number of skipped records. Maybe my
>> question should be more general: should we allow the user to also
>> specify sensors for totals or combinations of rate and totals?
>>
>> Sounds good to me, I will add it to the wiki page as well for
> StreamsMetrics.
>
>
>
>> Regarding the naming, I like `rate` more than `throughput`, but I
>> would not fight for it.
>>
>> >
>> > > 2) What are the semantics of active-task-process and
>> > > standby-task-process?
>> > >
>> > > Ah, good catch; I think I put it in the wrong column. Some
>> > explanation here: within each iteration of a thread's loop, it first
>> > tries to process some records from the active tasks, and then checks
>> > whether there are any standby tasks that can be processed as well (i.e.
>> > just reading from the restore consumer and applying to the local stores).
>> > The ratio metrics indicate 1) which tasks (active or standby) this thread
>> > owns so far, and 2) what percentage of its time it spends on each of them.
>> >
>> > But this metric should really be a task-level one that includes both the
>> > thread-id and task-id, and upon task migrations they will be dynamically
>> > deleted / (re)-created. For each task-id it may be owned by multiple
>> > threads as one active and others standby, and hence the separation of
>> > active / standby seems still necessary.
>> >
>>
>> Makes sense.
>>
>>
>> >
>> >
>> > > 3) How do dropped-late-records and expired-window-record-drop relate
>> > > to each other? I guess the former is for records that fall outside the
>> > > grace period and the latter is for records that are processed after
>> > > the retention period of the window. Is this correct?
>> > >
>> > > Yes, that's correct. The names are indeed a bit confusing since they
>> > were added in different releases historically.
>> >
>> > More precisely, the `grace period` is a notion of the operator (hence the
>> > metric is node-level, though it would only be used for DSL operators),
>> > while the `retention` is a notion of the store (hence the metric is
>> > store-level). Usually the grace period will be smaller than the store
>> > retention, though.
>> >
>> > The processor node is aware of the `grace period`, and when it receives a
>> > record that is older than the grace deadline, the record is dropped
>> > immediately; otherwise it is still processed and maybe a new update is
>> > "put" into the store. The store is aware of its `retention period`, and
>> > upon a "put" call, if it realizes the record is older than the retention
>> > deadline, that put call is ignored and the metric is recorded.
>> >
>> > We have to separate them here since the window store can be used in both
>> > the DSL and the PAPI. In the former case a record would likely already be
>> > dropped at the processor node level due to the grace period, which is
>> > usually smaller than the retention; but for the PAPI there's no grace
>> > period, and hence the processor would likely still process the record and
>> > call "put" on the store.
>> >
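The two checks described above (grace period at the processor node, retention at the store) can be sketched roughly as follows. This is a simplified model and not Kafka Streams code: `WindowDropSketch`, `Outcome` and `handle` are hypothetical names, and both checks are made against the raw record timestamp, whereas the real checks may be made against window or segment boundaries.

```java
// Simplified, hypothetical model of the two checks; not Kafka Streams code.
final class WindowDropSketch {
    enum Outcome { DROPPED_LATE_AT_PROCESSOR, DROPPED_EXPIRED_AT_STORE, STORED }

    // graceMs == null models a PAPI processor, which has no grace period.
    static Outcome handle(long recordTs, long streamTimeMs, Long graceMs, long retentionMs) {
        // Processor-node check: drop immediately if older than the grace deadline.
        if (graceMs != null && recordTs < streamTimeMs - graceMs) {
            return Outcome.DROPPED_LATE_AT_PROCESSOR;
        }
        // Store check on "put": ignore the write if older than the retention deadline.
        if (recordTs < streamTimeMs - retentionMs) {
            return Outcome.DROPPED_EXPIRED_AT_STORE;
        }
        return Outcome.STORED;
    }
}
```

With a grace period smaller than the retention, a DSL operator never reaches the store check for an old record, which is why the store-level metric mostly matters for PAPI users.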
>>
>> Alright! Got it!
>>
>> >
>> > > 4) Is there an actual difference between skipped and dropped records?
>> > > If not, shall we unify the terminology?
>> > >
>> > >
>> > There is. Dropped records are only due to lateness, whereas skipped
>> > records can be due to serde errors (when the user's error handling
>> > indicates "skip and continue"), timestamp errors, etc.
>> >
>> > I've considered that a better (more extensible) way might be to define a
>> > single metric name, say skipped-records, but use different tags to
>> > indicate the skipping reason (errors, windowing semantics, etc.). But
>> > there's still a tricky difference: with serde-caused skipping, for
>> > example, records are skipped at the very beginning and no effects have
>> > taken place at all. For some others, e.g. a null key / value at the reduce
>> > operator, the record is only skipped in the middle of processing, i.e.
>> > some effects may have already taken place in upstream sub-topologies.
>> > That's why I've defined skipped-records at both the task level and the
>> > node level, and the aggregate of the latter may still be smaller than the
>> > former, whereas dropped-records is node-level only.
>> >
>> > So how about an even more significant change then: we enlarge
>> > `dropped-late-records` to `dropped-records`, which is node-level only but
>> > includes reasons from lateness to semantics (like null-key) as well; and
>> > then we have a task-level-only `skipped-records`, which only records those
>> > dropped at the very beginning that did not make it into the processing
>> > topology at all. I feel this is a clearer distinction but also a bigger
>> > change for users.
>> >
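As a rough illustration of the proposed split between a task-level `skipped-records` and a node-level `dropped-records`, consider the sketch below. It is not Kafka Streams code: the class `SkipVsDropSketch`, its fields and the `handle` method are hypothetical, and a boolean flag stands in for a serde error handled with "skip and continue".

```java
// Hypothetical counters illustrating the proposed split; not Kafka Streams code.
final class SkipVsDropSketch {
    long taskSkippedRecords;  // task-level: record never entered the topology
    long nodeDroppedRecords;  // node-level: record dropped at one processor node
    long processedRecords;    // records that made it through the node

    // deserialized == false stands in for a serde error handled with "skip and continue".
    void handle(boolean deserialized, String key, boolean late) {
        if (!deserialized) {
            taskSkippedRecords++;      // skipped before any processing took place
            return;
        }
        if (key == null || late) {
            nodeDroppedRecords++;      // dropped mid-topology (null key or lateness)
            return;
        }
        processedRecords++;
    }
}
```

A skipped record has had no effect anywhere, while a dropped record may already have been seen by upstream nodes; keeping the two counters separate preserves that difference.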
>>
>> I like the way dropped-records and skipped-records are now
>> defined. My follow-up question is whether we should give names to
>> those metrics that better describe their semantics, like:
>>
>> dropped-records-at-source and dropped-records-at-processor
>>
>> or
>>
>> records-dropped-at-source and records-dropped-at-processor
>>
>> or
>>
>> source-dropped-records and processor-dropped-records
>>
>> or alternatively with skipped. However, I would use the same term as
>> in expired-window-record-drop.
>>
>> Maybe we should also consider renaming expired-window-record-drop to
>> expired-window-record-dropped to be consistent.
>>
>> WDYT?
>>
>> I was not considering "expired-window-record-drop" before since it is a
> store-level metric; I was only considering task-level (skipped-records)
> and processor-node-level (dropped-records) metrics, and I'm using different
> terms deliberately to hint to users that they are metrics at different
> levels.
>
> I still feel it is better to use `skip` for task-level metrics, indicating
> that a record was not processed at all, and `drop` for processor-level
> metrics, indicating that a record was only dropped at this stage of the
> topology. But I'm also okay with some finer-grained metrics so that we
> can align the processor level with the store level (they are at the same
> granularity anyway), like:
>
> `dropped-records-null-field`: at processor nodes
>
> `dropped-records-too-late`: at processor nodes
>
> `dropped-records-expired-window`: at window-stores
>
>
>> >
>> > > 5) What happens with removed metrics when the user sets the version of
>> > > "built.in.metrics.version" to 2.2-
>> > >
>> > > I think for those redundant ones like "forward-rate" and
>> > "destroy-rate" we can still remove them with 2.2- as well; for other ones
>> > that are removed / replaced, like thread-level skipped-records, we should
>> > still maintain them.
>> >
>>
>> Could you add this comment about removal of redundant metrics to the
>> KIP so that it is documented somewhere?
>>
>> Yes, for sure.
>
>
>>
>> Best,
>> Bruno
>>
>
> I've also decided to remove the rebalance-related metrics from the
> instance level and move them to the consumer itself as part of KIP-429.
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang
