1996fanrui commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901698881
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -298,7 +318,11 @@ public int getNumBuffers() {
@Override
public int bestEffortGetNumOfUsedBuffers() {
- return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
+ return Math.max(
+ 0,
+ numberOfRequestedMemorySegments
+ + numberOfRequestedOverdraftMemorySegments
+ - availableMemorySegments.size());
Review Comment:
Thanks for the reminder.
I would prefer to report pool usage above 100% when overdraft buffers are
in use. Overdraft buffers are taken beyond the pool's normal limit, so they
do not belong to the LocalBufferPool proper. What do you think?
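To make the proposal concrete, here is a minimal stand-alone sketch of the
arithmetic. It is not the actual LocalBufferPool code: the class name
PoolUsageSketch, the usage() helper, and the choice of the regular pool size
as the denominator are assumptions for illustration. Only the usedBuffers()
formula follows the diff above.

```java
// Hypothetical sketch; not part of Flink.
final class PoolUsageSketch {

    /** Same arithmetic as the diff: requested + overdraft - available, floored at 0. */
    static int usedBuffers(int requested, int requestedOverdraft, int available) {
        return Math.max(0, requested + requestedOverdraft - available);
    }

    /**
     * Usage relative to the regular pool size (an assumed denominator).
     * Overdraft buffers are not counted in poolSize, so the ratio can exceed 1.0.
     */
    static double usage(int used, int poolSize) {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive");
        }
        return (double) used / poolSize;
    }

    public static void main(String[] args) {
        int poolSize = 10;
        // All 10 regular buffers requested, 2 overdraft buffers requested, none available.
        int used = usedBuffers(10, 2, 0);
        System.out.println("used=" + used + ", usage=" + usage(used, poolSize));
    }
}
```

With a pool size of 10 and 2 overdraft buffers in use, the reported usage
ends up above 100%, which is the behavior I am suggesting.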
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
+ " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
+ " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
+ " producing large amount of data.");
+ /**
+ * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+ * supports ResultPartition. InputGate can be supported if necessary.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+ key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ String.format(
+ "Number of max overdraft network buffers to use for each ResultPartition."
+ + " The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers from the"
+ + " LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of LocalBufferPool be applied or"
+ + " the number of buffers for a single channel has reached %s. When the LocalBufferPool is unavailable,"
+ + " the LocalBufferPool can provide some additional buffers to allow overdraft. It can effectively solve"
+ + " the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory,"
+ + " such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data."
+ + " It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.",
+ NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));
Review Comment:
Hi @pnowojski, thanks for your review. I have updated the description.
I still use ResultPartition in the first sentence, though, because an
operator with multiple downstream operators has multiple ResultPartitions,
and each ResultPartition has its own overdraft buffers.
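A rough model of what "its own overdraft buffers" means is sketched below.
These are not Flink classes: OverdraftBudgetSketch, PartitionBudget, and
worstCaseOverdraft() are hypothetical names, and the worst-case figure simply
assumes that every partition can exhaust its limit independently.

```java
// Hypothetical model for illustration only; not part of Flink.
final class OverdraftBudgetSketch {

    /** One independent overdraft budget per result partition. */
    static final class PartitionBudget {
        private final int maxOverdraft;
        private int inUse;

        PartitionBudget(int maxOverdraft) {
            this.maxOverdraft = maxOverdraft;
        }

        /** Takes one overdraft buffer if this partition's budget is not exhausted. */
        boolean tryRequestOverdraft() {
            if (inUse >= maxOverdraft) {
                return false;
            }
            inUse++;
            return true;
        }

        int inUse() {
            return inUse;
        }
    }

    /** Assumed worst case for one task: every partition uses its full budget. */
    static int worstCaseOverdraft(int numPartitions, int maxOverdraftPerPartition) {
        return Math.multiplyExact(numPartitions, maxOverdraftPerPartition);
    }
}
```

Under this model, exhausting the budget of one ResultPartition does not
affect another, and a task with 3 ResultPartitions and the default of 10
could hold up to 30 overdraft buffers in total.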