rkhachatryan commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r738461042



##########
File path: 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadThrottle.java
##########
@@ -60,7 +70,22 @@ public void seizeCapacity(long bytes) throws 
InterruptedException {
     public void releaseCapacity(long bytes) {
         synchronized (lock) {
             inFlightBytesCounter -= bytes;
+            if (hasCapacity() && !isAvailable()) {
+                availabilityFuture.complete(null);
+                availabilityFuture = AVAILABLE;
+            }
             lock.notifyAll();
         }
     }
+
+    private boolean hasCapacity() {
+        return inFlightBytesCounter < maxBytesInFlight;
+    }
+
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        synchronized (lock) {
+            return availabilityFuture;
+        }
+    }

Review comment:
       My intention was to only call this method if `isApproximatelyAvailable` 
returned false (and input and output are unavailable). 
   
   But I think you're right, it's not an issue if `StreamTask` gets an outdated 
version of available future.
   
   The class has to be thread-safe because it's used by multiple tasks and 
uploader threads. 
   IIUC, `RecordWriter` requests memory which also uses `synchronized` section 
eventually in the buffer pool.
   Please also keep in mind that methods other than checking availability are 
not in the hot path (uploader runs in the background).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to