pnowojski commented on a change in pull request #18392:
URL: https://github.com/apache/flink/pull/18392#discussion_r790754115



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
##########
@@ -356,6 +362,76 @@ public int getNumberOfRegisteredBufferPools() {
         }
     }
 
+    public long getNumberOfRequestedMemorySegments() {
+        long requestedSegments = 0;
+        synchronized (factoryLock) {
+            for (LocalBufferPool bufferPool : allBufferPools) {
+                int maxNumberOfMemorySegments = bufferPool.getMaxNumberOfMemorySegments();
+                /**
+                 * As defined in {@link
+                 * org.apache.flink.runtime.shuffle.NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition(int,
+                 * int, int, int, int,
+                 * org.apache.flink.runtime.io.network.partition.ResultPartitionType)}. Unbounded
+                 * subpartitions have {@link maxNumberOfMemorySegments} set to {@code
+                 * Integer.MAX_VALUE}. In this case let's use number of required segments instead.
+                 */
+                if (maxNumberOfMemorySegments < Integer.MAX_VALUE) {
+                    requestedSegments += maxNumberOfMemorySegments;
+                } else {
+                    requestedSegments += bufferPool.getNumberOfRequiredMemorySegments();
+                }
+            }
+        }
+        return requestedSegments;
+    }
+
+    public long getRequestedMemory() {
+        return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+    }
+
+    public int getRequestedSegmentsUsage() {
+        return Math.toIntExact(
+                100L * getNumberOfRequestedMemorySegments() / getTotalNumberOfMemorySegments());
+    }
+
+    @VisibleForTesting
+    Optional<String> getUsageWarning() {
+        int currentUsage = getRequestedSegmentsUsage();

Review comment:
       > Or do we have the case when we create new buffer pool in runtime?
   
   Yes, that's possible. First and foremost, this can happen if you have one 
cluster running multiple, potentially short-lived, ad hoc jobs. Secondly, in 
batch execution not all tasks are scheduled at once. Tasks from later stages 
can be unblocked once their upstream tasks finish producing their results, so 
at different points in time a different number of tasks, with different memory 
requirements, can be running even for a single job. Lastly, during startup not 
all `LocalBufferPool`s are created at once, and there might be quite a 
significant delay between the startup times of different tasks. 
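
To illustrate why the requested usage is not a fixed number, here is a minimal, self-contained sketch. It assumes the same accounting rule as the diff above (a bounded pool counts with its maximum, an unbounded pool with its required segments). `PoolSketch`, `GlobalPoolSketch` and `RequestedUsageDemo` are hypothetical stand-ins written for this example, not Flink classes, and the segment counts are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a local buffer pool; this is not Flink's LocalBufferPool. */
final class PoolSketch {
    final int requiredSegments;
    final int maxSegments; // Integer.MAX_VALUE marks an unbounded pool

    PoolSketch(int requiredSegments, int maxSegments) {
        this.requiredSegments = requiredSegments;
        this.maxSegments = maxSegments;
    }
}

/** Hypothetical stand-in for the global pool that tracks all local pools. */
final class GlobalPoolSketch {
    private final int totalSegments;
    private final List<PoolSketch> pools = new ArrayList<>();

    GlobalPoolSketch(int totalSegments) {
        this.totalSegments = totalSegments;
    }

    /** Pools may be registered at any time, e.g. when a new task starts. */
    synchronized PoolSketch createPool(int requiredSegments, int maxSegments) {
        PoolSketch pool = new PoolSketch(requiredSegments, maxSegments);
        pools.add(pool);
        return pool;
    }

    /** Pools may also disappear at any time, e.g. when a task finishes. */
    synchronized void destroyPool(PoolSketch pool) {
        pools.remove(pool);
    }

    /** Bounded pools count with their maximum, unbounded ones with their required size. */
    synchronized long requestedSegments() {
        long requested = 0;
        for (PoolSketch pool : pools) {
            requested += pool.maxSegments < Integer.MAX_VALUE
                    ? pool.maxSegments
                    : pool.requiredSegments;
        }
        return requested;
    }

    /** Requested segments as a percentage of the total; can exceed 100. */
    synchronized int requestedUsagePercent() {
        return Math.toIntExact(100L * requestedSegments() / totalSegments);
    }
}

final class RequestedUsageDemo {
    public static void main(String[] args) {
        GlobalPoolSketch global = new GlobalPoolSketch(1000);

        PoolSketch first = global.createPool(10, 200);
        System.out.println("one bounded pool: " + global.requestedUsagePercent() + "%");

        global.createPool(50, Integer.MAX_VALUE);
        System.out.println("plus an unbounded pool: " + global.requestedUsagePercent() + "%");

        global.createPool(100, 900);
        System.out.println("plus a late-starting task: " + global.requestedUsagePercent() + "%");

        global.destroyPool(first);
        System.out.println("after the first task finishes: " + global.requestedUsagePercent() + "%");
    }
}
```

In this sketch the value returned by `requestedUsagePercent()` moves up and down as pools come and go, so a check that runs only once could miss a later overcommit.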




