pnowojski commented on a change in pull request #11567: [FLINK-16645] Limit the
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r403727993
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -375,4 +397,29 @@ void onConsumedSubpartition(int subpartitionIndex) {
private void checkInProduceState() throws IllegalStateException {
checkState(!isFinished, "Partition already finished.");
}
+
+ /**
+ * Check whether all subpartitions' backlogs are less than the limitation of max backlogs, and make this partition
+ * available again if yes.
+ */
+ public void notifyDecreaseBacklog(int buffersInBacklog) {
+ if (buffersInBacklog == maxBuffersPerChannel) {
+ if (--unavailableSubpartitionsCount == 0) {
+ CompletableFuture<?> toNotify = availabilityHelper.getUnavailableToResetAvailable();
+ toNotify.complete(null);
+ }
+ }
+ }
+
+ /**
+ * Check whether any subpartition's backlog exceeds the limitation of max backlogs, and make this partition
+ * unavailable if yes.
+ */
+ public void notifyIncreaseBacklog(int buffersInBacklog) {
+ if (buffersInBacklog == maxBuffersPerChannel + 1) {
+ if (++unavailableSubpartitionsCount == 1) {
+ availabilityHelper.resetUnavailable();
+ }
+ }
+ }
Review comment:
Yes, more or less that was my last idea (keep in mind that I might be
missing something).
> If I understand this part correctly, wouldn't it be adding too many
(thousands of) BufferRecycler instances here?
For one thing, we could create one instance of `CustomBufferRecycler` per
subpartitionIndex and keep them in an array field
`CustomBufferRecycler[] recyclers`. We wouldn't have to create a new one on
each request, so it shouldn't affect performance or GC pressure.
Secondly, we already have multiple objects per subpartition index (for
example `PipelinedSubpartition`, `PipelinedSubpartitionView`,
`RemoteInputChannel`, `BufferBuilder`, `BufferConsumer` and all of their
fields, plus the buffers/segments themselves). One more shouldn't matter?
Thirdly, `CustomBufferRecycler` should be ~16 bytes, so that's just 160KB for
10000 channels.
Lastly, we could also hide `subpartitionIndex` inside the `NetworkBuffer` class,
but I think that might be less clean, given that `InputGate` wouldn't be using
this field. But I might be wrong, and could be persuaded otherwise.
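To make the first point concrete, here is a rough, standalone sketch of the "one recycler per subpartition index, created once and kept in an array" idea. It is not code from this PR: `Recycler` is a simplified stand-in for Flink's `BufferRecycler` (it takes a plain `Object` instead of a memory segment), and `Partition`, `recyclerFor`, `onBufferRecycled` and `recycledCount` are hypothetical names used only for illustration. It is also not thread-safe.

```java
class RecyclerSketch {

    /** Simplified stand-in for a buffer recycler; assumption, not the Flink interface. */
    interface Recycler {
        void recycle(Object segment);
    }

    /** Hypothetical recycler that only remembers which subpartition it belongs to. */
    static final class CustomBufferRecycler implements Recycler {
        private final int subpartitionIndex;
        private final Partition partition;

        CustomBufferRecycler(int subpartitionIndex, Partition partition) {
            this.subpartitionIndex = subpartitionIndex;
            this.partition = partition;
        }

        @Override
        public void recycle(Object segment) {
            // Tell the owning partition which subpartition released a buffer.
            partition.onBufferRecycled(subpartitionIndex);
        }
    }

    /** Hypothetical partition that allocates all recyclers once, up front. */
    static final class Partition {
        private final CustomBufferRecycler[] recyclers;
        private final int[] recycledPerSubpartition;

        Partition(int numberOfSubpartitions) {
            recyclers = new CustomBufferRecycler[numberOfSubpartitions];
            recycledPerSubpartition = new int[numberOfSubpartitions];
            for (int i = 0; i < numberOfSubpartitions; i++) {
                recyclers[i] = new CustomBufferRecycler(i, this);
            }
        }

        /** Returns the pre-created recycler; nothing is allocated per request. */
        CustomBufferRecycler recyclerFor(int subpartitionIndex) {
            return recyclers[subpartitionIndex];
        }

        void onBufferRecycled(int subpartitionIndex) {
            // A real implementation would update the backlog here; we only count.
            recycledPerSubpartition[subpartitionIndex]++;
        }

        int recycledCount(int subpartitionIndex) {
            return recycledPerSubpartition[subpartitionIndex];
        }
    }

    public static void main(String[] args) {
        Partition partition = new Partition(4);
        // The same instance is handed out every time for a given index.
        Recycler recycler = partition.recyclerFor(2);
        recycler.recycle(new Object());
        System.out.println("recycled from subpartition 2: " + partition.recycledCount(2));
    }
}
```

The only per-subpartition cost in this sketch is one small object holding an `int` and a reference, which is where the rough "~16 bytes each" estimate above comes from.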