[ 
https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456235#comment-17456235
 ] 

Piotr Nowojski commented on FLINK-18808:
----------------------------------------

I'm not sure. Maybe it would be actually better to pick up the old PR 
https://github.com/apache/flink/pull/13109 for fixing the number of records 
produced? Maybe it's enough to rebase and simplify it ([as stated in my last 
comment|https://github.com/apache/flink/pull/13109#issuecomment-688690309]). 

Having properly working numRecordsSent and buggy numRecordsOut would be very 
confusing. 

Dropping numRecordsOut and replacing it with numRecordsSent would require us to 
invest extra effort in figuring out what to do with backward compatibility of 
the metrics and might prove impossible. 

On the other hand having both of them (properly working) might be a little bit 
redundant? 

That's why I would suggest to first re-evaluate this old PR. However I don't 
fully remember what was the status of this change, whether there were still 
some unanswered questions and whether it was safe from the performance 
perspective or not. 


> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
>                 Key: FLINK-18808
>                 URL: https://issues.apache.org/jira/browse/FLINK-18808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.11.1
>            Reporter: ming li
>            Priority: Not a Priority
>              Labels: pull-request-available, usability
>         Attachments: image-2020-08-04-11-28-13-800.png, 
> image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
>
>
> At present, we only register task-level numRecordsOut metric by reusing 
> operator output record counter at the end of OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If we only send data out through the last operator of OperatorChain, there is 
> no problem with this statistics. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, we not only send data in the last operator, but also send 
> data in the middle operator of OperatorChain (the map operator just returns 
> the original value directly). Below is one of our test topology, we can see 
> that the statistics actually only have half of the total data received by the 
> downstream.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by the intermediate operator should also be counted 
> into the numRecordsOut of the Task. But currently we are not reusing 
> operators output record counters in the intermediate operators, which leads 
> to our task-level numRecordsOut metric is underestimated (although this has 
> no effect on the actual operation of the job, it may affect our monitoring).
> A simple idea of ​​mine is to modify the condition of reusing operators 
> output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }{code}
> In addition, I have another question: If a record is broadcast to all 
> downstream, should the numRecordsOut counter increase by one or the 
> downstream number? It seems that currently we are adding one to calculate the 
> numRecordsOut metric.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to