[
https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177743#comment-17177743
]
ming li commented on FLINK-18808:
---------------------------------
{quote}I would be afraid of this check adding too much computation overhead.
Accessing both {{HashMap}} and {{ArrayList}} on the critical per record path is
not desirable.
{quote}
Yes, you are right. I did not consider the computational overhead caused by
this logic. :(
{quote}My last idea would be to add this version of {{boolean
collectAndCheckIfEmitted(...)}} to {{RecordWriterOutput}} and use it in
{{BroadcastingOutputCollector}} and {{DirectedOutput}} as I proposed above.
{quote}
We can try adding a {{collectAndCheckIfEmitted}} method to
{{RecordWriterOutput}}, but we would still need to separate the
{{RecordWriterOutput}} instances from the {{ChainingOutput}} instances (this
time storing each group directly in an array), and add logic that checks
whether the record has already been emitted so that it is not counted more
than once.
{code:java}
protected final Output<StreamRecord<T>>[] chainedOutputs;
protected final RecordWriterOutput<StreamRecord<T>>[] nonChainedOutputs;
{code}
The {{collect}} method may be implemented like this:
{code:java}
@Override
public void collect(StreamRecord<T> record) {
    boolean emitted = false;
    for (RecordWriterOutput<StreamRecord<T>> output : nonChainedOutputs) {
        emitted |= output.collectAndCheckIfEmitted(record);
    }
    if (emitted) {
        numRecordOut.inc();
    }
    for (Output<StreamRecord<T>> output : chainedOutputs) {
        output.collect(record);
    }
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    boolean emitted = false;
    for (RecordWriterOutput<StreamRecord<T>> output : nonChainedOutputs) {
        emitted |= output.collectAndCheckIfEmitted(outputTag, record);
    }
    if (emitted) {
        numRecordOut.inc();
    }
    for (Output<StreamRecord<T>> output : chainedOutputs) {
        output.collect(outputTag, record);
    }
}
{code}
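To make the counting rule concrete, here is a minimal, self-contained sketch of the same idea outside of Flink. Every type in it ({{NumRecordsOutSketch}}, {{SimpleCounter}}, {{NetworkOutput}}, {{ChainedOutput}}, {{BroadcastingCollector}}) is a hypothetical stand-in invented for illustration, not a Flink class, and the predicate-based outputs only imitate a {{RecordWriterOutput}} that skips records whose tag does not match. It shows that a record is counted once if at least one non-chained output emitted it, and never counted for chained outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the counting idea; none of these types are Flink classes. */
public class NumRecordsOutSketch {

    /** Hypothetical stand-in for a metrics counter. */
    public static final class SimpleCounter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    /** Hypothetical stand-in for an output that feeds the next chained operator. */
    public interface ChainedOutput<T> {
        void collect(T record);
    }

    /** Hypothetical stand-in for RecordWriterOutput with the proposed method. */
    public interface NetworkOutput<T> {
        /** Returns true only if the record was actually written out. */
        boolean collectAndCheckIfEmitted(T record);
    }

    public static final class BroadcastingCollector<T> {
        private final List<NetworkOutput<T>> nonChainedOutputs;
        private final List<ChainedOutput<T>> chainedOutputs;
        private final SimpleCounter numRecordsOut;

        public BroadcastingCollector(
                List<NetworkOutput<T>> nonChainedOutputs,
                List<ChainedOutput<T>> chainedOutputs,
                SimpleCounter numRecordsOut) {
            this.nonChainedOutputs = nonChainedOutputs;
            this.chainedOutputs = chainedOutputs;
            this.numRecordsOut = numRecordsOut;
        }

        public void collect(T record) {
            boolean emitted = false;
            for (NetworkOutput<T> output : nonChainedOutputs) {
                // "|=" does not short-circuit, so every output still sees the record.
                emitted |= output.collectAndCheckIfEmitted(record);
            }
            if (emitted) {
                // Count the record once, no matter how many outputs emitted it.
                numRecordsOut.inc();
            }
            for (ChainedOutput<T> output : chainedOutputs) {
                // Chained outputs stay inside the task and are not counted.
                output.collect(record);
            }
        }
    }

    /** Runs a small scenario and returns the resulting task-level count. */
    public static long runScenario() {
        SimpleCounter counter = new SimpleCounter();
        List<String> seenByChain = new ArrayList<>();

        NetworkOutput<String> acceptsAll = record -> true;
        NetworkOutput<String> acceptsSideOnly = record -> record.startsWith("side:");
        ChainedOutput<String> chained = seenByChain::add;

        BroadcastingCollector<String> collector = new BroadcastingCollector<>(
                Arrays.asList(acceptsAll, acceptsSideOnly),
                Arrays.asList(chained),
                counter);

        collector.collect("a");       // emitted by one network output
        collector.collect("side:b");  // emitted by both network outputs
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println("numRecordsOut = " + runScenario()); // prints "numRecordsOut = 2"
    }
}
```

In this toy scenario the second record is written by both network outputs but still raises the counter by only one, which matches the "avoid multiple statistics" requirement above.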
In the case of {{allOutputs.size() == 1}}, we can check in
{{AbstractStreamOperator#setup}} whether the single output is a
{{RecordWriterOutput}} instance and, if so, directly call
{{OperatorIOMetricGroup#reuseOutputMetricsForTask}} so that the operator's
output counter is reused as the task-level count.
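The single-output shortcut can be sketched in the same toy style. Again, all names here ({{SingleOutputSketch}}, {{TaskMetrics}}, {{reuseOutputCounter}}, the marker types) are hypothetical and only mimic the shape of the check; the real decision would live in the operator setup code.

```java
/** Toy model of the single-output case; none of these types are Flink classes. */
public class SingleOutputSketch {

    public interface Output {}

    /** Hypothetical marker for an output that writes to the network. */
    public static final class NetworkOutput implements Output {}

    /** Hypothetical marker for an output that feeds a chained operator. */
    public static final class ChainedOutput implements Output {}

    public static final class SimpleCounter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    /** Hypothetical holder of the task-level metric. */
    public static final class TaskMetrics {
        private SimpleCounter numRecordsOut = new SimpleCounter();

        /** Makes the task-level metric share the operator's counter. */
        public void reuseOutputCounter(SimpleCounter operatorCounter) {
            this.numRecordsOut = operatorCounter;
        }

        public long getNumRecordsOut() { return numRecordsOut.getCount(); }
    }

    /** Reuses the operator counter only when the sole output goes to the network. */
    public static boolean setup(
            Output[] allOutputs, SimpleCounter operatorCounter, TaskMetrics taskMetrics) {
        if (allOutputs.length == 1 && allOutputs[0] instanceof NetworkOutput) {
            taskMetrics.reuseOutputCounter(operatorCounter);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SimpleCounter operatorCounter = new SimpleCounter();
        TaskMetrics taskMetrics = new TaskMetrics();
        boolean reused = setup(new Output[] {new NetworkOutput()}, operatorCounter, taskMetrics);
        operatorCounter.inc();
        System.out.println(reused + " " + taskMetrics.getNumRecordsOut()); // prints "true 1"
    }
}
```

With a single chained output, {{setup}} returns false and the task-level metric is left untouched.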
> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
> Key: FLINK-18808
> URL: https://issues.apache.org/jira/browse/FLINK-18808
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics, Runtime / Task
> Affects Versions: 1.11.1
> Reporter: ming li
> Assignee: ming li
> Priority: Major
> Labels: pull-request-available, usability
> Attachments: image-2020-08-04-11-28-13-800.png,
> image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
>
>
> At present, we register the task-level numRecordsOut metric only by reusing
> the operator output record counter at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>     operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If data leaves the task only through the last operator of the OperatorChain,
> this count is correct. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, data is sent not only by the last operator but also by an
> operator in the middle of the OperatorChain (the map operator simply returns
> its input unchanged). Below is one of our test topologies; the metric
> reports only half of the total data received by the downstream.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by intermediate operators should also be counted
> in the numRecordsOut of the Task. But currently we do not reuse the
> operators' output record counters for intermediate operators, which causes
> the task-level numRecordsOut metric to be underestimated (this has no effect
> on the actual operation of the job, but it may affect our monitoring).
> A simple idea of mine is to modify the condition for reusing the operator's
> output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>     operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }{code}
> In addition, I have another question: if a record is broadcast to all
> downstream receivers, should the numRecordsOut counter increase by one or by
> the number of downstream receivers? It seems that currently the numRecordsOut
> metric is increased by one.