AHeise commented on a change in pull request #15972:
URL: https://github.com/apache/flink/pull/15972#discussion_r664273161
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -291,13 +300,23 @@ public InputStatus emitNext(DataOutput<OUT> output)
throws Exception {
// short circuit the common case (every invocation except the first)
if (currentMainOutput != null) {
- return sourceReader.pollNext(currentMainOutput);
+ return pollNext();
}
// this creates a batch or streaming output based on the runtime mode
- currentMainOutput = eventTimeLogic.createMainOutput(output);
+ currentMainOutput =
+ eventTimeLogic.createMainOutput(
+ new MetricTrackingOutput<>(output, sourceMetricGroup));
lastInvokedOutput = output;
- return sourceReader.pollNext(currentMainOutput);
+ return pollNext();
+ }
+
+ private InputStatus pollNext() throws Exception {
+ InputStatus inputStatus = sourceReader.pollNext(currentMainOutput);
+ if (inputStatus == InputStatus.NOTHING_AVAILABLE) {
+ sourceMetricGroup.idlingStarted();
+ }
+ return inputStatus;
Review comment:
Yes. To save time, I didn't want to add tests until the design is approved.
The proper PR on top of the FLIP will need roughly one test per metric.
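
For illustration only, one such per-metric test for the idle-tracking path in the hunk above might look roughly like the sketch below. `Status`, `Reader`, `RecordingMetricGroup`, and `scripted` are hypothetical stand-ins, not Flink classes. A real test would go through `SourceOperator` and whatever metric group the FLIP settles on.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-ins throughout; none of these are Flink's real classes.
final class IdleMetricSketch {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Stand-in for the source reader: yields one status per poll. */
    interface Reader {
        Status pollNext();
    }

    /** Stand-in for the metric group: only counts idlingStarted() calls. */
    static final class RecordingMetricGroup {
        int idlingStartedCalls = 0;

        void idlingStarted() {
            idlingStartedCalls++;
        }
    }

    /** Mirrors the shape of the private pollNext() added in the diff. */
    static Status pollNext(Reader reader, RecordingMetricGroup metrics) {
        Status status = reader.pollNext();
        if (status == Status.NOTHING_AVAILABLE) {
            metrics.idlingStarted();
        }
        return status;
    }

    /** Builds a reader that returns the given statuses in order. */
    static Reader scripted(Status... statuses) {
        Iterator<Status> it = Arrays.asList(statuses).iterator();
        return it::next;
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        RecordingMetricGroup metrics = new RecordingMetricGroup();
        Reader reader = scripted(Status.MORE_AVAILABLE, Status.NOTHING_AVAILABLE);

        // A poll that returns data must not mark the source as idle.
        pollNext(reader, metrics);
        check(metrics.idlingStartedCalls == 0, "idle marked while data was available");

        // A poll that finds nothing must mark the start of idling exactly once.
        pollNext(reader, metrics);
        check(metrics.idlingStartedCalls == 1, "idle not marked on NOTHING_AVAILABLE");
    }
}
```

The same pattern (scripted reader, recording metric group, one assertion per transition) would repeat for each of the other metrics.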