Hi Becket,

I believe 1+2 have already been answered by Chesnay. Just to add to 2: I'm
not the biggest fan of reusing task metrics but that's what FLIP-33 and
different folks suggested. I'd probably keep task I/O metrics only for
internal things and add a new metric for external calls. Then, we could
even allow users to track I/O in AsyncIO (which would currently be a mess).
However, with the current abstraction, it would be relatively easy to add
separate metrics later.

3. As outlined in the JavaDoc and in the draft PR [1], it's up to the user
to implement it such that the fetch time always corresponds to the latest
polled record. For SourceReaderBase, I have added a new
RecordsWithSplitIds#lastFetchTime (with default return value null), which
is used to set the last fetch time automatically whenever the next batch is
selected. Tbh this metric is a bit more challenging to implement for
non-SourceReaderBase sources but I have not found a better, thread-safe
way. Of course, we could shift the complete calculation into user-land but
I'm not sure that this is easier.
For your scenarios:
- in A, you assume SourceReaderBase. In that case, we could eagerly report
the metric as you sketched. Whether this eager reporting is more correct
than the lazy reporting that I have proposed depends on the definition of
"last processed record" in FLIP-33. Eager reporting assumes "last
processed record" = "last fetched record", while lazy reporting assumes
"last processed record" = "last polled record". For the proposed solution,
the user would just need to implement RecordsWithSplitIds#lastFetchTime,
which typically corresponds to the creation time of the RecordsWithSplitIds
instance.
- B is not assuming SourceReaderBase.
If it's SourceReaderBase, the same proposed solution works out of the box:
SourceOperator intercepts the emitted event time and uses the fetch time of
the current batch.
If it's not SourceReaderBase, the user would need to attach the timestamp
to the handover protocol if multi-threaded and set the lastFetchTimeGauge
when a value in the handover protocol is selected (typically a batch).
If it's a single threaded source, the user could directly set the current
timestamp after fetching the records in a sync fashion.
The bad case is if the user is fetching individual records (either sync or
async); then the fetch time would be updated with every record. However,
I'm assuming that the cost of the required system call is dwarfed by the
I/O involved.
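
As a rough illustration of the multi-threaded, non-SourceReaderBase case
described above (not taken from the draft PR): the sketch below attaches the
fetch time to each batch in the handover and only advances the reported value
when the task thread selects that batch. All names here (FetchTimeHandover,
TimestampedBatch, onFetched, pollNextBatch) are hypothetical and are not Flink
classes; registering the supplier as the actual gauge is left out. The lag
helper assumes the FLIP-33 definition of fetch lag as fetch time minus event
time.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.LongSupplier;

/** Hypothetical sketch; none of these types exist in Flink. */
class FetchTimeHandover {

    /** A fetched batch stamped with the time at which the fetch completed. */
    static final class TimestampedBatch {
        final List<String> records;
        final long fetchTimeMillis;

        TimestampedBatch(List<String> records, long fetchTimeMillis) {
            this.records = records;
            this.fetchTimeMillis = fetchTimeMillis;
        }
    }

    /** Handover between the fetcher thread and the task thread. */
    private final BlockingQueue<TimestampedBatch> handover = new ArrayBlockingQueue<>(16);

    /** Written by the task thread only; volatile so a reporter thread sees updates. */
    private volatile long lastFetchTimeMillis = -1L;

    /** The value a user would expose as the last-fetch-time gauge. */
    LongSupplier lastFetchTimeGauge() {
        return () -> lastFetchTimeMillis;
    }

    /** Fetcher thread: stamp the batch once, right after the I/O call returns. */
    boolean onFetched(List<String> records, long nowMillis) {
        return handover.offer(new TimestampedBatch(records, nowMillis));
    }

    /** Task thread: the fetch time advances only when the next batch is selected. */
    List<String> pollNextBatch() {
        TimestampedBatch batch = handover.poll();
        if (batch == null) {
            return Collections.emptyList();
        }
        lastFetchTimeMillis = batch.fetchTimeMillis;
        return batch.records;
    }

    /** Fetch lag, assuming the FLIP-33 definition: fetch time minus event time. */
    static long fetchLagMillis(long fetchTimeMillis, long eventTimeMillis) {
        return fetchTimeMillis - eventTimeMillis;
    }
}
```

Because the timestamp travels with the batch, a batch that was fetched but
not yet selected does not move the gauge, which matches the "last polled
record" reading. A single-threaded source would not need the queue and could
set the field directly after a synchronous fetch.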

[1] https://github.com/apache/flink/pull/15972

On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <ches...@apache.org>
wrote:

> Re 1: We don't expose the reuse* methods, because the proposed
> OperatorIOMetricGroup is a separate interface from the existing
> implementations (which will be renamed and implement the new interface).
>
> Re 2: Currently the plan is to re-use the "new" numBytesIn/Out counters
> for tasks ("new" because all we are doing is exposing already existing
> metrics). We may however change this in the future if we want to report
> the byte metrics on an operator level, which is primarily interesting
> for async IO or other external connectivity outside of sinks/sources.
>
> On 13/07/2021 12:38, Becket Qin wrote:
> > Hi Arvid,
> >
> > Thanks for the proposal. I like the idea of exposing concrete metric
> > group class so that users can access the predefined metrics.
> >
> > A few questions are following:
> >
> > 1. When exposing the OperatorIOMetrics to the users, we are also exposing
> > the reuseInputMetricsForTask to the users. Should we hide these two
> > methods because users won't have enough information to decide whether the
> > records IO metrics should be reused by the task or not.
> >
> > 2. Similar to question 1, in the OperatorIOMetricGroup, we are adding
> > numBytesInCounter and numBytesOutCounter. Should these metrics be reusing
> > the task level metrics by default?
> >
> > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not sure how
> > it works with the FetchLag. Typically there are two cases when reporting
> > the fetch lag.
> >      A. The EventTime is known at the point when the record is fetched in
> > the SplitFetcher, so the fetch lag can be derived and reported
> > immediately.
> >      B. The EventTime is known only after the fetched record was parsed
> > in the RecordEmitter. In this case, the RecordEmitter needs to get the
> > fetch time of that particular record.
> > I am not sure when users set the LastFetchTime in the above two cases.
> > Can you help elaborate on how users should use it?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org> wrote:
> >
> >> Dear devs,
> >>
> >> As a continuation and generalization of FLIP-33 (Standardize Connector
> >> Metrics) [1], we'd like to discuss how we actually expose the
> >> standardized operator metrics to users in terms of changes to the API.
> >>
> >> Please check out the FLIP [2] and provide feedback.
> >>
> >> Best,
> >>
> >> Arvid
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >> [2]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> >>
>
>
