akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r858777920
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -375,11 +399,14 @@ private MemorySegment requestMemorySegment(int targetChannel) {
         // target channel over quota; do not return a segment
         if (targetChannel != UNKNOWN_CHANNEL
                 && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
-            return null;
+            segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
+        } else {
+            segment =
+                    availableMemorySegments.isEmpty()
+                            ? requestOverdraftMemorySegmentFromGlobal(targetChannel)
Review Comment:
It is up to you, but in my opinion, calling `requestOverdraftMemorySegmentFromGlobal` in both the `if` and the `else` blocks looks weird. Don't you think it would be better to just update the `if` condition:
```
if (targetChannel != UNKNOWN_CHANNEL
                && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel
        || availableMemorySegments.isEmpty()) {
    segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
} else {
    segment = availableMemorySegments.poll();
}
```
I'm also not really sure that we should use overdraft buffers while we still have `availableMemorySegments`. Perhaps we just need to ignore `maxBuffersPerChannel`:
```
if (availableMemorySegments.isEmpty()) {
    segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
} else {
    segment = availableMemorySegments.poll();
}
```
But I don't know how valid that is. I need to think ...
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
+ " case of data skew and high number of
configured floating buffers. This limit is not strictly guaranteed,"
+ " and can be ignored by things like
flatMap operators, records spanning multiple buffers or single timer"
+ " producing large amount of data.");
+ /**
+ * Number of max overdraft network buffers to use for each
ResultPartition. Currently, it just
+ * supports ResultPartition. InputGate can be supported if necessary.
Review Comment:
Too much repeated information. `Number of max overdraft network buffers to use for each ResultPartition.` is enough, in my opinion; the other sentences just repeat it.
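For example, the whole Javadoc could probably be reduced to a single line (just a sketch of the shortened comment):
```
/** Number of max overdraft network buffers to use for each ResultPartition. */
```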
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -430,6 +457,25 @@ private boolean requestMemorySegmentFromGlobal() {
         return false;
     }
+    private MemorySegment requestOverdraftMemorySegmentFromGlobal(int targetChannel) {
+        assert Thread.holdsLock(availableMemorySegments);
+
+        if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate
+                || targetChannel == UNKNOWN_CHANNEL) {
Review Comment:
Why don't we use overdraft for UNKNOWN_CHANNEL?
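In other words, could the early return be guarded only by the per-gate limit? A rough sketch of what I mean (not a tested change):
```
if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) {
    return null;
}
```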
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
+ " case of data skew and high number of
configured floating buffers. This limit is not strictly guaranteed,"
+ " and can be ignored by things like
flatMap operators, records spanning multiple buffers or single timer"
+ " producing large amount of data.");
+ /**
+ * Number of max overdraft network buffers to use for each
ResultPartition. Currently, it just
+ * supports ResultPartition. InputGate can be supported if necessary.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ public static final ConfigOption<Integer>
NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+ key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+ .intType()
+ .defaultValue(10)
Review Comment:
The default value should be discussed later.
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
+ " case of data skew and high number of
configured floating buffers. This limit is not strictly guaranteed,"
+ " and can be ignored by things like
flatMap operators, records spanning multiple buffers or single timer"
+ " producing large amount of data.");
+ /**
+ * Number of max overdraft network buffers to use for each
ResultPartition. Currently, it just
+ * supports ResultPartition. InputGate can be supported if necessary.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ public static final ConfigOption<Integer>
NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+ key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ String.format(
+ "Number of max overdraft network buffers
to use for each ResultPartition."
+ + " The overdraft buffers will be
used when the ResultPartition cannot apply to the normal buffers from the"
+ + "
LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of
LocalBufferPool be applied or"
+ + " the number of buffers for a
single channel has reached %s. When the LocalBufferPool is unavailable,"
+ + " the LocalBufferPool can
provide some additional buffers to allow overdraft. It can effectively solve"
+ + " the problem that multiple
output buffers are required to process a single data, causing the Task to block
in the requestMemory,"
+ + " such as: flatMap operator,
records spanning multiple buffers or a single timer generates a large amount of
data."
+ + " It doesn't need to wait for
ResultPartition is available to start Unaligned Checkpoint directly.",
+ NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));
Review Comment:
I am not really sure that we should mention LocalBufferPool (we would need to check another configuration). This is user documentation; we should just explain, as simply as possible, how and when this configuration can help the user.
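For example, something along these lines might be enough (only a sketch of the wording; the exact text still needs to be agreed on):
```
.withDescription(
        "Maximum number of overdraft network buffers to use for each ResultPartition."
                + " Overdraft buffers help when a task needs several buffers to finish processing a single record,"
                + " e.g. flatMap operators, records spanning multiple buffers, or a single timer producing a large"
                + " amount of data: instead of blocking on the buffer request, the task may temporarily exceed its"
                + " buffer quota, which also allows unaligned checkpoints to start sooner.")
```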
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -543,6 +543,164 @@ public void testConsistentAvailability() throws Exception {
         }
     }
+    @Test
+    public void testOverdraftBuffer() throws IOException, InterruptedException {
Review Comment:
Why do you keep all scenarios in one test? It looks too big and difficult to read. Could you please split this test into several tests with descriptive names?
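A skeleton along these lines, for instance (the names and the split are only a suggestion; each method would get the corresponding part of the current `testOverdraftBuffer`):
```
@Test
public void testOverdraftBufferIsRequestedWhenAvailableBuffersAreExhausted() throws Exception {
    // covers: pool has no available segments -> an overdraft segment is requested from the global pool
}

@Test
public void testOverdraftBuffersAreLimitedByMaxOverdraftBuffersPerGate() throws Exception {
    // covers: no more than maxOverdraftBuffersPerGate overdraft segments can be requested
}

@Test
public void testPoolBecomesAvailableOnlyAfterOverdraftBuffersAreRecycled() throws Exception {
    // covers: shouldBeAvailable() stays false while overdraft segments are still in use
}
```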
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -472,18 +518,22 @@ private void onGlobalPoolAvailable() {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
-        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
+        return !availableMemorySegments.isEmpty()
+                && unavailableSubpartitionsCount == 0
+                && numberOfRequestedOverdraftMemorySegments == 0;
Review Comment:
I see the pattern `unavailableSubpartitionsCount == 0 && numberOfRequestedOverdraftMemorySegments == 0` in a lot of places. I suppose we should extract it into a separate method in order to avoid misuse in the future.
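Something like this, for instance (just a sketch; the method name is only a suggestion):
```
private boolean hasNoOverdraftAndNoUnavailableSubpartitions() {
    assert Thread.holdsLock(availableMemorySegments);
    return unavailableSubpartitionsCount == 0 && numberOfRequestedOverdraftMemorySegments == 0;
}

private boolean shouldBeAvailable() {
    assert Thread.holdsLock(availableMemorySegments);
    return !availableMemorySegments.isEmpty() && hasNoOverdraftAndNoUnavailableSubpartitions();
}
```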
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]