pnowojski commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r738477646



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadThrottle.java
##########
@@ -60,7 +70,22 @@ public void seizeCapacity(long bytes) throws InterruptedException {
     public void releaseCapacity(long bytes) {
         synchronized (lock) {
             inFlightBytesCounter -= bytes;
+            if (hasCapacity() && !isAvailable()) {
+                availabilityFuture.complete(null);
+                availabilityFuture = AVAILABLE;
+            }
             lock.notifyAll();
         }
     }
+
+    private boolean hasCapacity() {
+        return inFlightBytesCounter < maxBytesInFlight;
+    }
+
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        synchronized (lock) {
+            return availabilityFuture;
+        }
+    }

Review comment:
       But `RecordWriter#getAvailableFuture` (and 
`LocalBufferPool#getAvailableFuture`, and for that matter every other 
`AvailabilityProvider`) do not need extra locking. 
   
   I think a small difference here is that in all the pre-existing use cases, 
the availability of a given task can change from `available` to `not available` 
only as a result of that task's own actions. With shared changelogs, a changelog 
can become unavailable as a result of a different task's action. 
   
   Either way, this lock doesn't really prevent any race conditions, as 
this future can become outdated 1ns after the lock is released. We would either 
need to implement some kind of complicated "reserving" mechanism to book and 
guarantee availability, or we need to accept that this mechanism will 
occasionally fail and rely on the fallback: the changelog blocks the write 
call, blocking the task in the process.
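
   To illustrate the second option, here is a minimal sketch, not the code 
from this PR. The class name `ThrottleSketch` and the `volatile` field are 
assumptions made for the example. `getAvailableFuture` is treated as a 
best-effort hint read without the lock, and `seizeCapacity` remains the 
blocking fallback that actually enforces the limit.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; names and structure are illustrative, not the PR's UploadThrottle. */
class ThrottleSketch {
    private static final CompletableFuture<?> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final long maxBytesInFlight;
    private long inFlightBytes; // guarded by lock

    // volatile (an assumption of this sketch): readers see a recent value without locking.
    private volatile CompletableFuture<?> availabilityFuture = AVAILABLE;

    ThrottleSketch(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Best-effort hint only: the result may be stale as soon as it is returned. */
    CompletableFuture<?> getAvailableFuture() {
        return availabilityFuture;
    }

    /** Fallback that enforces the limit: blocks the caller until capacity is free. */
    void seizeCapacity(long bytes) throws InterruptedException {
        synchronized (lock) {
            while (inFlightBytes >= maxBytesInFlight) {
                lock.wait();
            }
            inFlightBytes += bytes;
            if (inFlightBytes >= maxBytesInFlight && availabilityFuture.isDone()) {
                // Capacity exhausted: publish a fresh, uncompleted future.
                availabilityFuture = new CompletableFuture<>();
            }
        }
    }

    void releaseCapacity(long bytes) {
        CompletableFuture<?> toComplete = null;
        synchronized (lock) {
            inFlightBytes -= bytes;
            if (inFlightBytes < maxBytesInFlight && !availabilityFuture.isDone()) {
                toComplete = availabilityFuture;
                availabilityFuture = AVAILABLE;
            }
            lock.notifyAll();
        }
        // Complete outside the lock so dependent callbacks never run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

   If task A reads the future while it is completed and task B then seizes 
the remaining capacity, A still holds a completed future. A's next 
`seizeCapacity` call simply blocks until B releases, which is the occasional 
failure mode we would be accepting.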




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


