becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723460867


   @StephanEwen I think the complexity mostly comes from the consecutive 
`resetToCheckpoint()` calls.
   
   Imagine the following sequence.
   
   1. The `RecreateOnResetOperatorCoordinator` has an initial internal coordinator `coordinator_1`.
   2. `resetToCheckpoint()` is invoked, and a new thread `ClosingThread_1` is created to:
        a. Close `coordinator_1`.
        b. Create a new internal coordinator `coordinator_2` and reset its state to the checkpoint.
        c. Process the pending calls on `coordinator_2` so that it catches up.
   3. Before `ClosingThread_1` finishes its work, `resetToCheckpoint()` is invoked again, so `ClosingThread_2` is created to:
        a. Close `coordinator_2`.
        b. Create a new internal coordinator `coordinator_3` and reset its state to the checkpoint.
        c. Process the pending calls on `coordinator_3` so that it catches up.
   
   In this case, `coordinator_2` is touched by both `ClosingThread_1` (which creates it) and `ClosingThread_2` (which closes it), and the two threads may race on the instance variable `coordinator`. Some synchronization is therefore needed between multiple closing threads. The epoch was introduced for this purpose, but I agree it is a little confusing.
   
   I think option (b) is a good suggestion. I extended it a bit further:
   1. Created a `DeferrableCoordinator` which contains both the quiesceable context and the internal coordinator.
   2. Made the instance variable `coordinator` a `DeferrableCoordinator` as well, so that the current coordinator and the old ones are represented the same way.
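A rough sketch of what a `DeferrableCoordinator` bundling a quiesceable context with an internal coordinator could look like follows. Only the name `DeferrableCoordinator` comes from the comment. `InternalCoordinator`, `QuiesceableContext`, `applyCall`, `setAndCatchUp` and the queueing behavior are illustrative assumptions, not the patch's actual API. Calls that arrive before the internal coordinator exists are queued and later replayed in order. Closing the wrapper quiesces its context and drops anything still pending.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical minimal interface for an internal coordinator.
interface InternalCoordinator {
    void handleEvent(String event);

    void close();
}

// Hypothetical context with a quiesce flag; a real context would check it and
// drop outputs coming from a retired coordinator.
final class QuiesceableContext {
    private volatile boolean quiesced;

    void quiesce() {
        quiesced = true;
    }

    boolean isQuiesced() {
        return quiesced;
    }
}

// Sketch: the context plus an internal coordinator that may not exist yet.
final class DeferrableCoordinator {
    final QuiesceableContext context;
    private final Queue<Consumer<InternalCoordinator>> pendingCalls = new ArrayDeque<>();
    private InternalCoordinator internal; // null until created
    private boolean closed;

    DeferrableCoordinator(QuiesceableContext context) {
        this.context = context;
    }

    // Runs the call now if the internal coordinator exists, otherwise defers it.
    synchronized void applyCall(Consumer<InternalCoordinator> call) {
        if (closed) {
            return;
        }
        if (internal == null) {
            pendingCalls.add(call);
        } else {
            call.accept(internal);
        }
    }

    // Installs the newly created internal coordinator and replays deferred calls.
    synchronized void setAndCatchUp(InternalCoordinator created) {
        if (closed) {
            created.close(); // already retired: never expose the new instance
            return;
        }
        internal = created;
        Consumer<InternalCoordinator> call;
        while ((call = pendingCalls.poll()) != null) {
            call.accept(created);
        }
    }

    // Retires this instance: quiesce the context, drop queued calls, close.
    synchronized void close() {
        closed = true;
        context.quiesce();
        pendingCalls.clear();
        if (internal != null) {
            internal.close();
        }
    }
}
```

Because each wrapper owns its own context and pending-call queue, retiring an old one never touches the state of the current one.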
   
   With this change, the threading model becomes:
   1. The scheduler thread is the only thread writing to the instance variable `coordinator`, and it only interacts with that instance variable.
   2. The old internal coordinator instances are completely isolated from the current instance, so the closing thread just needs to close them quickly.
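The threading model above can be sketched as follows, under simplifying assumptions. `Generation` and `RecreateOnResetSketch` are hypothetical names. The sketch omits creating the internal coordinator asynchronously and replaying pending calls. The scheduler thread alone swaps the `coordinator` field on reset and hands the retired instance to a single closing thread. That closing thread only ever sees instances that are no longer reachable through `coordinator`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for one coordinator generation.
final class Generation {
    final int id;
    final AtomicBoolean closed = new AtomicBoolean(false);

    Generation(int id) {
        this.id = id;
    }

    void close() {
        closed.set(true);
    }
}

final class RecreateOnResetSketch {
    // Written only by the scheduler thread (the caller of resetToCheckpoint()).
    private Generation coordinator;
    private int nextId = 1;
    private final ExecutorService closer = Executors.newSingleThreadExecutor();

    RecreateOnResetSketch() {
        coordinator = new Generation(nextId++);
    }

    // Called on the scheduler thread: swap first, then close the old one asynchronously.
    void resetToCheckpoint() {
        Generation old = coordinator;
        coordinator = new Generation(nextId++);
        // `old` is no longer reachable via `coordinator`, so the closing thread
        // touches only this isolated instance and never races on the field.
        closer.execute(old::close);
    }

    Generation current() {
        return coordinator;
    }

    // Waits for pending closes to finish, then stops the closing thread.
    void shutdown() {
        closer.shutdown();
        try {
            closer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Two back-to-back resets simply queue two independent close tasks. Neither task touches the field, so no epoch check is needed.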
   
   I have updated the patch with the above changes. The updated patch also adds an implementation of `notifyCheckpointComplete()`. Please let me know if you have any concerns. Thanks.

