pnowojski commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r403475013
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -375,4 +397,29 @@ void onConsumedSubpartition(int subpartitionIndex) {
	private void checkInProduceState() throws IllegalStateException {
		checkState(!isFinished, "Partition already finished.");
	}
+
+	/**
+	 * Check whether all subpartitions' backlogs are below the maximum backlog limit, and make this partition
+	 * available again if yes.
+	 */
+	public void notifyDecreaseBacklog(int buffersInBacklog) {
+		if (buffersInBacklog == maxBuffersPerChannel) {
+			if (--unavailableSubpartitionsCount == 0) {
+				CompletableFuture<?> toNotify = availabilityHelper.getUnavailableToResetAvailable();
+				toNotify.complete(null);
+			}
+		}
+	}
+
+	/**
+	 * Check whether any subpartition's backlog exceeds the maximum backlog limit, and make this partition
+	 * unavailable if yes.
+	 */
+	public void notifyIncreaseBacklog(int buffersInBacklog) {
+		if (buffersInBacklog == maxBuffersPerChannel + 1) {
+			if (++unavailableSubpartitionsCount == 1) {
+				availabilityHelper.resetUnavailable();
+			}
+		}
+	}
Review comment:
Continuing [the previous thread](**url**):
> each PipelinedSubpartition has its own buffers field, so the updates here to unavailableSubpartitionsCnt are completely unsynchronized. A simple solution might be to change unavailableSubpartitionsCnt to an AtomicLong, but let me think about alternative solutions.
There is also another issue. `AvailabilityHelper` is not a thread-safe class, while you are accessing and modifying its state from different threads, so my previous idea with the `AtomicLong unavailableSubpartitionsCount` wouldn't work. This also affects `getAvailableFuture`.
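To make the problem concrete, here is a minimal sketch of the `AtomicLong` variant (hypothetical, not the PR code): the counter updates become atomic, but the `AvailabilityHelper` is still touched from whichever subpartition thread crosses the threshold, without any synchronization against the task thread:

```java
// Hypothetical sketch: an atomic counter alone only fixes the counter,
// not the unsynchronized AvailabilityHelper accesses.
private final AtomicLong unavailableSubpartitionsCount = new AtomicLong();

public void notifyIncreaseBacklog(int buffersInBacklog) {
	if (buffersInBacklog == maxBuffersPerChannel + 1) {
		if (unavailableSubpartitionsCount.incrementAndGet() == 1) {
			// Still called from an arbitrary subpartition thread, racing with
			// the task thread reading the helper in getAvailableFuture().
			availabilityHelper.resetUnavailable();
		}
	}
}

public void notifyDecreaseBacklog(int buffersInBacklog) {
	if (buffersInBacklog == maxBuffersPerChannel) {
		if (unavailableSubpartitionsCount.decrementAndGet() == 0) {
			// Same problem on the way back to "available".
			availabilityHelper.getUnavailableToResetAvailable().complete(null);
		}
	}
}
```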
I don't know if that can be solved in an efficient way. One thing that could help a bit is an old idea of mine
https://github.com/pnowojski/flink/commit/24e360e7db2d6f354c2f7369b95607eee1a5cb2c
that there is no reason for each subpartition to have a unique lock. Having one global per-partition lock would help with a couple of issues, like `flushAll()`, some load rebalancing ideas that we had, or here (see the sketch below). However, we would also have to synchronize `ResultPartition#getAvailableFuture(...)`, which would affect performance on the happy/hot path :/
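Roughly what the shared-lock idea could look like (a simplified sketch with stand-in class and field names, not the linked commit verbatim): the parent partition owns a single lock object and hands it to every subpartition, so backlog updates, `unavailableSubpartitionsCount` and the `AvailabilityHelper` would all be guarded by the same monitor:

```java
// Hypothetical sketch of "one lock per partition, shared by all subpartitions".
class ResultPartitionSketch {
	final Object partitionLock = new Object();

	private final int maxBuffersPerChannel = 10; // placeholder value
	private int unavailableSubpartitionsCount;

	void notifyIncreaseBacklog(int buffersInBacklog) {
		synchronized (partitionLock) {
			if (buffersInBacklog == maxBuffersPerChannel + 1
					&& ++unavailableSubpartitionsCount == 1) {
				// availabilityHelper.resetUnavailable();  // now guarded by partitionLock
			}
		}
	}

	void notifyDecreaseBacklog(int buffersInBacklog) {
		synchronized (partitionLock) {
			if (buffersInBacklog == maxBuffersPerChannel
					&& --unavailableSubpartitionsCount == 0) {
				// availabilityHelper.getUnavailableToResetAvailable().complete(null);
			}
		}
	}
}

class PipelinedSubpartitionSketch {
	// The subpartition no longer has its own lock; it reuses the parent's.
	private final Object lock;

	PipelinedSubpartitionSketch(ResultPartitionSketch parent) {
		this.lock = parent.partitionLock;
	}
}
```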
Maybe a solution would be to not use `availabilityHelper` here at all. Instead of `availabilityHelper.resetUnavailable();`, enqueue an action in the mailbox that suspends the default action, and instead of `toNotify.complete(null);`, enqueue another mailbox action that re-enables the default action (see the sketch below). That would increase the cost of a `#notifyIncreaseBacklog` call, but not on the happy/hot path, only when this task is being back-pressured, so when performance shouldn't matter.
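Something along these lines (a sketch against a hypothetical `Mailbox`/suspension interface; the actual mailbox API and how the suspension handle is obtained would still need to be worked out):

```java
// Hypothetical sketch: instead of toggling availabilityHelper from subpartition
// threads, hand the (un)availability over to the task's mailbox thread.
interface Mailbox {
	void execute(Runnable action, String description);
}

interface DefaultActionSuspension {
	void resume();
}

class BackpressureGate {
	private final Mailbox mailbox;
	// Only ever touched from the mailbox thread, so no extra synchronization there.
	private DefaultActionSuspension suspension;

	BackpressureGate(Mailbox mailbox) {
		this.mailbox = mailbox;
	}

	/** Called instead of availabilityHelper.resetUnavailable(). */
	void onBacklogLimitExceeded() {
		mailbox.execute(
			() -> suspension = suspendDefaultAction(),
			"suspend: backlog limit exceeded");
	}

	/** Called instead of toNotify.complete(null). */
	void onBacklogBelowLimit() {
		mailbox.execute(() -> {
			if (suspension != null) {
				suspension.resume();
				suspension = null;
			}
		}, "resume: backlog below limit");
	}

	private DefaultActionSuspension suspendDefaultAction() {
		// Placeholder: in the real code this would come from the mailbox processor.
		return () -> { };
	}
}
```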
However, we would still need either an `AtomicLong` or a global `ResultPartition` lock to avoid race conditions on `unavailableSubpartitionsCount`.
(let's think this through before jumping to implementation)