pnowojski commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901594695
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -298,7 +318,11 @@ public int getNumBuffers() {
     @Override
     public int bestEffortGetNumOfUsedBuffers() {
-        return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
+        return Math.max(
+                0,
+                numberOfRequestedMemorySegments
+                        + numberOfRequestedOverdraftMemorySegments
+                        - availableMemorySegments.size());
Review Comment:
This doesn't seem to be correct in combination with `LocalBufferPool#getNumBuffers`. Please take a look at how those methods are used in `OutputBufferPoolUsageGauge#getValue`.
Unless you intend to report that the pool usage was > 100% if overdraft buffers are being used? If so, I think it should be documented in `docs/content/docs/ops/metrics.md` and `docs/content.zh/docs/ops/metrics.md`.
Either way, is this being tested somewhere?
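For reference, a paraphrased sketch of how `OutputBufferPoolUsageGauge#getValue` combines the two methods (not the verbatim Flink source; `resultPartitions` stands in for the gauge's actual field). It shows why counting overdraft segments in the numerator but not in the denominator can push the gauge above 100%:
```java
// Paraphrased sketch of OutputBufferPoolUsageGauge#getValue, not the verbatim Flink source.
public Float getValue() {
    int usedBuffers = 0;
    int totalBuffers = 0;
    for (ResultPartition partition : resultPartitions) {
        BufferPool pool = partition.getBufferPool();
        // After this PR, the numerator also counts overdraft segments...
        usedBuffers += pool.bestEffortGetNumOfUsedBuffers();
        // ...while the pool size reported here does not include them,
        // so the resulting ratio can exceed 1.0 (i.e. usage > 100%).
        totalBuffers += pool.getNumBuffers();
    }
    return totalBuffers == 0 ? 0.0f : ((float) usedBuffers) / totalBuffers;
}
```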
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
+ " case of data skew and high number of
configured floating buffers. This limit is not strictly guaranteed,"
+ " and can be ignored by things like
flatMap operators, records spanning multiple buffers or single timer"
+ " producing large amount of data.");
+ /**
+ * Number of max overdraft network buffers to use for each
ResultPartition. Currently, it just
+ * supports ResultPartition. InputGate can be supported if necessary.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ public static final ConfigOption<Integer>
NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+ key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ String.format(
+ "Number of max overdraft network buffers
to use for each ResultPartition."
+ + " The overdraft buffers will be
used when the ResultPartition cannot apply to the normal buffers from the"
+ + "
LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of
LocalBufferPool be applied or"
+ + " the number of buffers for a
single channel has reached %s. When the LocalBufferPool is unavailable,"
+ + " the LocalBufferPool can
provide some additional buffers to allow overdraft. It can effectively solve"
+ + " the problem that multiple
output buffers are required to process a single data, causing the Task to block
in the requestMemory,"
+ + " such as: flatMap operator,
records spanning multiple buffers or a single timer generates a large amount of
data."
+ + " It doesn't need to wait for
ResultPartition is available to start Unaligned Checkpoint directly.",
+ NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));
Review Comment:
I agree this is a bit too technical. What about:
> Number of max overdraft network buffers to use for each subtask.
>
> The overdraft buffers will be used when the subtask cannot apply for normal buffers due to back pressure,
> while the subtask is performing an action that cannot be interrupted in the middle, like serializing a large record,
> a flatMap operator producing multiple records for a single input record, or a processing time timer producing large output.
> In situations like that, the system will allow the subtask to request overdraft buffers, so that the subtask can finish such
> an uninterruptible action without blocking unaligned checkpoints for a long period of time. Overdraft buffers
> are provided on a best-effort basis, only if the system has some unused buffers available. A subtask that has used
> overdraft buffers won't be allowed to process any more records until the overdraft buffers are returned to the pool.
?
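For illustration, however the final description reads, a user would set the option like any other network option. A usage sketch (not code from this PR; the class name is made up for the example, and 20 is an arbitrary value above the proposed default of 10):
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;

public class OverdraftConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key taken from the diff above; the PR proposes a default of 10.
        // A higher value permits a larger per-subtask overdraft.
        conf.set(NettyShuffleEnvironmentOptions.NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE, 20);
        System.out.println(conf);
    }
}
```
The same key can of course also be set in flink-conf.yaml, like the other `taskmanager.network.memory.*` options.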
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]