pnowojski commented on a change in pull request #18392:
URL: https://github.com/apache/flink/pull/18392#discussion_r790754115
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
##########
@@ -356,6 +362,76 @@ public int getNumberOfRegisteredBufferPools() {
}
}
+ public long getNumberOfRequestedMemorySegments() {
+ long requestedSegments = 0;
+ synchronized (factoryLock) {
+ for (LocalBufferPool bufferPool : allBufferPools) {
+ int maxNumberOfMemorySegments = bufferPool.getMaxNumberOfMemorySegments();
+ /**
+ * As defined in {@link
+ * org.apache.flink.runtime.shuffle.NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition(int,
+ * int, int, int, int,
+ * org.apache.flink.runtime.io.network.partition.ResultPartitionType)}. Unbounded
+ * subpartitions have {@link maxNumberOfMemorySegments} set to {@code
+ * Integer.MAX_VALUE}. In this case let's use number of required segments instead.
+ */
+ if (maxNumberOfMemorySegments < Integer.MAX_VALUE) {
+ requestedSegments += maxNumberOfMemorySegments;
+ } else {
+ requestedSegments += bufferPool.getNumberOfRequiredMemorySegments();
+ }
+ }
+ }
+ return requestedSegments;
+ }
+
+ public long getRequestedMemory() {
+ return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+ }
+
+ public int getRequestedSegmentsUsage() {
+ return Math.toIntExact(
+ 100L * getNumberOfRequestedMemorySegments() / getTotalNumberOfMemorySegments());
+ }
+
+ @VisibleForTesting
+ Optional<String> getUsageWarning() {
+ int currentUsage = getRequestedSegmentsUsage();
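The accounting in the diff above can be modelled outside Flink roughly as follows. This is a minimal sketch, not the PR's code: `PoolSpec` is a hypothetical stand-in for `LocalBufferPool` that holds only its required and maximum sizes, and the numbers in `main` are made up. Bounded pools contribute their maximum size, while pools whose maximum is `Integer.MAX_VALUE` contribute their required size instead. Nothing in this model caps the sum at the total number of segments, so the percentage can exceed 100.

```java
import java.util.List;

/**
 * Simplified, hypothetical model of the accounting in
 * NetworkBufferPool#getNumberOfRequestedMemorySegments(). PoolSpec stands in
 * for LocalBufferPool; it is not a Flink class.
 */
public class RequestedSegmentsSketch {

    /** Hypothetical stand-in for a LocalBufferPool's size limits. */
    static final class PoolSpec {
        final int required;
        final int max;

        PoolSpec(int required, int max) {
            this.required = required;
            this.max = max;
        }
    }

    /** Sums maximum sizes, falling back to the required size for unbounded pools. */
    static long requestedSegments(List<PoolSpec> pools) {
        long requested = 0;
        for (PoolSpec pool : pools) {
            // Integer.MAX_VALUE marks an unbounded pool; adding it would swamp
            // the total, so its required size is counted instead.
            requested += pool.max < Integer.MAX_VALUE ? pool.max : pool.required;
        }
        return requested;
    }

    /** Requested segments as an integer percentage of all segments. */
    static int usagePercent(long requested, int totalSegments) {
        // Multiply before dividing so integer division does not truncate to 0.
        return Math.toIntExact(100L * requested / totalSegments);
    }

    public static void main(String[] args) {
        List<PoolSpec> pools = List.of(
                new PoolSpec(10, 30),                // bounded: counts its max, 30
                new PoolSpec(8, Integer.MAX_VALUE)); // unbounded: counts its required, 8
        long requested = requestedSegments(pools);
        System.out.println(requested + " segments, "
                + usagePercent(requested, 64) + "% of 64");
    }
}
```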
Review comment:
> Or do we have the case when we create new buffer pool in runtime?
Yes, that's possible. First and foremost, this can happen if a single cluster
runs multiple, potentially short-lived, ad hoc jobs. Secondly, in batch
execution not all tasks are scheduled at once: tasks from later stages are
unblocked only after their upstream tasks finish producing their results, so at
different points in time a different number of tasks, with different memory
requirements, can be running, even for a single job. Lastly, during startup not
all `LocalBufferPools` are created at once, and there can be quite a
significant delay between the startup times of different tasks.
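Because pools are created and released while the cluster runs, requested-segment usage changes over time rather than being fixed at startup. The `RuntimeUsageSketch` class below is hypothetical and not Flink code: it tracks one requested-segment count per registered pool and re-evaluates usage on every registration. The 100% warning threshold is an assumption made for illustration. The `main` method walks through the batch scenario described above: an upstream stage's pool is released before a downstream stage starts, and a second ad hoc job then pushes usage over the threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical registry showing that requested-segment usage changes as pools
 * are created and destroyed at runtime. Not Flink code.
 */
public class RuntimeUsageSketch {
    private final int totalSegments;
    private final List<Integer> requestedPerPool = new ArrayList<>();

    RuntimeUsageSketch(int totalSegments) {
        this.totalSegments = totalSegments;
    }

    /** Registers a pool and re-evaluates usage, since it may have just crossed the threshold. */
    Optional<String> register(int requestedSegments) {
        requestedPerPool.add(requestedSegments);
        return usageWarning();
    }

    /** Removes a pool, e.g. when the tasks of a finished batch stage release it. */
    void unregister(int requestedSegments) {
        // Integer.valueOf forces remove(Object) rather than remove(int index).
        requestedPerPool.remove(Integer.valueOf(requestedSegments));
    }

    int usagePercent() {
        long requested = 0;
        for (int r : requestedPerPool) {
            requested += r;
        }
        return Math.toIntExact(100L * requested / totalSegments);
    }

    /** Assumed threshold: warn only when more than 100% of segments are requested. */
    Optional<String> usageWarning() {
        int usage = usagePercent();
        return usage > 100
                ? Optional.of("Requested " + usage + "% of network memory")
                : Optional.empty();
    }

    public static void main(String[] args) {
        RuntimeUsageSketch registry = new RuntimeUsageSketch(100);
        System.out.println(registry.register(60)); // upstream stage: no warning
        registry.unregister(60);                   // upstream stage finishes
        System.out.println(registry.register(70)); // downstream stage: no warning
        System.out.println(registry.register(50)); // second ad hoc job: 120%, warning
    }
}
```

A check performed only once, right after the first pools are registered, would miss the final step.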
--
This is an automated message from the Apache Git Service.