jiangxin369 commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1466115976


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java:
##########
@@ -94,29 +99,58 @@ public class SortBufferAccumulator implements BufferAccumulator {
     @Nullable
     private TriConsumer<TieredStorageSubpartitionId, Buffer, Integer> accumulatedBufferFlusher;
 
+    /**
+     * An executor to periodically check the size of buffer pool. If the size is changed, the
+     * accumulated buffers should be flushed to release the buffers.
+     */
+    private final ScheduledExecutorService periodicalAccumulatorFlusher =
+            Executors.newSingleThreadScheduledExecutor(

Review Comment:
   If we register a listener for each LocalBufferPool (LBP), then when a redistribution happens, every listener has to perform some actions (write operations), and each of those actions needs to acquire the listener's own lock. We would therefore have to coordinate the `availableMemorySegments` lock in `NetworkBufferPool` with all of those other locks. I tried a listen-and-notify strategy at first, but found that it very easily leads to a deadlock.
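   The lock-order inversion described above can be reproduced in isolation. This is a hypothetical sketch, not Flink code: `poolLock` stands in for the `availableMemorySegments` lock in `NetworkBufferPool`, `listenerLock` for the lock a per-LBP listener would need, and `ReentrantLock` with a timed `tryLock` replaces plain monitors so the demo reports the cycle instead of hanging.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderInversionDemo {

    /**
     * Starts two threads that take the same two locks in opposite order and returns
     * how many of them could not get their second lock. A result of 2 means that
     * untimed lock() calls (or nested synchronized blocks) would have deadlocked.
     */
    public static int countBlockedThreads() {
        // Hypothetical stand-in for the availableMemorySegments lock in NetworkBufferPool.
        ReentrantLock poolLock = new ReentrantLock();
        // Hypothetical stand-in for the lock owned by one per-LBP listener.
        ReentrantLock listenerLock = new ReentrantLock();

        CyclicBarrier bothHoldFirstLock = new CyclicBarrier(2);
        CyclicBarrier bothTriedSecondLock = new CyclicBarrier(2);
        AtomicInteger blocked = new AtomicInteger();

        // Redistribution path: holds the pool lock while notifying the listener,
        // and the listener's write operation needs the listener lock.
        Thread redistributor = new Thread(() -> acquireInOrder(
                poolLock, listenerLock, bothHoldFirstLock, bothTriedSecondLock, blocked));
        // Write path: holds the listener lock, then requests a buffer from the pool,
        // which needs the pool lock.
        Thread writer = new Thread(() -> acquireInOrder(
                listenerLock, poolLock, bothHoldFirstLock, bothTriedSecondLock, blocked));

        redistributor.start();
        writer.start();
        try {
            redistributor.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for demo threads", e);
        }
        return blocked.get();
    }

    private static void acquireInOrder(
            ReentrantLock first,
            ReentrantLock second,
            CyclicBarrier bothHoldFirstLock,
            CyclicBarrier bothTriedSecondLock,
            AtomicInteger blocked) {
        first.lock();
        try {
            // Wait until the other thread also holds its first lock.
            bothHoldFirstLock.await();
            // Timed tryLock instead of lock(): report the cycle instead of hanging forever.
            if (second.tryLock(100, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                blocked.incrementAndGet();
            }
            // Keep holding the first lock until the other thread has also tried.
            bothTriedSecondLock.await();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads blocked on their second lock: " + countBlockedThreads());
    }
}
```

   Running `main` prints `threads blocked on their second lock: 2`: each thread holds the lock the other one needs, which with untimed locking would be a deadlock.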
   I agree that a listen-and-notify strategy would be better if each listener could acquire the `availableMemorySegments` lock itself, which would avoid the deadlock. However, exposing that lock object needs careful consideration, and I think it is out of scope for this PR.
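   The polling approach that the diff introduces (`periodicalAccumulatorFlusher`) can be sketched as follows. This is a minimal sketch, not the PR's code: the class `PeriodicPoolSizeChecker`, its `IntSupplier`/`Runnable` constructor parameters, and `checkOnce` are hypothetical, and it assumes the pool size can be read without taking the `availableMemorySegments` lock. It shows why polling sidesteps the problem: the flush runs on the checker's own thread and is never invoked from inside the pool's lock.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/**
 * Hypothetical sketch (not the PR's code): a single scheduler thread polls the
 * buffer pool size and flushes the accumulated buffers when the size has changed.
 */
public class PeriodicPoolSizeChecker implements AutoCloseable {

    // Assumed to read the pool size without taking NetworkBufferPool's global lock.
    private final IntSupplier poolSize;
    // Assumed to flush under the accumulator's own lock only.
    private final Runnable flushAccumulated;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "periodic-accumulator-flusher");
                        thread.setDaemon(true);
                        return thread;
                    });

    // Not synchronized: meant to be used by one thread at a time (the single scheduler
    // thread after start(), or a caller invoking checkOnce() directly without start()).
    private int lastObservedSize;

    public PeriodicPoolSizeChecker(IntSupplier poolSize, Runnable flushAccumulated) {
        this.poolSize = poolSize;
        this.flushAccumulated = flushAccumulated;
        this.lastObservedSize = poolSize.getAsInt();
    }

    /** Polls at a fixed interval, so a size change is noticed roughly one interval late. */
    public void start(long intervalMillis) {
        scheduler.scheduleAtFixedRate(
                this::checkOnce, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Flushes if the pool size changed since the last check; returns whether it flushed. */
    public boolean checkOnce() {
        int currentSize = poolSize.getAsInt();
        if (currentSize == lastObservedSize) {
            return false;
        }
        lastObservedSize = currentSize;
        flushAccumulated.run();
        return true;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

   The trade-off is latency and threads: a resize is only acted on at the next poll, and since the field in the diff is a per-instance executor, each `SortBufferAccumulator` appears to get its own scheduler thread. In return, no accumulator code ever runs while the pool's global lock is held.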



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
