gaoyunhaii commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r657827783



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -271,15 +271,22 @@ private ResultSubpartitionView 
checkAndWaitForSubpartitionView() {
 
     @Override
     public void resumeConsumption() {
-        checkState(!isReleased, "Channel released.");

Review comment:
       This is due to currently there is a logic that when the upstream task 
have emitted CheckpointBarrier it would be marked as block, and after the 
downstream task have aligned CheckpointBarrier, the downstream task would 
notify the upstream task to unblock the channel. In this way we could avoid 
processing the records after checkpoint barrier in exactly-once case.
   
   Since now we would also trigger checkpoint after some input channels have 
received EndOfPartition event and finished, in this case, we would need to skip 
these channels on unblock since their corresponding upstream tasks have already 
finished.
   
   There would be two ways to achieve this, one is we skipped these channels in 
`CheckpointBarrierHandler` and the second is to skip the channels in specific 
`InputChannel`. The first option would requires maintains and pass additional 
status in `CheckpointBarrierHandler`, which seems to be repeat with the status 
maintained in `InputChannel`, thus I think we might directly ignore the call if 
the channel has finished. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to