zhijiangW commented on a change in pull request #9905: [FLINK-14396][network] Implement rudimentary non-blocking network output
URL: https://github.com/apache/flink/pull/9905#discussion_r335561897
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##########
 @@ -299,6 +313,15 @@ private NotificationResult fireBufferAvailableNotification(BufferListener listen
                return notificationResult;
        }
 
+       /**
+        * @return true if there are no available buffers in the queue and the global quota is also exhausted.
+        */
+       private boolean isUnavailable() {
+               assert Thread.holdsLock(availableMemorySegments);
+
+               return availableMemorySegments.isEmpty() && numberOfRequestedMemorySegments == currentPoolSize;
+       }
 
 Review comment:
   > I think that we cannot do that, as I think this is the mechanism that allows Flink jobs to run with less than the desired amount of buffers (for example only 1 buffer per channel instead of 2)
   
   My above proposal might not affect this behavior. For example, suppose we eagerly request 100 buffers from the global pool while constructing the `LocalBufferPool`, only 50 of them are used in practice, and the other 50 stay in the local pool's available queue. If the pool size is then decreased to 50 because another new `LocalBufferPool` is constructed, the 50 available buffers in the first local pool can be returned to the global pool immediately. And if all 100 buffers in the first local pool are actually in use, then even with the current lazy way of requesting, the situation is the same: we have to wait for 50 buffers to be recycled to the global pool. The benefit of the eager way is that we could rely solely on the condition `availableMemorySegments.isEmpty()` to judge the availability state from the beginning.
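   
   For illustration, a minimal sketch of that eager mode, under stated assumptions: the class is hypothetical, only the field names mirror `LocalBufferPool`, and `NetworkBufferPool#requestMemorySegment` is assumed to return null when the global pool is exhausted.
   
   ```java
   import java.util.ArrayDeque;
   
   import org.apache.flink.core.memory.MemorySegment;
   import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
   
   // Hypothetical sketch of an eagerly-filled local pool; not the real LocalBufferPool.
   class EagerLocalPoolSketch {
       private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<>();
       private final NetworkBufferPool networkBufferPool;
       private int numberOfRequestedMemorySegments;
       private int currentPoolSize;
   
       EagerLocalPoolSketch(NetworkBufferPool networkBufferPool, int currentPoolSize) {
           this.networkBufferPool = networkBufferPool;
           this.currentPoolSize = currentPoolSize;
           requestSegmentsEagerly(); // eager: fill up to the pool size at construction time
       }
   
       private void requestSegmentsEagerly() {
           synchronized (availableMemorySegments) {
               while (numberOfRequestedMemorySegments < currentPoolSize) {
                   // assumed to return null when the global pool is exhausted
                   MemorySegment segment = networkBufferPool.requestMemorySegment();
                   if (segment == null) {
                       break;
                   }
                   availableMemorySegments.add(segment);
                   numberOfRequestedMemorySegments++;
               }
           }
       }
   
       // When the pool size shrinks because another LocalBufferPool is constructed,
       // the surplus segments in the available queue go back to the global pool at once.
       void setNumBuffers(int newPoolSize) {
           synchronized (availableMemorySegments) {
               currentPoolSize = newPoolSize;
               while (numberOfRequestedMemorySegments > currentPoolSize
                       && !availableMemorySegments.isEmpty()) {
                   networkBufferPool.recycle(availableMemorySegments.poll());
                   numberOfRequestedMemorySegments--;
               }
           }
       }
   }
   ```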
   
   > I was thinking more towards the direction of changing the interaction 
between LocalBufferPool and NetworkBufferPool, so that if LocalBufferPool is 
requesting more buffers from the NetworkBufferPool and it doesn't get any, it 
could register some callback/retrieve some CompletableFuture that would be 
completed if some buffers became available (currently NetworkBufferPool returns 
just null).
   
   I approve of this approach, and it has the same effect as the listener way I mentioned before. This mechanism would be needed for two purposes (see the sketch after this list):
   
   - We want to judge the availability state only by `availableMemorySegments.isEmpty()`.
   
   - We want to get rid of `availableMemorySegments.wait(2000)` in the local pool.
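   
   For illustration, a minimal sketch of that future-based interaction on the global-pool side, under stated assumptions: the class and `requestMemorySegmentFuture` are hypothetical; only the null-returning behavior it replaces is taken from the quote above.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.concurrent.CompletableFuture;
   
   import org.apache.flink.core.memory.MemorySegment;
   
   // Hypothetical sketch: instead of returning null when exhausted, the global pool
   // hands out a future that is completed once a segment is recycled.
   class GlobalPoolSketch {
       private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<>();
       private final ArrayDeque<CompletableFuture<MemorySegment>> pendingRequests = new ArrayDeque<>();
   
       synchronized CompletableFuture<MemorySegment> requestMemorySegmentFuture() {
           MemorySegment segment = availableMemorySegments.poll();
           if (segment != null) {
               return CompletableFuture.completedFuture(segment);
           }
           CompletableFuture<MemorySegment> pending = new CompletableFuture<>();
           pendingRequests.add(pending); // completed later by recycle()
           return pending;
       }
   
       synchronized void recycle(MemorySegment segment) {
           CompletableFuture<MemorySegment> pending = pendingRequests.poll();
           if (pending != null) {
               pending.complete(segment); // hand the segment straight to a waiter
           } else {
               availableMemorySegments.add(segment);
           }
       }
   }
   ```
   
   With this, a `LocalBufferPool` could complete its own availability future from a callback on the returned future instead of waiting in `availableMemorySegments.wait(2000)`.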
   
   > However there is one more thing. Maybe in the MVP we can live without it. If the code will block more often than it could, but the backpressure mechanism based on isAvailable() will still be working correctly AND we will not deadlock, that's, I think, acceptable for now. This current version blocks more often and doesn't deadlock, but I think that currently isAvailable() might be completed while the caller is still blocked on `availableMemorySegments.wait(2000)`, which would introduce some issues with the backpressure monitor, right?
   
   ATM the backpressure mechanism still works because we have not changed the processor logic to rely on the output status before processing input. But after we refactor `StreamTask`/`StreamInputProcessor` in the future, meaning `StreamTask` would no longer call `processInput` if the output is unavailable, we would have to change the backpressure monitor. In theory it has to monitor both `RecordWriter#isAvailable` and the blocking `availableMemorySegments.wait(2000)` to cover all the cases; a rough sketch of such a combined check follows.
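   
   For illustration, a minimal sketch of that combined check, under heavy assumptions: `isBackPressured` is a hypothetical helper, `outputAvailable` stands in for `RecordWriter#isAvailable`, and the stack-frame matching is only a stand-in for whatever sampling the monitor would really use.
   
   ```java
   import java.util.function.BooleanSupplier;
   
   // Hypothetical sketch of the combined backpressure check; not Flink code.
   final class BackPressureCheckSketch {
   
       // outputAvailable stands in for RecordWriter#isAvailable.
       static boolean isBackPressured(BooleanSupplier outputAvailable, Thread taskThread) {
           // Case 1: the non-blocking path reports an unavailable output.
           if (!outputAvailable.getAsBoolean()) {
               return true;
           }
           // Case 2: the task thread is still parked inside availableMemorySegments.wait(2000),
           // detected here by crude stack-trace sampling (the frame names are illustrative).
           for (StackTraceElement frame : taskThread.getStackTrace()) {
               if (frame.getClassName().endsWith("LocalBufferPool")
                       && frame.getMethodName().startsWith("requestMemorySegment")) {
                   return true;
               }
           }
           return false;
       }
   }
   ```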
