zhijiangW commented on a change in pull request #9905: [FLINK-14396][network]
Implement rudimentary non-blocking network output
URL: https://github.com/apache/flink/pull/9905#discussion_r335561897
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -299,6 +313,15 @@ private NotificationResult fireBufferAvailableNotification(BufferListener listen
 		return notificationResult;
 	}
+	/**
+	 * @return true if there are no available buffers in the queue and the global quota is also exhausted.
+	 */
+	private boolean isUnavailable() {
+		assert Thread.holdsLock(availableMemorySegments);
+
+		return availableMemorySegments.isEmpty() && numberOfRequestedMemorySegments == currentPoolSize;
Review comment:
> I think that we cannot do that, as I think this is the mechanism that allows
Flink jobs to run with less than the desired amount of buffers (for example only
1 buffer per channel instead of 2)
My above proposal should not affect this behavior. E.g., assume we eagerly
request 100 buffers together from the global pool while constructing the
`LocalBufferPool`, and that 50 buffers are in use while 50 buffers are left in
the local pool's available queue. If the pool size is decreased to 50 because
another new `LocalBufferPool` is constructed, then the 50 available buffers in
the first local pool can be returned to the global pool immediately. If all 100
buffers in the first local pool are actually in use, then even with the current
lazy way of requesting buffers the situation is the same: we have to wait for
50 buffers to be recycled to the global pool. The benefit of the eager way is
that we could rely solely on the condition `availableMemorySegments.isEmpty()`
to judge the availability state from the beginning.
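The scenario above can be sketched roughly as follows. This is a minimal illustration of the eager-request idea, not Flink's actual implementation; `EagerLocalPool` and all of its members are hypothetical names, and the "global pool" interaction is reduced to simple counters:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: the local pool grabs its whole quota up front, so
// availability reduces to a single emptiness check, and shrinking the pool
// size returns surplus available segments to the global pool immediately.
class EagerLocalPool {
    private final Deque<Object> availableMemorySegments = new ArrayDeque<>();
    private int numberOfRequestedMemorySegments;
    private int currentPoolSize;

    EagerLocalPool(int poolSize) {
        this.currentPoolSize = poolSize;
        // Eagerly request the full quota from the (hypothetical) global pool.
        for (int i = 0; i < poolSize; i++) {
            availableMemorySegments.add(new Object());
        }
        numberOfRequestedMemorySegments = poolSize;
    }

    Object requestSegment() {
        // null only when the pool is genuinely unavailable.
        return availableMemorySegments.poll();
    }

    void recycle(Object segment) {
        if (numberOfRequestedMemorySegments > currentPoolSize) {
            // Surplus after a shrink: return to the global pool instead.
            numberOfRequestedMemorySegments--;
        } else {
            availableMemorySegments.add(segment);
        }
    }

    // With eager requesting, emptiness alone decides availability.
    boolean isUnavailable() {
        return availableMemorySegments.isEmpty();
    }

    void setPoolSize(int newSize) {
        currentPoolSize = newSize;
        // Return surplus available segments to the global pool immediately.
        while (numberOfRequestedMemorySegments > newSize
                && !availableMemorySegments.isEmpty()) {
            availableMemorySegments.poll();
            numberOfRequestedMemorySegments--;
        }
    }

    int requestedSegments() {
        return numberOfRequestedMemorySegments;
    }
}
```

With 100 buffers requested eagerly, 50 in use, and the pool size shrunk to 50, the 50 idle segments go back to the global pool at once, exactly as described above.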
> I was thinking more towards the direction of changing the interaction
between LocalBufferPool and NetworkBufferPool, so that if LocalBufferPool is
requesting more buffers from the NetworkBufferPool and it doesn't get any, it
could register some callback/retrieve some CompletableFuture that would be
completed if some buffers became available (currently NetworkBufferPool returns
just null).
I agree with this approach, and it has the same effect as the listener
mechanism I mentioned before. This mechanism would be needed for two purposes:
- We want to judge the availability state only by
`availableMemorySegments.isEmpty()`.
- We want to get rid of `availableMemorySegments.wait(2000)` in the local pool.
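A minimal sketch of that future-based handshake, with purely illustrative names (`FutureNetworkPool` and its methods are not Flink's API): instead of returning null when exhausted, the global pool hands back a `CompletableFuture` that is completed by the next recycle.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: a request never returns null; it returns a future
// that is either already complete or completed later by recycle().
class FutureNetworkPool {
    private final Deque<Object> segments = new ArrayDeque<>();
    private final Deque<CompletableFuture<Object>> waiters = new ArrayDeque<>();

    FutureNetworkPool(int size) {
        for (int i = 0; i < size; i++) {
            segments.add(new Object());
        }
    }

    synchronized CompletableFuture<Object> requestSegment() {
        Object seg = segments.poll();
        if (seg != null) {
            return CompletableFuture.completedFuture(seg);
        }
        // Exhausted: register a waiter instead of returning null.
        CompletableFuture<Object> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    synchronized void recycle(Object segment) {
        CompletableFuture<Object> waiter = waiters.poll();
        if (waiter != null) {
            // Hand the segment directly to the oldest waiter.
            waiter.complete(segment);
        } else {
            segments.add(segment);
        }
    }
}
```

The local pool could then chain on the returned future instead of parking in `availableMemorySegments.wait(2000)`, which is exactly what the two bullet points above require.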
> However there is one more thing. Maybe in the MVP we can live without it. If
the code blocks more often than it could, but the backpressure mechanism based
on isAvailable() still works correctly AND we do not deadlock, that's
acceptable for now, I think. This current version blocks more often and doesn't
deadlock, but I think that currently isAvailable() might be completed while the
caller is still blocked on availableMemorySegments.wait(2000);, which would
introduce some issues with the backpressure monitor, right?
ATM the backpressure mechanism still works because we have not yet changed the
processor logic to check the output status before processing input. But after
we refactor `StreamTask/StreamInputProcessor` in the future, so that
`StreamTask` would not call `processInput` while the output is unavailable, we
will have to change the backpressure monitor. In theory it has to monitor both
`RecordWriter#isAvailable` and threads blocked in
`availableMemorySegments.wait(2000)` to cover all the cases.
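The combined check could look something like the predicate below. This is only an illustration of the "both sources" point; `isBackPressured` and its parameters are hypothetical, and detecting a thread parked in `Object.wait` would in practice need stack sampling as the current backpressure monitor does:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: after the StreamTask refactor, a task counts as
// backpressured if its output-availability future is incomplete OR its
// thread is parked inside availableMemorySegments.wait(2000).
class BackPressureSample {
    static boolean isBackPressured(
            CompletableFuture<?> outputAvailableFuture,
            boolean blockedInSegmentWait) {
        return !outputAvailableFuture.isDone() || blockedInSegmentWait;
    }
}
```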
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services