zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428430651
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex)
throws IOException {
moreAvailable = !receivedBuffers.isEmpty();
}
+ if (next == null) {
Review comment:
Let me explain this further.
The canceler thread closes the `InputGate` in advance so that the task thread
can notice the released state and exit early. So if the canceler thread has
already called `RemoteInputChannel#releaseAllResources`, all the buffers in
`receivedBuffers` have already been drained and recycled.
But the task thread may not be aware of this yet, so it can still call
`getNextBuffer` and get a `null` buffer here. We only expect a `null` buffer
when the channel is `released`, so in that case we throw the expected
`CancelTaskException` to make the task thread exit. If the channel is not
released, there must be a logic bug, e.g. this channel notified the gate of
available data by mistake. In that case we throw `IllegalStateException` to
avoid the misleading `NullPointerException` that would otherwise occur when the
buffer is dereferenced below.
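To make the two cases concrete, here is a minimal, self-contained sketch of the
`null` check described above. It is not the actual `RemoteInputChannel` code:
`NullCheckChannelSketch` and `CancelTaskExceptionSketch` are hypothetical
stand-ins, `String` stands in for the real buffer type, and the exception
messages are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Flink's CancelTaskException so the sketch compiles alone.
class CancelTaskExceptionSketch extends RuntimeException {
    CancelTaskExceptionSketch(String message) {
        super(message);
    }
}

// Simplified model of the channel; String stands in for the real buffer type.
class NullCheckChannelSketch {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();

    void onBuffer(String buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    // Called by the canceler thread: marks the channel released and drains the queue.
    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (receivedBuffers) {
                receivedBuffers.clear(); // the real code would recycle each buffer
            }
        }
    }

    // Called by the task thread.
    String getNextBuffer() {
        final String next;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
        }
        if (next == null) {
            if (isReleased.get()) {
                // Expected during cancellation: the queue was already drained.
                throw new CancelTaskExceptionSketch(
                    "Queried for a buffer after channel has been released.");
            }
            // Not released: data was announced but none is queued, so this is a bug.
            throw new IllegalStateException(
                "No queued buffer although the channel is not released.");
        }
        return next;
    }
}
```

With this shape the task thread never dereferences a `null` buffer, and the
exception type tells cancellation apart from a notification bug.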
My fix in `#releaseAllResources` only prevents the task thread and the canceler
thread from polling `receivedBuffers` concurrently, which might recycle the
same buffer twice and cause a misleading exception from the netty stack.
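As a rough illustration of why taking the buffers out under the lock avoids a
double recycle, consider the sketch below. It assumes the release path copies
and clears the queue while holding the same lock the task thread uses for
polling; `CountingBuffer` and `DrainingChannelSketch` are hypothetical names,
not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical buffer that counts how many times it was recycled.
class CountingBuffer {
    final AtomicInteger recycleCount = new AtomicInteger();

    void recycle() {
        recycleCount.incrementAndGet();
    }
}

class DrainingChannelSketch {
    private final ArrayDeque<CountingBuffer> receivedBuffers = new ArrayDeque<>();

    void onBuffer(CountingBuffer buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    // Task thread: takes ownership of one buffer under the lock.
    CountingBuffer poll() {
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }

    // Canceler thread: takes ownership of all remaining buffers under the same
    // lock, then recycles them outside the lock.
    void releaseAllResources() {
        final ArrayDeque<CountingBuffer> drained;
        synchronized (receivedBuffers) {
            drained = new ArrayDeque<>(receivedBuffers);
            receivedBuffers.clear();
        }
        for (CountingBuffer buffer : drained) {
            buffer.recycle();
        }
    }
}
```

Because every buffer leaves the queue exactly once, and always under the lock,
only one of the two threads can end up owning and recycling it.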
Another option is to modify the logic in `#getNextBuffer` as below:
```
synchronized (receivedBuffers) {
    // Check the released flag under the same lock that guards the queue.
    if (isReleased.get()) {
        throw new CancelTaskException("Queried for a buffer after channel has been released.");
    }
    next = receivedBuffers.poll();
    moreAvailable = !receivedBuffers.isEmpty();
}
```
But it might still make sense to also check whether the buffer is `null`
outside the `synchronized` block. That check is not for the race condition; it
only guards against potential bugs in the data notification logic that would
otherwise cause a misleading NPE.
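Putting this alternative together, a self-contained sketch could look like the
following. It assumes the release path sets the flag before draining the queue
under the same lock, so a `null` seen after the in-lock check can only come
from a notification bug. `GuardedChannelSketch` and `CancelledSketchException`
are hypothetical names, and `String` again stands in for the buffer type.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Flink's CancelTaskException.
class CancelledSketchException extends RuntimeException {
    CancelledSketchException(String message) {
        super(message);
    }
}

class GuardedChannelSketch {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();

    void onBuffer(String buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    // Assumption: the flag is set first, then the queue is drained under the lock.
    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (receivedBuffers) {
                receivedBuffers.clear(); // the real code would recycle each buffer
            }
        }
    }

    String getNextBuffer() {
        final String next;
        synchronized (receivedBuffers) {
            // Race guard: a released channel is detected before touching the queue.
            if (isReleased.get()) {
                throw new CancelledSketchException(
                    "Queried for a buffer after channel has been released.");
            }
            next = receivedBuffers.poll();
        }
        // Logic-bug guard: fail with a clear message instead of a later NPE.
        if (next == null) {
            throw new IllegalStateException(
                "Gate was notified of available data, but no buffer is queued.");
        }
        return next;
    }
}
```

Here the check inside the lock handles cancellation, so the check outside the
lock needs only the `IllegalStateException` branch.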
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.