StephanEwen commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r601620880
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -351,7 +351,7 @@ public StreamTaskSourceOutput(
WatermarkGauge inputWatermarkGauge,
MultiStreamStreamStatusTracker streamStatusTracker,
int inputIndex) {
- super(chainedSourceOutput, streamStatusMaintainer, inputWatermarkGauge);
+ super(chainedSourceOutput, streamStatusMaintainer, inputWatermarkGauge, null);
Review comment:
Can we just pass a `new SimpleCounter()` here? This code will need to be
adjusted in the future anyway to do proper record counting (register the
proper counter), so we can already operate under the assumption that a
counter is always passed in here.
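
A minimal sketch of what this could look like at the call site, not the actual Flink code. `Counter` and `SimpleCounter` below are simplified stand-ins for the classes in `org.apache.flink.metrics`, and `CountingOutput` and `ChainedSourceWiring` are hypothetical names used only for illustration:

```java
// Simplified stand-in for org.apache.flink.metrics.Counter (only the methods used here).
interface Counter {
    void inc();

    long getCount();
}

// Simplified stand-in for org.apache.flink.metrics.SimpleCounter.
final class SimpleCounter implements Counter {
    private long count;

    @Override
    public void inc() {
        count++;
    }

    @Override
    public long getCount() {
        return count;
    }
}

// Hypothetical output class; the real constructor takes more arguments.
final class CountingOutput {
    private final Counter numRecordsOut;

    CountingOutput(Counter numRecordsOut) {
        this.numRecordsOut = numRecordsOut;
    }

    void emitRecord(String record) {
        numRecordsOut.inc();
        // Forwarding the record downstream is omitted in this sketch.
    }

    long recordsOut() {
        return numRecordsOut.getCount();
    }
}

// Hypothetical call site, standing in for the factory code in the diff.
final class ChainedSourceWiring {
    static CountingOutput createOutput() {
        // Instead of null: a fresh, unregistered counter as a placeholder
        // until the proper metric is registered and passed in here.
        return new CountingOutput(new SimpleCounter());
    }
}
```

The placeholder counter still counts, but nothing reports it until a registered counter replaces it.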
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -145,19 +155,25 @@ private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
private final Output<StreamRecord<T>> output;
@Nullable private final WatermarkGauge inputWatermarkGauge;
+ @Nullable private final Counter numRecordsOut;
public AsyncDataOutputToOutput(
Output<StreamRecord<T>> output,
StreamStatusMaintainer streamStatusMaintainer,
- @Nullable WatermarkGauge inputWatermarkGauge) {
+ @Nullable WatermarkGauge inputWatermarkGauge,
+ @Nullable Counter numRecordsOut) {
super(streamStatusMaintainer);
this.output = checkNotNull(output);
this.inputWatermarkGauge = inputWatermarkGauge;
+ this.numRecordsOut = numRecordsOut;
}
@Override
public void emitRecord(StreamRecord<T> streamRecord) {
+ if (numRecordsOut != null) {
Review comment:
We could make the field non-nullable and skip the branch here. See also
the comment above.
Again, I am not sure it makes a big difference; the JIT probably profiles
this away. But it also means one branch less for the JIT to worry about, and
non-nullability is easier for the next maintainer to reason about,
especially if the counter is going to be non-null anyway.
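
To make the trade-off concrete, here is a rough side-by-side sketch of the two variants. It is not the real `AsyncDataOutputToOutput`: `Counter` and `SimpleCounter` are simplified stand-ins for the Flink metrics classes, the two output classes are hypothetical, and `Objects.requireNonNull` is used where Flink code would typically use `checkNotNull`:

```java
import java.util.Objects;

// Simplified stand-in for org.apache.flink.metrics.Counter (only the methods used here).
interface Counter {
    void inc();

    long getCount();
}

// Simplified stand-in for org.apache.flink.metrics.SimpleCounter.
final class SimpleCounter implements Counter {
    private long count;

    @Override
    public void inc() {
        count++;
    }

    @Override
    public long getCount() {
        return count;
    }
}

// Variant from the diff: the counter may be null, so emitRecord checks it per record.
final class NullableCounterOutput {
    private final Counter numRecordsOut; // may be null

    NullableCounterOutput(Counter numRecordsOut) {
        this.numRecordsOut = numRecordsOut;
    }

    void emitRecord(String record) {
        if (numRecordsOut != null) {
            numRecordsOut.inc();
        }
        // Forwarding the record downstream is omitted in this sketch.
    }
}

// Suggested variant: null is rejected once in the constructor, so emitRecord has no branch.
final class NonNullCounterOutput {
    private final Counter numRecordsOut;

    NonNullCounterOutput(Counter numRecordsOut) {
        this.numRecordsOut = Objects.requireNonNull(numRecordsOut, "numRecordsOut");
    }

    void emitRecord(String record) {
        numRecordsOut.inc();
        // Forwarding the record downstream is omitted in this sketch.
    }
}
```

With the second variant, a forgotten counter fails at construction time instead of silently disabling the metric.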