lindong28 commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600297493
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -88,11 +90,15 @@ public void init() throws Exception {
input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
}
+ CountingOutput<T> countingOutput =
Review comment:
Thank you, Stephan, for the explanation. Yes, I agree we should
minimize the number of wrappers on the performance-critical path. I have
updated the PR to move the `counter` logic into `AsyncDataOutputToOutput`.
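To illustrate the idea (not the actual PR code), here is a minimal sketch of counting inside the adapter instead of adding a separate counting wrapper. All types below (`Output`, `DataOutput`, `SimpleCounter`, `CountingDataOutputToOutput`) are simplified, hypothetical stand-ins and do not match Flink's real signatures.

```java
import java.util.ArrayList;
import java.util.List;

public class CountingAdapterSketch {

    /** Hypothetical, simplified stand-in for a downstream operator output. */
    interface Output<T> {
        void collect(T record);
    }

    /** Hypothetical, simplified stand-in for the output the source input pushes into. */
    interface DataOutput<T> {
        void emitRecord(T record);
    }

    /** Minimal counter used only for this sketch. */
    static final class SimpleCounter {
        private long count;

        void inc() {
            count++;
        }

        long getCount() {
            return count;
        }
    }

    /**
     * Adapter that forwards each record and increments the counter in the
     * same call, so no extra wrapper sits between the source and the output.
     */
    static final class CountingDataOutputToOutput<T> implements DataOutput<T> {
        private final Output<T> output;
        private final SimpleCounter numRecordsOut;

        CountingDataOutputToOutput(Output<T> output, SimpleCounter numRecordsOut) {
            this.output = output;
            this.numRecordsOut = numRecordsOut;
        }

        @Override
        public void emitRecord(T record) {
            numRecordsOut.inc();      // count the record
            output.collect(record);   // then forward it downstream
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        SimpleCounter counter = new SimpleCounter();
        Output<String> downstream = sink::add;
        DataOutput<String> out = new CountingDataOutputToOutput<>(downstream, counter);

        out.emitRecord("a");
        out.emitRecord("b");

        System.out.println(counter.getCount() + " records forwarded to " + sink);
    }
}
```

With this shape, each record goes through one virtual call on the adapter plus one on the downstream output, rather than through an additional wrapper layer in between.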
I noticed that
[StreamTaskSourceOutput](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java#L225)
currently does not take any metrics, whereas
[StreamTaskNetworkOutput](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java#L205),
whose usage is in the same loop body as `StreamTaskSourceOutput`, takes
`mainOperatorRecordsIn` as input.
I tried to find the answer in the code, but it is still not clear to me whether
`StreamTaskSourceOutput` should take metrics (e.g. numRecordsIn, numRecordsOut)
as input and update them. For now, I will keep `StreamTaskSourceOutput` as is.
+@becketqin for comments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]