pnowojski commented on a change in pull request #9905: [FLINK-14396][network]
Implement rudimentary non-blocking network output
URL: https://github.com/apache/flink/pull/9905#discussion_r335320704
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -299,6 +313,15 @@ private NotificationResult
fireBufferAvailableNotification(BufferListener listen
return notificationResult;
}
+ /**
+ * @return true if there is no available buffers in queue and the
global quota is also exhausted.
+ */
+ private boolean isUnavailable() {
+ assert Thread.holdsLock(availableMemorySegments);
+
+ return availableMemorySegments.isEmpty() &&
numberOfRequestedMemorySegments == currentPoolSize;
Review comment:
> Maybe a feasible way to avoid waiting on requesting buffers from global
pool is that the availableMemorySegments is filled immediately while
constructing the LocalBufferPool in eager way, not current lazy way based on
emitting data.
I think that we can not do, as I think this the mechanism that allows for
Flink jobs to run with less then desired amount of buffers (for example only 1
buffer per channel instead of 2).
I was thinking more towards the direction of changing the interaction
between `LocalBufferPool` and `NetworkBufferPool`, so that if `LocalBufferPool`
is requesting more buffers from the `NetworkBufferPool` and it doesn't get any,
it could register some callback/retrieve some `CompletableFuture` that would be
completed if some buffers became available (currently `NetworkBufferPool`
returns just null).
However there is one more thing. Maybe in the MVP we can live without it. If
code will block more often than it could, but the backpressure mechanism based
on `isAvailable()` will be still working correctly AND we will not deadlock,
that's I think acceptable for now. This current version blocks more often and
doesn't deadlock, but I think that currently `isAvailable()` might be
completed, while the caller is still blocked on
`availableMemorySegments.wait(2000);`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services