akalash commented on a change in pull request #17660:
URL: https://github.com/apache/flink/pull/17660#discussion_r741850840



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -497,6 +497,12 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public long unsynchronizedGetSizeOfQueuedBuffers() {
+        // Pretty rough approximation of real queue size.
+        return Math.max(unsynchronizedGetNumberOfQueuedBuffers() * bufferSize, 
0);

Review comment:
       This is pretty bad since the queue can hold events that have different 
sizes, buffers of old buffer size, and unfinished buffers. But it is the 
easiest way to calculate queue size without dramatically rewriting code.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -535,7 +541,9 @@ private void updateStatistics(BufferConsumer buffer) {
     }
 
     private void updateStatistics(Buffer buffer) {
-        totalNumberOfBytes += buffer.getSize();
+        if (buffer.isBuffer()) {
+            totalNumberOfBytes += buffer.getSize();

Review comment:
       NB! Right now, The totalNumberOfBytes calculate only the size of the 
data buffers(without event) which broke the old behavior but as I can see we 
use it only in toString. So maybe it is not a problem but if it is, we can add 
another variable for data buffers size calculation.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -111,6 +111,17 @@ public int getNumberOfQueuedBuffers() {
         return totalBuffers;
     }
 
+    @Override
+    public long getSizeOfQueuedBuffers() {
+        long totalNumberOfBytes = 0;
+
+        for (ResultSubpartition subpartition : subpartitions) {
+            totalNumberOfBytes += Math.max(0, 
subpartition.getTotalNumberOfBytes());

Review comment:
       I just notice that the access to getTotalNumberOfBytes is unsynchronized 
here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -497,6 +497,12 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public long unsynchronizedGetSizeOfQueuedBuffers() {
+        // Pretty rough approximation of real queue size.
+        return Math.max(unsynchronizedGetNumberOfQueuedBuffers() * bufferSize, 
0);

Review comment:
       This is pretty bad since the queue can hold events that have different 
sizes, buffers of old buffer size, and unfinished buffers. But it is the 
easiest way to calculate queue size without dramatically rewriting code.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -535,7 +541,9 @@ private void updateStatistics(BufferConsumer buffer) {
     }
 
     private void updateStatistics(Buffer buffer) {
-        totalNumberOfBytes += buffer.getSize();
+        if (buffer.isBuffer()) {
+            totalNumberOfBytes += buffer.getSize();

Review comment:
       NB! Right now, The totalNumberOfBytes calculate only the size of the 
data buffers(without event) which broke the old behavior but as I can see we 
use it only in toString. So maybe it is not a problem but if it is, we can add 
another variable for data buffers size calculation.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -111,6 +111,17 @@ public int getNumberOfQueuedBuffers() {
         return totalBuffers;
     }
 
+    @Override
+    public long getSizeOfQueuedBuffers() {
+        long totalNumberOfBytes = 0;
+
+        for (ResultSubpartition subpartition : subpartitions) {
+            totalNumberOfBytes += Math.max(0, 
subpartition.getTotalNumberOfBytes());

Review comment:
       I just notice that the access to getTotalNumberOfBytes is unsynchronized 
here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to