Github user bjlovegithub commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86517940
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
    @@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData 
checkpointMetaData) throws
     
                synchronized (lock) {
                        if (isRunning) {
    +                           checkpointState(checkpointMetaData);
     
    -                           // Since both state checkpointing and 
downstream barrier emission occurs in this
    -                           // lock scope, they are an atomic operation 
regardless of the order in which they occur.
    -                           // Given this, we immediately emit the 
checkpoint barriers, so the downstream operators
    -                           // can start their checkpoint work as soon as 
possible
    +                           // broadcast barriers after snapshot operators' 
states.
                                operatorChain.broadcastCheckpointBarrier(
    -                                           
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
    -
    -                           checkpointState(checkpointMetaData);
    +                                           
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
    +                           );
    --- End diff --
    
    Yes. We have discussed this problem, and the solution will make sure that 
we will not change the order of broadcasting barriers and then snapshotting.
    The inelegant way is to stop all `Emitter` Thread first before broadcasting 
barrier. But it is a little tricky.
    Another way is to change checkpoint lock into ReentrantReadWriteLock. For 
main thread and emitter thread, they have to acquire read lock. But for 
checkpoint thread or checkpoint procedure, write lock should be taken first, so 
that all emitter threads also stop working. In this way, main thread and all 
emitter thread will not block each other.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to