wsry commented on a change in pull request #13924:
URL: https://github.com/apache/flink/pull/13924#discussion_r601060222



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
##########
@@ -176,21 +202,28 @@ public void resumeConsumption() {
 
     @Override
     public Throwable getFailureCause() {
-        // we can never throw an error after this was created
-        return null;
+        synchronized (lock) {
+            return failureCause;
+        }
     }
 
     @Override
     public boolean isAvailable(int numCreditsAvailable) {
-        if (numCreditsAvailable > 0) {
-            return !buffersRead.isEmpty();
-        }
+        synchronized (lock) {
+            if (isReleased) {
+                return true;
+            }
 
-        return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+            if (numCreditsAvailable > 0) {
+                return !buffersRead.isEmpty();
+            }
+
+            return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+        }
     }
 
     @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {
-        return 0;
+        return buffersRead.size();

Review comment:
       Because the method has unsynchronized prefix, I think the reason is to 
avoid getting metrics influence the performance. The PipelinedSubpartition does 
similar things. Maybe we should also use Math.max(buffersRead.size(), 0) like 
the PipelinedSubpartition.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to