pnowojski commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r739166324



##########
File path: 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadThrottle.java
##########
@@ -60,7 +70,22 @@ public void seizeCapacity(long bytes) throws 
InterruptedException {
     public void releaseCapacity(long bytes) {
         synchronized (lock) {
             inFlightBytesCounter -= bytes;
+            if (hasCapacity() && !isAvailable()) {
+                availabilityFuture.complete(null);
+                availabilityFuture = AVAILABLE;
+            }
             lock.notifyAll();
         }
     }
+
+    private boolean hasCapacity() {
+        return inFlightBytesCounter < maxBytesInFlight;
+    }
+
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        synchronized (lock) {
+            return availabilityFuture;
+        }
+    }

Review comment:
       As we discussed offline, I think it should be fine. Task should 
establish sooner or later happens-before relation on this variable when 
accessing this lock via other methods.
   
   But in such case, that would require a larger comment to explain this "semi 
correct optimisation".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to