pnowojski commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r403675985
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##########
 @@ -375,4 +397,29 @@ void onConsumedSubpartition(int subpartitionIndex) {
        private void checkInProduceState() throws IllegalStateException {
                checkState(!isFinished, "Partition already finished.");
        }
+
+       /**
+        * Check whether all subpartitions' backlogs are less than the 
limitation of max backlogs, and make this partition
+        * available again if yes.
+        */
+       public void notifyDecreaseBacklog(int buffersInBacklog) {
+               if (buffersInBacklog == maxBuffersPerChannel) {
+                       if (--unavailableSubpartitionsCount == 0) {
+                               CompletableFuture<?> toNotify = 
availabilityHelper.getUnavailableToResetAvailable();
+                               toNotify.complete(null);
+                       }
+               }
+       }
+
+       /**
+        * Check whether any subpartition's backlog exceeds the limitation of 
max backlogs, and make this partition
+        * unavailabe if yes.
+        */
+       public void notifyIncreaseBacklog(int buffersInBacklog) {
+               if (buffersInBacklog == maxBuffersPerChannel + 1) {
+                       if (++unavailableSubpartitionsCount == 1) {
+                               availabilityHelper.resetUnavailable();
+                       }
+               }
+       }
 
 Review comment:
   Ok, I've just thought about probably a simpler and better idea. Why don't we 
move this anti data skew logic inside the `LocalBufferPool`? It's already 
synchronized for recycling the memory segments from netty threads and 
requesting from the task thread and it already implements 
`AvailabilityProvider` with an `AvailabilityHelper`. It currently doesn't know 
which channel/subpartition assigned to which buffer but it might be doable to 
pass this knowledge there:
   
   - add subpartition index argument to `ResultPartitionWriter#getBufferBuilder`
   - embed it into the `NetworkBuffer` inside `LocalBufferPool#toBuffer`? Maybe 
via passing a custom `BufferRecycler`?
   - use the subpartition id during recycling
   
   Couple of caveats:
   
   - `LocalBufferPool` is also used for input gates at least, so it might need 
to support managing buffers with and without channels accounting.
   - for `BroadcastRecordWriter` the accounting would either need to be 
disabled, or `Integer.MAX_VALUE` could be used to indicate to account all 
channels?
   - there might be some other code paths leading to `LocalBufferPool` that do 
not care about anti data skew accounting.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to