pnowojski commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r738477646
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadThrottle.java
##########
@@ -60,7 +70,22 @@ public void seizeCapacity(long bytes) throws
InterruptedException {
public void releaseCapacity(long bytes) {
synchronized (lock) {
inFlightBytesCounter -= bytes;
+ if (hasCapacity() && !isAvailable()) {
+ availabilityFuture.complete(null);
+ availabilityFuture = AVAILABLE;
+ }
lock.notifyAll();
}
}
+
+ private boolean hasCapacity() {
+ return inFlightBytesCounter < maxBytesInFlight;
+ }
+
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ synchronized (lock) {
+ return availabilityFuture;
+ }
+ }
Review comment:
But `RecordWriter#getAvailableFuture` (and
`LocalBufferPool#getAvailableFuture`, and for that matter every other
`AvailabilityProvider`) do not need extra locking.
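To illustrate the point, here is a minimal, hypothetical sketch (not Flink's actual classes) of how an availability future can be served without extra locking: reading a `volatile` reference is atomic, and completing an already-completed future is a harmless no-op.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical lock-free availability provider sketch.
class LockFreeAvailability {
    private static final CompletableFuture<?> AVAILABLE =
            CompletableFuture.completedFuture(null);

    // volatile is enough for readers; state transitions happen elsewhere
    private volatile CompletableFuture<?> availabilityFuture = AVAILABLE;

    public CompletableFuture<?> getAvailableFuture() {
        // No synchronized block needed: the reference read is atomic,
        // and the returned future may become outdated immediately anyway.
        return availabilityFuture;
    }

    public void becomeUnavailable() {
        availabilityFuture = new CompletableFuture<>();
    }

    public void becomeAvailable() {
        CompletableFuture<?> previous = availabilityFuture;
        availabilityFuture = AVAILABLE;
        previous.complete(null); // completing a done future is a no-op
    }
}
```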
I think a small difference here is that in all the pre-existing use cases,
the availability of a given task can change from `available` -> `not available`
only as a result of that task's own actions. With shared changelogs, the
changelog can become unavailable as a result of a different task's actions.
Either way, this lock doesn't really prevent any race conditions, as the
returned future can become outdated 1ns after the lock is released. We would
either need to implement some kind of complicated "reserving" mechanism to
book and guarantee availability, or we need to accept that occasionally this
mechanism will fail and rely on the fallback: the changelog will block the
write call, blocking the task in the process.
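A hypothetical sketch of that fallback (names and structure are assumptions, not the actual `UploadThrottle` implementation): even if a caller acted on a stale "available" future, `seizeCapacity` still blocks under the lock until capacity is really free, so correctness never depends on the future being fresh.

```java
// Hypothetical throttle sketch: blocking write call as the safety net.
class BlockingThrottle {
    private final Object lock = new Object();
    private final long maxBytesInFlight;
    private long inFlightBytes;

    BlockingThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Blocks the writing task while the in-flight limit is exceeded. */
    public void seizeCapacity(long bytes) throws InterruptedException {
        synchronized (lock) {
            while (inFlightBytes >= maxBytesInFlight) {
                lock.wait(); // fallback: stale availability just blocks here
            }
            inFlightBytes += bytes;
        }
    }

    public void releaseCapacity(long bytes) {
        synchronized (lock) {
            inFlightBytes -= bytes;
            lock.notifyAll(); // wake any writer blocked in seizeCapacity
        }
    }
}
```

The occasional cost is a briefly blocked task rather than a violated capacity limit, which is the trade-off the comment describes.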
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]