StephanEwen commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r601620880



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -351,7 +351,7 @@ public StreamTaskSourceOutput(
                 WatermarkGauge inputWatermarkGauge,
                 MultiStreamStreamStatusTracker streamStatusTracker,
                 int inputIndex) {
-            super(chainedSourceOutput, streamStatusMaintainer, inputWatermarkGauge);
+            super(chainedSourceOutput, streamStatusMaintainer, inputWatermarkGauge, null);

Review comment:
       Can we just pass a `new SimpleCounter()` here? This code will need to be
adjusted in the future anyway to do proper record counting (register the proper
counter), so we can work directly under the assumption that a counter is always
passed in here.
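To illustrate the suggestion, here is a minimal, self-contained sketch of passing an unregistered placeholder counter instead of `null`. The `Counter` and `SimpleCounter` types below are simplified stand-ins for `org.apache.flink.metrics.Counter` and `org.apache.flink.metrics.SimpleCounter`; `CountingOutput` and `ChainedSourceOutput` are hypothetical names and not the classes touched by this PR.

```java
import java.util.Objects;

// Sketch only: simplified stand-ins, not the Flink classes from the diff.
final class PlaceholderCounterSketch {

    /** Stand-in for org.apache.flink.metrics.Counter (reduced to two methods). */
    public interface Counter {
        void inc();
        long getCount();
    }

    /** Stand-in for org.apache.flink.metrics.SimpleCounter. */
    public static final class SimpleCounter implements Counter {
        private long count;
        @Override public void inc() { count++; }
        @Override public long getCount() { return count; }
    }

    /** Hypothetical base output that counts every emitted record. */
    public static class CountingOutput {
        private final Counter numRecordsOut;

        public CountingOutput(Counter numRecordsOut) {
            this.numRecordsOut = Objects.requireNonNull(numRecordsOut);
        }

        public void emitRecord(String record) {
            numRecordsOut.inc(); // no null check needed
        }

        public long emitted() {
            return numRecordsOut.getCount();
        }
    }

    /** Hypothetical chained-source output that has no registered metric yet. */
    public static final class ChainedSourceOutput extends CountingOutput {
        public ChainedSourceOutput() {
            // Pass a throwaway counter rather than null; a registered
            // counter can replace it later without touching the base class.
            super(new SimpleCounter());
        }
    }

    public static void main(String[] args) {
        ChainedSourceOutput out = new ChainedSourceOutput();
        out.emitRecord("a");
        out.emitRecord("b");
        System.out.println(out.emitted());
    }
}
```

The placeholder counts records that nobody reads, which costs one increment per record, but the base class can then treat the counter as always present.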

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -145,19 +155,25 @@ private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
 
         private final Output<StreamRecord<T>> output;
         @Nullable private final WatermarkGauge inputWatermarkGauge;
+        @Nullable private final Counter numRecordsOut;
 
         public AsyncDataOutputToOutput(
                 Output<StreamRecord<T>> output,
                 StreamStatusMaintainer streamStatusMaintainer,
-                @Nullable WatermarkGauge inputWatermarkGauge) {
+                @Nullable WatermarkGauge inputWatermarkGauge,
+                @Nullable Counter numRecordsOut) {
             super(streamStatusMaintainer);
 
             this.output = checkNotNull(output);
             this.inputWatermarkGauge = inputWatermarkGauge;
+            this.numRecordsOut = numRecordsOut;
         }
 
         @Override
         public void emitRecord(StreamRecord<T> streamRecord) {
+            if (numRecordsOut != null) {

Review comment:
       We could make the field non-nullable and skip the branch here. See also
the comment above.
   
   Again, I'm not sure it makes a big difference: the JIT probably profiles
this branch away. Still, it means one branch less for the JIT to worry about,
and non-nullability is easier for the next maintainer to reason about,
especially if the counter is going to be non-null anyway.
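A rough sketch of what the non-nullable variant could look like, assuming the constructor validates the counter once so that `emitRecord` needs no branch. `AsyncOutputSketch` is a hypothetical, simplified analogue of `AsyncDataOutputToOutput` that writes to a plain `List`; `Counter` and `SimpleCounter` are again stand-ins for the Flink metric types, and `Objects.requireNonNull` plays the role of Flink's `checkNotNull`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Sketch only: simplified stand-ins, not the Flink classes from the diff.
final class NonNullCounterSketch {

    /** Stand-in for org.apache.flink.metrics.Counter (reduced to two methods). */
    public interface Counter {
        void inc();
        long getCount();
    }

    /** Stand-in for org.apache.flink.metrics.SimpleCounter. */
    public static final class SimpleCounter implements Counter {
        private long count;
        @Override public void inc() { count++; }
        @Override public long getCount() { return count; }
    }

    /** Hypothetical analogue of AsyncDataOutputToOutput. */
    public static final class AsyncOutputSketch<T> {
        private final List<T> output;
        private final Counter numRecordsOut; // never null after construction

        public AsyncOutputSketch(List<T> output, Counter numRecordsOut) {
            this.output = Objects.requireNonNull(output);
            // Fail fast here instead of checking on every record.
            this.numRecordsOut = Objects.requireNonNull(numRecordsOut);
        }

        public void emitRecord(T record) {
            numRecordsOut.inc(); // unconditional: no null branch on the hot path
            output.add(record);
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        Counter counter = new SimpleCounter();
        AsyncOutputSketch<String> out = new AsyncOutputSketch<>(sink, counter);
        out.emitRecord("x");
        System.out.println(counter.getCount() + " record(s), sink=" + sink);
    }
}
```

With this shape, a caller that forgets to supply a counter gets a `NullPointerException` at construction time rather than silently skipping the metric.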



