gaoyunhaii opened a new pull request #14740:
URL: https://github.com/apache/flink/pull/14740


   ## What is the purpose of the change
   
   Modifies how the checkpoint brief is computed so that it supports jobs in 
which some tasks have already finished. 
   
   **Basic Algorithm**
   
   The algorithm searches the graph starting from the source vertices to find 
the tasks that
   1. Are still running.
   2. Do not have any running precedent tasks.
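
   As an illustration only, the two conditions can be written down at the task 
level as follows. This is a minimal sketch, not the code in this PR: the 
function name `tasks_to_trigger` and the dictionary-based graph are hypothetical. 
It checks only direct precedent tasks, which is enough under the assumption that 
a finished task implies all its precedent tasks are finished.

```python
from typing import Dict, List, Set


def tasks_to_trigger(
    predecessors: Dict[str, List[str]],
    running: Set[str],
) -> Set[str]:
    """Return the running tasks that have no running direct precedent task.

    `predecessors` maps a task to its direct precedent tasks (hypothetical
    representation); tasks missing from the map are treated as sources.
    """
    return {
        task
        for task in running
        if not any(p in running for p in predecessors.get(task, []))
    }
```

   For a chain `a -> b -> c` where `a` has finished and `b` and `c` are still 
running, this returns only `b`; if all three are running, it returns only `a`.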
   
   To reduce the overhead, the search is done at the JobVertex level instead of 
the task level. For each JobVertex, it keeps narrowing down the set of tasks 
that might need to be triggered, based on the observations that
   1. If JobVertex `A` has running tasks and `A->B` via an `ALL_TO_ALL` edge, 
then all the tasks of `B` must still be running and none of them needs to be 
triggered. 
   2. If JobVertex `A` has running tasks and `A->B` via a `POINTWISE` edge, 
then only the descendant tasks of the finished tasks of `A` might need to be 
triggered. 
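
   The two observations could be sketched roughly as below. This is a simplified 
model and not the implementation in this PR: the function name, the edge-type 
constants and the data layout are hypothetical, and a `POINTWISE` edge is 
assumed to connect subtask `i` of `A` only to subtask `i` of `B` (equal 
parallelism), which is narrower than what Flink allows.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical edge-type markers for this sketch.
ALL_TO_ALL = "ALL_TO_ALL"
POINTWISE = "POINTWISE"


def tasks_to_trigger_by_vertex(
    inputs: Dict[str, List[Tuple[str, str]]],
    running: Dict[str, Set[int]],
) -> Dict[str, Set[int]]:
    """Compute, per JobVertex, the subtask indices that need to be triggered.

    `inputs` maps a vertex to its (upstream vertex, edge type) pairs.
    `running` maps every vertex to the indices of its running subtasks.
    """
    result: Dict[str, Set[int]] = {}
    for vertex in running:
        # Start with all running subtasks, then narrow down per input edge.
        candidates = set(running[vertex])
        for upstream, edge_type in inputs.get(vertex, []):
            if not running[upstream]:
                continue  # upstream vertex fully finished: no restriction
            if edge_type == ALL_TO_ALL:
                # Every subtask has a running precedent task.
                candidates = set()
                break
            # POINTWISE (1:1 assumption): drop subtasks fed by a running one.
            candidates -= running[upstream]
        if candidates:
            result[vertex] = candidates
    return result
```

   With `A -> B` via `POINTWISE`, `B -> C` via `ALL_TO_ALL`, parallelism 2 
everywhere, and only subtask 0 of `A` finished, the sketch selects subtask 1 of 
`A` and subtask 0 of `B`, and nothing from `C`. Each vertex is narrowed down 
with set operations per input edge, without enumerating execution edges.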
   
   As a result, the time complexity of the whole algorithm is linear in the 
number of tasks rather than in the number of execution edges.
   
   **Considering the unordered finished report**
   
   Another note about the algorithm: since each task reports `FINISHED` to the 
JM without coordination, for a graph like `A -> B -> C` it is possible that the 
JobMaster receives the `FINISHED` reports of `A` and `C` before that of `B`. 
This might cause problems for the basic algorithm. Thus we first iterate over 
the graph in reverse order to find the accurate set of running tasks, based on 
the observation that:
   1. If a task is finished, all its precedent tasks are finished.
   
   This iteration is also done at the JobVertex level, like the basic 
algorithm. Thus, as a whole, the computation needs to iterate over the graph 
twice.
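
   To make the reverse pass concrete, here is a minimal task-level sketch of 
the observation. It is not the JobVertex-level code of this PR; the function 
name `infer_finished` and the graph representation are hypothetical. It starts 
from the tasks whose `FINISHED` report has arrived and marks every precedent 
task as finished as well.

```python
from collections import deque
from typing import Dict, List, Set


def infer_finished(
    predecessors: Dict[str, List[str]],
    reported_finished: Set[str],
) -> Set[str]:
    """Close the reported set under "finished implies precedents finished"."""
    finished = set(reported_finished)
    queue = deque(reported_finished)
    while queue:
        task = queue.popleft()
        for p in predecessors.get(task, []):
            if p not in finished:
                # The report of `p` has not arrived yet, but it must be done.
                finished.add(p)
                queue.append(p)
    return finished
```

   For the chain `a -> b -> c` with reports received only from `a` and `c`, 
the result also contains `b`, so the subsequent forward search sees a 
consistent set of running tasks.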
   
   **Flag to disable checkpoints after tasks finished**
   
   Checkpoints after tasks finished can only be enabled as a whole once all the 
work for the JM, TM and failover is done; otherwise they may cause failed or 
inconsistent checkpoints. To avoid this issue, we introduce a temporary flag in 
`CheckpointCoordinator` that disables checkpoints after tasks finished in the 
formal process for now, and we will remove the flag after the whole development 
is done.
   
   ## Brief change log
   
   - e6bb7ea2b9ecde7e33fd4412dac891a272f790a6 modifies the formal process
   - 2e4dd30cea5745047c2fda23ddc3bda442ecb30a fixes the existing tests
   - fc9eebe10a60640303df43899888d783af9293a0 adds new tests for the changed 
logic.
   
   ## Verifying this change
   
   - Added unit tests that verify the computation of the checkpoint brief and 
the checkpoint status tracker with finished tasks.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   

