gaoyunhaii commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r657827783
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -271,15 +271,22 @@ private ResultSubpartitionView
checkAndWaitForSubpartitionView() {
@Override
public void resumeConsumption() {
- checkState(!isReleased, "Channel released.");
Review comment:
This is due to currently there is a logic that when the upstream task
have emitted CheckpointBarrier it would be marked as block, and after the
downstream task have aligned CheckpointBarrier, the downstream task would
notify the upstream task to unblock the channel. In this way we could avoid
processing the records after checkpoint barrier in exactly-once case.
Since now we would also trigger checkpoint after some input channels have
received EndOfPartition event and finished, in this case, we would need to skip
these channels on unblock since their corresponding upstream tasks have already
finished.
There would be two ways to achieve this, one is we skipped these channels in
`CheckpointBarrierHandler` and the second is to skip the channels in specific
`InputChannel`. The first option would requires maintains and pass additional
status in `CheckpointBarrierHandler`, which seems to be repeat with the status
maintained in `InputChannel`, thus I think we might directly ignore the call if
the channel has finished.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]