zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r420160941
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -87,10 +90,14 @@ public LocalInputChannel(
int maxBackoff,
InputChannelMetrics metrics) {
- super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+ super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics);
this.partitionManager = checkNotNull(partitionManager);
this.taskEventPublisher = checkNotNull(taskEventPublisher);
+ // In most cases we only need one buffer for reading recovered state except for very large record.
+ // Then only one floating buffer is required. Even though we need more buffers for recovery for
+ // large record, it only increases some interactions with pool.
+ this.bufferManager = new BufferManager(this, 1);
Review comment:
I mean we can actually give any initial value of `numRequiredBuffers`, e.g. `0`.
In practice, requesting floating buffers from the buffer pool can then take two different paths:
- A buffer is available in the buffer pool right now, so it is returned immediately.
- No buffer is available in the buffer pool right now, so the channel registers a listener with the buffer pool and waits to be notified later. For this path we should give a proper value of `numRequiredBuffers`: as long as it is greater than 0, it works correctly.
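The two request paths can be sketched as follows. This is a minimal, hypothetical model rather than Flink's `LocalBufferPool` API: `Pool`, `requestOrRegister` and `recycle` are illustrative names, buffers are plain integer ids, and a listener is just an `IntConsumer` that receives a recycled buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical model of the two request paths; not Flink's LocalBufferPool API. */
public class TwoPathsSketch {

    static final class Pool {
        private final Deque<Integer> free = new ArrayDeque<>();
        private final Deque<IntConsumer> listeners = new ArrayDeque<>();

        Pool(int size) {
            for (int i = 0; i < size; i++) {
                free.add(i);
            }
        }

        /**
         * Path 1: a buffer is free, so it is returned right away.
         * Path 2: no buffer is free, so the listener is registered and null is returned.
         */
        Integer requestOrRegister(IntConsumer listener) {
            Integer buffer = free.poll();
            if (buffer == null) {
                listeners.add(listener);
            }
            return buffer;
        }

        /** A recycled buffer is handed to the oldest waiting listener, if any. */
        void recycle(int buffer) {
            IntConsumer listener = listeners.poll();
            if (listener != null) {
                listener.accept(buffer);
            } else {
                free.add(buffer);
            }
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool(1);
        List<Integer> notified = new ArrayList<>();
        Integer first = pool.requestOrRegister(b -> notified.add(b));  // path 1: immediate
        Integer second = pool.requestOrRegister(b -> notified.add(b)); // path 2: registers
        pool.recycle(first);                                           // wakes the listener
        System.out.println(first + " " + second + " " + notified);     // prints "0 null [0]"
    }
}
```

The second request gets `null` back and only receives buffer `0` later through the notification, which is the path where the value of `numRequiredBuffers` starts to matter.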
It would be better to give some detailed examples explaining how `numRequiredBuffers` behaves differently in practice.
E.g. suppose `numRequiredBuffers` is set to `1` for `RemoteInputChannel`. When the exclusive buffers are processed and recycled back to `RemoteInputChannel`, the current available amount might be 2, which is more than `numRequiredBuffers` (1). Then, when the floating buffers are processed and recycled to `LocalBufferPool`, the pool notifies the listener (`RemoteInputChannel`) of the available buffers. But `RemoteInputChannel` already has enough buffers, so it does not accept the floating buffer, and the register & notify effort is wasted.
But if we give a larger `numRequiredBuffers`, we only need to register the listener once. After that, whenever floating buffers are processed and recycled, `RemoteInputChannel` is notified and accepts them for future reuse. So in practice it can further reduce the interactions between `RemoteInputChannel` and `LocalBufferPool`. Especially in the channel state recovery case, it makes sense to try to occupy all the floating buffers from `LocalBufferPool`, because only one channel is unspilling at a time.
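To make the `1` vs. larger `numRequiredBuffers` comparison concrete, here is a minimal simulation of the example. It is a hypothetical model, not Flink code: `Pool`, `Channel`, `notifyBufferAvailable`, `recycleExclusive` and the counters are illustrative names that only loosely mimic `LocalBufferPool`, `RemoteInputChannel` and the buffer-listener logic. The model assumes a single waiting listener and two floating buffers that are both in use when the channel first asks for one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical model of the review example; Pool and Channel only loosely mimic
 * LocalBufferPool and RemoteInputChannel and are not Flink APIs.
 */
public class NumRequiredBuffersSketch {

    static final class Pool {
        private final Deque<Integer> free = new ArrayDeque<>(); // starts empty: floating buffers in use
        private Channel listener;                               // at most one waiting listener here

        Integer request() {
            return free.poll();
        }

        void addListener(Channel channel) {
            listener = channel;
        }

        /** A recycled floating buffer is offered to the waiting listener first. */
        void recycle(int buffer) {
            Channel waiting = listener;
            listener = null; // the listener is consumed by a notification
            if (waiting == null || !waiting.notifyBufferAvailable(buffer)) {
                free.add(buffer); // nobody wanted it: it stays idle in the pool
            }
        }
    }

    static final class Channel {
        final int numRequiredBuffers;
        final Pool pool;
        int available;     // buffers held by the channel and ready to be filled
        int registrations; // explicit listener registrations after a failed request
        int rejected;      // notifications that offered a buffer the channel did not need

        Channel(int numRequiredBuffers, Pool pool) {
            this.numRequiredBuffers = numRequiredBuffers;
            this.pool = pool;
        }

        /** Requests floating buffers until numRequiredBuffers are held, else registers. */
        void requestFloatingBuffers() {
            while (available < numRequiredBuffers) {
                Integer buffer = pool.request();
                if (buffer == null) {
                    pool.addListener(this);
                    registrations++;
                    return;
                }
                available++;
            }
        }

        /** Returns true if the offered buffer is accepted (buffer ids are not tracked). */
        boolean notifyBufferAvailable(int buffer) {
            if (available >= numRequiredBuffers) {
                rejected++; // wasted register & notify round trip
                return false;
            }
            available++;
            if (available < numRequiredBuffers) {
                pool.addListener(this); // keep listening without a new request
            }
            return true;
        }

        void recycleExclusive() {
            available++;
        }
    }

    /** Replays the example and returns {registrations, rejected, available}. */
    static int[] run(int numRequiredBuffers) {
        Pool pool = new Pool();
        Channel channel = new Channel(numRequiredBuffers, pool);
        channel.requestFloatingBuffers(); // pool is empty, so the channel registers
        channel.recycleExclusive();       // two exclusive buffers come back
        channel.recycleExclusive();
        pool.recycle(0);                  // then both floating buffers come back
        pool.recycle(1);
        return new int[] {channel.registrations, channel.rejected, channel.available};
    }

    public static void main(String[] args) {
        for (int required : new int[] {1, 4}) {
            int[] r = run(required);
            System.out.printf("numRequiredBuffers=%d: registrations=%d, rejected=%d, available=%d%n",
                    required, r[0], r[1], r[2]);
        }
    }
}
```

In this model, `numRequiredBuffers = 1` ends with one rejected notification and both floating buffers idle in the pool (`available=2`), while `4` accepts both floating buffers after a single registration (`available=4`). The exact numbers depend on the assumed scenario; the point is only the direction of the effect.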
----------------------------------------------------------------
This is an automated message from the Apache Git Service.