akalash commented on a change in pull request #18392:
URL: https://github.com/apache/flink/pull/18392#discussion_r789591793
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
##########
@@ -356,6 +362,76 @@ public int getNumberOfRegisteredBufferPools() {
}
}
+ public long getNumberOfRequestedMemorySegments() {
+ long requestedSegments = 0;
+ synchronized (factoryLock) {
+ for (LocalBufferPool bufferPool : allBufferPools) {
+ int maxNumberOfMemorySegments = bufferPool.getMaxNumberOfMemorySegments();
+ /**
+ * As defined in {@link
+ * org.apache.flink.runtime.shuffle.NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition(int,
+ * int, int, int, int,
+ * org.apache.flink.runtime.io.network.partition.ResultPartitionType)}. Unbounded
+ * subpartitions have {@link maxNumberOfMemorySegments} set to {@code
+ * Integer.MAX_VALUE}. In this case let's use number of required segments instead.
+ */
+ if (maxNumberOfMemorySegments < Integer.MAX_VALUE) {
Review comment:
This looks fragile. Would it make sense to introduce a static constant
`UNBOUND_MEMORY_SEGMENTS`, which would make the comment above unnecessary?
Also, maybe the best place for this `if` logic is inside `LocalBufferPool`,
since by its own contract `LocalBufferPool` knows that
`MaxNumberOfMemorySegments` equal to `Integer.MAX_VALUE` represents unbounded
memory? Unfortunately, that means adding one more method to `LocalBufferPool`.
It is hard to say which option is better, but right now an outside class
knows about the inner implementation of both `NettyShuffleUtils` and
`LocalBufferPool`, which is not good.
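To make the suggestion concrete, here is a rough sketch, not a proposal for the exact API. The classes `SketchLocalBufferPool` and `SketchNetworkBufferPool`, the constant `UNBOUND_MEMORY_SEGMENTS` and the method `getEffectiveRequestedSegments()` are all hypothetical names; the real `LocalBufferPool` has a different constructor and much more state.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for LocalBufferPool; NOT the real Flink class. */
final class SketchLocalBufferPool {
    /** Hypothetical constant: marks a pool that has no upper bound on segments. */
    static final int UNBOUND_MEMORY_SEGMENTS = Integer.MAX_VALUE;

    private final int numberOfRequiredMemorySegments;
    private final int maxNumberOfMemorySegments;

    SketchLocalBufferPool(int numberOfRequiredMemorySegments, int maxNumberOfMemorySegments) {
        this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
        this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
    }

    /**
     * Hypothetical method: the pool itself decides which number counts as "requested".
     * Bounded pools report their maximum, unbounded pools fall back to the required count.
     */
    int getEffectiveRequestedSegments() {
        return maxNumberOfMemorySegments == UNBOUND_MEMORY_SEGMENTS
                ? numberOfRequiredMemorySegments
                : maxNumberOfMemorySegments;
    }
}

/** Simplified stand-in for NetworkBufferPool; it no longer needs to know about MAX_VALUE. */
final class SketchNetworkBufferPool {
    static long getNumberOfRequestedMemorySegments(List<SketchLocalBufferPool> allBufferPools) {
        long requestedSegments = 0;
        for (SketchLocalBufferPool bufferPool : allBufferPools) {
            requestedSegments += bufferPool.getEffectiveRequestedSegments();
        }
        return requestedSegments;
    }
}

final class RequestedSegmentsDemo {
    public static void main(String[] args) {
        List<SketchLocalBufferPool> pools =
                Arrays.asList(
                        new SketchLocalBufferPool(2, 10), // bounded: counts as 10
                        new SketchLocalBufferPool(
                                3, SketchLocalBufferPool.UNBOUND_MEMORY_SEGMENTS)); // unbounded: counts as 3
        System.out.println(SketchNetworkBufferPool.getNumberOfRequestedMemorySegments(pools)); // prints 13
    }
}
```

With this shape the knowledge about the "unbounded" marker lives in one place, at the cost of one extra method on the pool.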
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
##########
@@ -223,4 +230,26 @@ public static void registerDebloatingTaskMetrics(
taskGroup.gauge(
MetricNames.ESTIMATED_TIME_TO_CONSUME_BUFFERS, new TimeToConsumeGauge(inputGates));
}
+
+ /**
+ * This is a small hack. Instead of spawning a custom thread to monitor {@link
+ * NetworkBufferPool} usage, we are re-using {@link View#update()} method for this purpose.
+ */
+ private static class RequestedMemoryUsageMetric implements Gauge<Integer>, View {
Review comment:
If I am right and the value cannot change at runtime, then I suppose we
don't need the `View`, or even the metric, at all.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
##########
@@ -356,6 +362,76 @@ public int getNumberOfRegisteredBufferPools() {
}
}
+ public long getNumberOfRequestedMemorySegments() {
+ long requestedSegments = 0;
+ synchronized (factoryLock) {
+ for (LocalBufferPool bufferPool : allBufferPools) {
+ int maxNumberOfMemorySegments = bufferPool.getMaxNumberOfMemorySegments();
+ /**
+ * As defined in {@link
+ * org.apache.flink.runtime.shuffle.NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition(int,
+ * int, int, int, int,
+ * org.apache.flink.runtime.io.network.partition.ResultPartitionType)}. Unbounded
+ * subpartitions have {@link maxNumberOfMemorySegments} set to {@code
+ * Integer.MAX_VALUE}. In this case let's use number of required segments instead.
+ */
+ if (maxNumberOfMemorySegments < Integer.MAX_VALUE) {
+ requestedSegments += maxNumberOfMemorySegments;
+ } else {
+ requestedSegments += bufferPool.getNumberOfRequiredMemorySegments();
+ }
+ }
+ }
+ return requestedSegments;
+ }
+
+ public long getRequestedMemory() {
+ return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+ }
+
+ public int getRequestedSegmentsUsage() {
+ return Math.toIntExact(
+ 100L * getNumberOfRequestedMemorySegments() / getTotalNumberOfMemorySegments());
+ }
+
+ @VisibleForTesting
+ Optional<String> getUsageWarning() {
+ int currentUsage = getRequestedSegmentsUsage();
Review comment:
Is it even possible to get different values on different calls here? As far
as I can see, `memorySegmentSize`, `TotalNumberOfMemorySegments` and
`MaxNumberOfMemorySegments` are all final, and `allBufferPools` is filled
only once at the start. That means this value should be the same every time
after task initialization.
Or is there a case where we create a new buffer pool at runtime?
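To illustrate the reasoning, here is a minimal sketch using the same integer arithmetic as `getRequestedSegmentsUsage()` in the diff. `UsageInvarianceSketch` and `usagePercent` are hypothetical names, and each pool is reduced to a single "requested segments" number, which is an assumption for illustration only. It shows that the percentage is a pure function of the registered pools and the total segment count, so it can only move if the set of pools changes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; mirrors only the arithmetic from the diff, not Flink's classes. */
final class UsageInvarianceSketch {
    /** Sums the per-pool requested segments and converts the sum to a percentage of the total. */
    static int usagePercent(List<Integer> requestedPerPool, int totalSegments) {
        long requested = 0;
        for (int segments : requestedPerPool) {
            requested += segments;
        }
        // Integer division, as in the diff: the result is truncated, not rounded.
        return Math.toIntExact(100L * requested / totalSegments);
    }

    public static void main(String[] args) {
        List<Integer> pools = new ArrayList<>();
        pools.add(10);
        pools.add(3);

        int first = usagePercent(pools, 100);
        int second = usagePercent(pools, 100); // same inputs, same result

        pools.add(20); // only registering another pool changes the value
        int third = usagePercent(pools, 100);

        System.out.println(first + " " + second + " " + third); // prints 13 13 33
    }
}
```

If pools are never added or removed after startup, a single computation would be enough; if they are, periodic re-evaluation makes sense.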