pnowojski commented on a change in pull request #17660:
URL: https://github.com/apache/flink/pull/17660#discussion_r744541970



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -111,6 +111,17 @@ public int getNumberOfQueuedBuffers() {
         return totalBuffers;
     }
 
+    @Override
+    public long getSizeOfQueuedBuffers() {
+        long totalNumberOfBytes = 0;
+
+        for (ResultSubpartition subpartition : subpartitions) {
+            totalNumberOfBytes += Math.max(0, subpartition.getTotalNumberOfBytes());
+        }
+
+        return numBytesOut.getCount() - totalNumberOfBytes;

Review comment:
       The `numBytesOut` metric is updated only after buffer writers are
finished, which will lead to incorrect results here (negative numbers). I would
suggest calculating and updating `numBytesOut` on every record. But since that
update would then sit on the per-record code path, I would also suggest
refactoring the code to keep `numBytesOut` as a simple `private long` field to
avoid potential performance issues.
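
A minimal sketch of the suggestion above, not the actual Flink code. The class `QueuedBytesSketch`, the field `numBytesPolled`, and the methods `onRecordWritten` and `onBufferPolled` are hypothetical names introduced for illustration. It assumes a single writer thread and ignores cross-thread visibility.

```java
/** Hypothetical stand-in for a result partition; not Flink's actual class. */
final class QueuedBytesSketch {

    // Plain field instead of a metrics counter, so the per-record update is
    // a single addition. Assumption: only one thread writes records.
    private long numBytesOut;

    // Assumption: bytes already handed downstream, i.e. what the
    // subpartitions' byte totals are taken to represent in the diff above.
    private long numBytesPolled;

    /** Called for every record, before the buffer writer is finished. */
    void onRecordWritten(int serializedSize) {
        numBytesOut += serializedSize;
    }

    /** Called when a data buffer leaves the queue. */
    void onBufferPolled(int bufferSize) {
        numBytesPolled += bufferSize;
    }

    /** Bytes written so far minus bytes already polled. */
    long getSizeOfQueuedBuffers() {
        return numBytesOut - numBytesPolled;
    }
}
```

Because `numBytesOut` advances with each record rather than when a buffer writer finishes, the difference should not drop below zero in this simplified model.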

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -535,7 +541,9 @@ private void updateStatistics(BufferConsumer buffer) {
     }
 
     private void updateStatistics(Buffer buffer) {
-        totalNumberOfBytes += buffer.getSize();
+        if (buffer.isBuffer()) {
+            totalNumberOfBytes += buffer.getSize();

Review comment:
       Scratch that. Now I can see why you changed this. I think this
change would be fine, assuming we want `outputQueueSize` to ignore events.
If `outputQueueLength` ignores events, then I would vote for
`outputQueueSize` to ignore them as well. In other words, the two metrics
should stay in sync.
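
One way to picture "staying in sync" is to update both values behind the same check. This is only a sketch under stated assumptions: `OutputQueueStatsSketch` and its methods are hypothetical, and the `isBuffer` flag stands in for `buffer.isBuffer()` from the diff.

```java
/** Hypothetical per-subpartition statistics; not Flink's actual class. */
final class OutputQueueStatsSketch {

    private int queueLength; // number of queued data buffers, events excluded
    private long queueSize;  // bytes in queued data buffers, events excluded

    /** Records an enqueued element; events change neither metric. */
    void onAdded(boolean isBuffer, int size) {
        if (isBuffer) {
            queueLength++;
            queueSize += size;
        }
    }

    /** Records a dequeued element under the same rule as onAdded. */
    void onPolled(boolean isBuffer, int size) {
        if (isBuffer) {
            queueLength--;
            queueSize -= size;
        }
    }

    int getQueueLength() {
        return queueLength;
    }

    long getQueueSize() {
        return queueSize;
    }
}
```

Guarding both counters with one condition means an event can never be counted in one metric and skipped in the other.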




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


