wsry commented on a change in pull request #13924:
URL: https://github.com/apache/flink/pull/13924#discussion_r601060222
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
##########
@@ -176,21 +202,28 @@ public void resumeConsumption() {
@Override
public Throwable getFailureCause() {
- // we can never throw an error after this was created
- return null;
+ synchronized (lock) {
+ return failureCause;
+ }
}
@Override
public boolean isAvailable(int numCreditsAvailable) {
- if (numCreditsAvailable > 0) {
- return !buffersRead.isEmpty();
- }
+ synchronized (lock) {
+ if (isReleased) {
+ return true;
+ }
- return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+ if (numCreditsAvailable > 0) {
+ return !buffersRead.isEmpty();
+ }
+
+ return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+ }
}
@Override
public int unsynchronizedGetNumberOfQueuedBuffers() {
- return 0;
+ return buffersRead.size();
Review comment:
Because the method has unsynchronized prefix, I think the reason is to
avoid getting metrics influence the performance. The PipelinedSubpartition does
similar things. Maybe we should also use Math.max(buffersRead.size(), 0) like
the PipelinedSubpartition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]