pnowojski commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r750169518



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
##########
@@ -206,16 +251,27 @@ private synchronized void setErrorSafe(Throwable t) {
     }
 
     private static UploadTask wrapWithSizeUpdate(
-            UploadTask uploadTask, long preComputedTaskSize, LongAdder inflightSize) {
+            UploadTask uploadTask, Runnable releaseCapacityCallback) {
         return new UploadTask(
                 uploadTask.changeSets,
                 result -> {
-                    inflightSize.add(-preComputedTaskSize);
-                    uploadTask.successCallback.accept(result);
+                    try {
+                        releaseCapacityCallback.run();

Review comment:
       Why the indirect `Runnable`? Can't we call `releaseCapacity(size)` 
directly here?
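
A minimal sketch of what the direct call could look like, assuming the wrapper becomes an instance method so that it can reach `releaseCapacity(size)`. The class name, the `AtomicLong` counter, `reserveCapacity`, `inFlight`, and the use of `Consumer` in place of `UploadTask` are all hypothetical stand-ins, not the PR's actual code:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for the uploader; only the capacity bookkeeping is modelled.
class DirectReleaseSketch {
    private final AtomicLong inFlightBytes = new AtomicLong();

    void reserveCapacity(long size) {
        inFlightBytes.addAndGet(size);
    }

    void releaseCapacity(long size) {
        inFlightBytes.addAndGet(-size);
    }

    long inFlight() {
        return inFlightBytes.get();
    }

    // Instance method: calls releaseCapacity(size) directly instead of going
    // through a Runnable. The try/finally still guarantees that the wrapped
    // callback runs even if releasing capacity throws.
    <T> Consumer<T> wrapWithSizeUpdate(Consumer<T> callback, long size) {
        return result -> {
            try {
                releaseCapacity(size);
            } finally {
                callback.accept(result);
            }
        };
    }

    public static void main(String[] args) {
        DirectReleaseSketch uploader = new DirectReleaseSketch();
        uploader.reserveCapacity(128);
        Consumer<String> wrapped =
                uploader.wrapWithSizeUpdate(r -> System.out.println("uploaded " + r), 128);
        wrapped.accept("task-1");
        System.out.println(uploader.inFlight());
    }
}
```

The trade-off is that the wrapper can no longer be `static`, since it needs access to the instance that owns the capacity counter.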

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -303,6 +304,8 @@
     @GuardedBy("shouldInterruptOnCancelLock")
     private boolean shouldInterruptOnCancel = true;
 
+    private final AvailabilityProvider changelogWriterAvailabilityProvider;

Review comment:
       nit: Re the `null` version: I think I would sleep better with it here. 
Maybe the change is barely visible (1% degradation?), but even if it's not 
visible at all, it would be one less potential issue for someone to wonder 
about, investigate, or optimise in the future.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -474,7 +484,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
         DataInputStatus status = inputProcessor.processInput();
         switch (status) {
             case MORE_AVAILABLE:
-                if (recordWriter.isAvailable()) {
+                if (recordWriter.isAvailable()
+                        && changelogWriterAvailabilityProvider.isApproximatelyAvailable()) {

Review comment:
       Why `isApproximatelyAvailable` and not `isAvailable`? Isn't this 
actually an anti-optimisation? If 
`changelogWriterAvailabilityProvider.getAvailableFuture() != AVAILABLE`, it 
should actually be cheaper to check 
`changelogWriterAvailabilityProvider.getAvailableFuture().isDone()` than to 
suspend the mailbox and wake it up via a `CompletableFuture#thenRun` callback.
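
To make the difference concrete, here is a simplified, self-contained sketch of the two checks. It assumes `isApproximatelyAvailable` is an identity comparison against a shared `AVAILABLE` sentinel and `isAvailable` additionally accepts an already completed future; the class and the static helpers are stand-ins for illustration, not Flink's `AvailabilityProvider` itself:

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the two availability checks discussed above.
class AvailabilityCheckSketch {
    // Shared sentinel meaning "available right now".
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    // Cheap identity check: true only when the sentinel itself is returned.
    static boolean isApproximatelyAvailable(CompletableFuture<?> future) {
        return future == AVAILABLE;
    }

    // Full check: also true when a non-sentinel future has already completed.
    static boolean isAvailable(CompletableFuture<?> future) {
        return future == AVAILABLE || future.isDone();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(isApproximatelyAvailable(pending)); // false
        System.out.println(isAvailable(pending));              // false

        // The writer becomes available again, but the future is not the sentinel.
        pending.complete(null);
        System.out.println(isApproximatelyAvailable(pending)); // false
        System.out.println(isAvailable(pending));              // true
    }
}
```

In the last case the approximate check still reports "unavailable", so the caller would suspend and register a callback on a future that is already done, which is the extra cost in question.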

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
##########
@@ -206,16 +251,27 @@ private synchronized void setErrorSafe(Throwable t) {
     }
 
     private static UploadTask wrapWithSizeUpdate(
-            UploadTask uploadTask, long preComputedTaskSize, LongAdder inflightSize) {
+            UploadTask uploadTask, Runnable releaseCapacityCallback) {
         return new UploadTask(
                 uploadTask.changeSets,
                 result -> {
-                    inflightSize.add(-preComputedTaskSize);
-                    uploadTask.successCallback.accept(result);
+                    try {
+                        releaseCapacityCallback.run();
+                    } finally {
+                        uploadTask.successCallback.accept(result);
+                    }
                 },
                 (result, error) -> {
-                    inflightSize.add(-preComputedTaskSize);
-                    uploadTask.failureCallback.accept(result, error);
+                    try {
+                        releaseCapacityCallback.run();
+                    } finally {
+                        uploadTask.failureCallback.accept(result, error);
+                    }
                 });
     }
+
+    @Override
+    public AvailabilityProvider getAvailabilityProvider() {
+        return availabilityHelper;
+    }

Review comment:
       Can you add a comment here explaining why we think it's fine to leave 
this unsynchronised, even though other task threads might actually make this 
unavailable?





