To avoid confusion, can we either rename "SourceMetricGroup" to "SplitReaderMetricGroup" or add "Reader" to the setter method names?
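The following is a minimal, self-contained Java sketch of a reader-scoped group, to make the naming question concrete. Everything in it is an assumption for illustration. `Gauge` is a local stand-in for Flink's `Gauge` interface. `SplitReaderMetricGroup`, `SimpleSplitReaderMetricGroup`, `LogSplitReader`, and `FileSplitReader` are hypothetical names, not existing Flink classes. The pending-records gauge follows the consumer-lag reading discussed below (log head offset minus current record offset). The pending-bytes gauge reports what is left of the currently assigned file split.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in for Flink's Gauge interface so that the sketch compiles on its own.
interface Gauge<T> {
    T getValue();
}

// Hypothetical reader-scoped metric group (the "SplitReaderMetricGroup" name
// proposed above); not an existing Flink interface.
interface SplitReaderMetricGroup {
    <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);

    <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
}

// Minimal in-memory implementation that only remembers the registered gauges.
class SimpleSplitReaderMetricGroup implements SplitReaderMetricGroup {
    Gauge<Long> pendingRecords;
    Gauge<Long> pendingBytes;

    @Override
    public <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge) {
        this.pendingRecords = pendingRecordsGauge;
        return pendingRecordsGauge;
    }

    @Override
    public <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge) {
        this.pendingBytes = pendingBytesGauge;
        return pendingBytesGauge;
    }
}

// Hypothetical log-based (Kafka-like) reader: pending records = consumer lag.
class LogSplitReader {
    final AtomicLong headOffset = new AtomicLong();    // latest offset known to the broker
    final AtomicLong currentOffset = new AtomicLong(); // offset of the last record read

    LogSplitReader(SplitReaderMetricGroup metrics) {
        // Computed on demand whenever a reporter reads the gauge.
        Gauge<Long> consumerLag = () -> headOffset.get() - currentOffset.get();
        metrics.setPendingRecordsGauge(consumerLag);
    }
}

// Hypothetical file-based reader: pending bytes = bytes left in the current split.
class FileSplitReader {
    final long splitLength;
    final AtomicLong position = new AtomicLong(); // bytes of the split already read

    FileSplitReader(SplitReaderMetricGroup metrics, long splitLength) {
        this.splitLength = splitLength;
        Gauge<Long> remainingBytes = () -> splitLength - position.get();
        metrics.setPendingBytesGauge(remainingBytes);
    }
}
```

Both gauges are evaluated only when a reporter reads them, so the readers just have to keep their offsets current. Whether a connector can cheaply obtain the broker's head offset depends on the connector, and this sketch does not address it.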
Yes, we should add the "unassigned/pending splits" enumerator metric. I tried
to publish those metrics for IcebergSourceEnumerator and ran into an issue
[1]. I don't want to distract from the discussion with the JIRA ticket.

[1] https://issues.apache.org/jira/browse/FLINK-21000

On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Steven,
>
> The semantics are unchanged compared to FLIP-33 [1] but I see your point.
>
> In reality, pending records would mostly be relevant for event storage
> systems (Kafka, Kinesis, ...). Here, we would effectively report the
> consumer lag. If "consumer lag" is more prominent, we could also rename
> the metric.
>
> For pending bytes, this is mostly related to the file source or any kind
> of byte stream. At this point, we can only capture the assigned splits on
> the reader level. I don't think it makes sense to add the same metric to
> the enumerator, as that might induce too much I/O on the job master. I
> could rather envision another metric that captures how many unassigned
> splits there are. In general, I think it would be a good idea to add
> another type of top-level metric group for the SplitEnumerator, called
> SplitEnumeratorMetricGroup, in SplitEnumeratorContext. There we could add
> an unassigned/pending splits metric. WDYT?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> On Wed, Jul 14, 2021 at 9:00 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > I am trying to understand what those two metrics really capture.
> >
> > <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
> >
> > - Using the file source as an example, does it capture the remaining
> >   bytes of the current file split that the reader is processing? How
> >   would users interpret or use this metric? The enumerator keeps track
> >   of the pending/unassigned splits, which is an indication of the size
> >   of the backlog.
> >   That would be very useful.
> >
> > <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);
> >
> > - In the Kafka source case, is this intended to capture the consumer
> >   lag (log head offset from broker - current record offset)? That could
> >   be used to capture the size of the backlog.
> >
> > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Becket,
> > >
> > > I believe 1+2 have been answered by Chesnay already. Just to add to 2:
> > > I'm not the biggest fan of reusing task metrics, but that's what
> > > FLIP-33 and different folks suggested. I'd probably keep task I/O
> > > metrics only for internal things and add a new metric for external
> > > calls. Then, we could even allow users to track I/O in AsyncIO (which
> > > would currently be a mess). However, with the current abstraction, it
> > > would be relatively easy to add separate metrics later.
> > >
> > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> > > user to implement it in a way that the fetch time always corresponds
> > > to the latest polled record. For SourceReaderBase, I have added a new
> > > RecordsWithSplitIds#lastFetchTime (with default return value null)
> > > that is used to set the last fetch time automatically whenever the
> > > next batch is selected. Tbh, this metric is a bit more challenging to
> > > implement for non-SourceReaderBase sources, but I have not found a
> > > better, thread-safe way. Of course, we could shift the complete
> > > calculation into user-land, but I'm not sure that this is easier.
> > > For your scenarios:
> > > - In A, you assume SourceReaderBase. In that case, we could eagerly
> > >   report the metric as sketched by you. Whether this eager reporting
> > >   is more correct than the lazy reporting that I have proposed depends
> > >   on the definition of "last processed record" in FLIP-33.
> > >   The former case assumes "last processed record" = "last fetched
> > >   record", while the latter case assumes "last processed record" =
> > >   "last polled record". For the proposed solution, the user would just
> > >   need to implement RecordsWithSplitIds#lastFetchTime, which typically
> > >   corresponds to the creation time of the RecordsWithSplitIds instance.
> > > - B does not assume SourceReaderBase.
> > >   If it's SourceReaderBase, the same proposed solution works out of the
> > >   box: SourceOperator intercepts the emitted event time and uses the
> > >   fetch time of the current batch.
> > >   If it's not SourceReaderBase, the user would need to attach the
> > >   timestamp to the handover protocol if multi-threaded and set the
> > >   lastFetchTimeGauge when a value in the handover protocol is selected
> > >   (typically a batch).
> > >   If it's a single-threaded source, the user could directly set the
> > >   current timestamp after fetching the records synchronously.
> > >   The bad case is if the user is fetching individual records (either
> > >   sync or async): then the fetch time would be updated with every
> > >   record. However, I'm assuming that the required system call is
> > >   dwarfed by the involved I/O.
> > >
> > > [1] https://github.com/apache/flink/pull/15972
> > >
> > > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <ches...@apache.org>
> > > wrote:
> > >
> > > > Re 1: We don't expose the reuse* methods, because the proposed
> > > > OperatorIOMetricGroup is a separate interface from the existing
> > > > implementations (which will be renamed and implement the new
> > > > interface).
> > > >
> > > > Re 2: Currently the plan is to reuse the "new" numBytesIn/Out
> > > > counters for tasks ("new" because all we are doing is exposing
> > > > already existing metrics).
> > > > We may, however, change this in the future if we want to report the
> > > > byte metrics on an operator level, which is primarily interesting
> > > > for async IO or other external connectivity outside of
> > > > sinks/sources.
> > > >
> > > > On 13/07/2021 12:38, Becket Qin wrote:
> > > > > Hi Arvid,
> > > > >
> > > > > Thanks for the proposal. I like the idea of exposing concrete
> > > > > metric group classes so that users can access the predefined
> > > > > metrics.
> > > > >
> > > > > A few questions follow:
> > > > >
> > > > > 1. When exposing the OperatorIOMetricGroup to the users, we are
> > > > > also exposing reuseInputMetricsForTask to the users. Should we hide
> > > > > these two methods, because users won't have enough information to
> > > > > decide whether the records IO metrics should be reused by the task
> > > > > or not?
> > > > >
> > > > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are
> > > > > adding numBytesInCounter and numBytesOutCounter. Should these
> > > > > metrics reuse the task-level metrics by default?
> > > > >
> > > > > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not
> > > > > sure how it works with the FetchLag. Typically, there are two cases
> > > > > when reporting the fetch lag:
> > > > >    A. The EventTime is known at the point when the record is
> > > > >    fetched in the SplitFetcher, so the fetch lag can be derived and
> > > > >    reported immediately.
> > > > >    B. The EventTime is known only after the fetched record has been
> > > > >    parsed in the RecordEmitter. In this case, the RecordEmitter
> > > > >    needs to get the fetch time of that particular record.
> > > > > I am not sure when users should set the LastFetchTime in the above
> > > > > two cases. Can you help elaborate on how users should use it?
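For case B, the lazy reporting from Arvid's reply earlier in the thread can be sketched in plain Java. In that approach, the fetch time travels with each batch through the handover, and the lag is taken when a record is polled. This sketch assumes FLIP-33's definition of fetch lag as fetch time minus event time. `FetchedBatch`, `Fetcher`, `Emitter`, the `<eventTimeMillis>,<payload>` record format, and the fixed clock are all hypothetical, and the handover is modeled as a `BlockingQueue`. In case A, the fetcher already knows the event time and could compute the same difference eagerly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.LongSupplier;

// Hypothetical batch type. Like the proposed RecordsWithSplitIds#lastFetchTime,
// it carries the time at which it was fetched (here: its creation time).
final class FetchedBatch {
    final long fetchTime;
    final List<String> rawRecords;

    FetchedBatch(long fetchTime, List<String> rawRecords) {
        this.fetchTime = fetchTime;
        this.rawRecords = rawRecords;
    }
}

// Fetcher side: stamps each batch before putting it into the handover queue.
final class Fetcher {
    private final LongSupplier clock;
    private final BlockingQueue<FetchedBatch> handover;

    Fetcher(LongSupplier clock, BlockingQueue<FetchedBatch> handover) {
        this.clock = clock;
        this.handover = handover;
    }

    boolean fetch(List<String> rawRecords) {
        // offer() instead of put() keeps the sketch free of checked exceptions.
        return handover.offer(new FetchedBatch(clock.getAsLong(), rawRecords));
    }
}

// Emitter side (case B): the event time is only known after parsing, so the lag
// is computed lazily against the fetch time of the batch currently being drained.
final class Emitter {
    private final BlockingQueue<FetchedBatch> handover;
    private volatile long lastFetchTime = -1L; // value a lastFetchTime gauge would report
    private volatile long lastFetchLag = -1L;  // fetchTime - eventTime of the last record

    Emitter(BlockingQueue<FetchedBatch> handover) {
        this.handover = handover;
    }

    boolean emitNextBatch() {
        FetchedBatch batch = handover.poll();
        if (batch == null) {
            return false; // nothing handed over yet
        }
        lastFetchTime = batch.fetchTime; // updated whenever the next batch is selected
        for (String raw : batch.rawRecords) {
            long eventTime = parseEventTime(raw);
            lastFetchLag = batch.fetchTime - eventTime;
            // ... the parsed record would be emitted downstream here ...
        }
        return true;
    }

    long getLastFetchTime() { return lastFetchTime; }

    long getLastFetchLag() { return lastFetchLag; }

    // Hypothetical record format: "<eventTimeMillis>,<payload>".
    static long parseEventTime(String raw) {
        return Long.parseLong(raw.substring(0, raw.indexOf(',')));
    }
}

final class FetchLagDemo {
    public static void main(String[] args) {
        BlockingQueue<FetchedBatch> handover = new ArrayBlockingQueue<>(4);
        Fetcher fetcher = new Fetcher(() -> 1_000L, handover); // fixed clock for a deterministic demo
        Emitter emitter = new Emitter(handover);
        fetcher.fetch(Arrays.asList("900,a", "950,b"));
        emitter.emitNextBatch();
        System.out.println(emitter.getLastFetchLag()); // prints 50 (1000 - 950)
    }
}
```

Because the timestamp is attached once per batch, the clock is read once per fetch rather than once per record. This matches the trade-off described for record-at-a-time sources.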
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Dear devs,
> > > > >>
> > > > >> As a continuation and generalization of FLIP-33 (Standardize
> > > > >> Connector Metrics) [1], we'd like to discuss how we actually expose
> > > > >> the standardized operator metrics to users in terms of changes to
> > > > >> the API.
> > > > >>
> > > > >> Please check out the FLIP [2] and provide feedback.
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Arvid
> > > > >>
> > > > >> [1]
> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > >> [2]
> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
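This is a minimal sketch of the SplitEnumeratorMetricGroup suggested earlier in the thread, showing how an enumerator could expose its backlog of unassigned splits. `Gauge` is again a local stand-in for Flink's interface. `SplitEnumeratorMetricGroup`, `setUnassignedSplitsGauge`, `SimpleEnumeratorMetricGroup`, and `BacklogTrackingEnumerator` are illustrative names under that assumption, not a confirmed API. The sketch also assumes that a metric reporter may read the gauge from another thread. For that reason the count is mirrored in an `AtomicLong` instead of reading the queue directly.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in for Flink's Gauge interface so that the sketch compiles on its own.
interface Gauge<T> {
    T getValue();
}

// Hypothetical enumerator-scoped metric group with an unassigned-splits gauge.
interface SplitEnumeratorMetricGroup {
    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
}

// Minimal in-memory implementation that only remembers the registered gauge.
class SimpleEnumeratorMetricGroup implements SplitEnumeratorMetricGroup {
    Gauge<Long> unassignedSplits;

    @Override
    public <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge) {
        this.unassignedSplits = unassignedSplitsGauge;
        return unassignedSplitsGauge;
    }
}

// Hypothetical enumerator that tracks its backlog of not-yet-assigned splits.
class BacklogTrackingEnumerator {
    // Only touched by the enumerator's own thread.
    private final Queue<String> unassigned = new ArrayDeque<>();
    // Mirror of unassigned.size() that a reporter thread can read safely.
    private final AtomicLong unassignedCount = new AtomicLong();

    BacklogTrackingEnumerator(SplitEnumeratorMetricGroup metrics) {
        Gauge<Long> backlog = unassignedCount::get;
        metrics.setUnassignedSplitsGauge(backlog);
    }

    // Called when new splits are discovered (e.g. new files or snapshots).
    void addSplits(Collection<String> splits) {
        unassigned.addAll(splits);
        unassignedCount.set(unassigned.size());
    }

    // Hands out the next split in FIFO order, or returns null if the backlog is empty.
    String assignNext() {
        String split = unassigned.poll();
        unassignedCount.set(unassigned.size());
        return split;
    }
}
```

This gauge reflects only state that the enumerator already holds in memory. That is in line with the concern above about not inducing extra I/O on the job master.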