gaoyunhaii opened a new pull request #16655:
URL: https://github.com/apache/flink/pull/16655


   ## What is the purpose of the change
   
   This PR checks for illegal topology modifications when restoring from a 
checkpoint with partly finished operators.
   
   **Bookkeeping the finished task state** 
   
   To implement this check, we first need to store the finished state of each 
subtask in the checkpoint. We cannot simply use a null state as the finished 
flag, since that would not distinguish a finished subtask from an operator that 
has no state and is not chained with any stateful operator. Another option 
would be to let every task always report an empty state, but we might not want 
to affect normal checkpoints that contain no finished tasks.
   
   The current implementation stores an explicit flag for finished tasks, 
but either option seems fine to me. 
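
   As a rough illustration of why an explicit flag helps, the sketch below 
uses a hypothetical `SubtaskSnapshotSketch` class (the class and its methods 
are assumptions for illustration, not names from the Flink code base). A 
running stateless subtask and a finished subtask both carry a null state, and 
only the flag tells them apart.

```java
/**
 * Hypothetical per-subtask checkpoint entry; the class and method names are
 * illustrative and are not taken from the Flink code base.
 */
final class SubtaskSnapshotSketch {
    private final byte[] state;     // null when the subtask reported no state
    private final boolean finished; // explicit flag, independent of the state

    private SubtaskSnapshotSketch(byte[] state, boolean finished) {
        this.state = state;
        this.finished = finished;
    }

    /** A running subtask; stateOrNull is null for a stateless operator. */
    static SubtaskSnapshotSketch running(byte[] stateOrNull) {
        return new SubtaskSnapshotSketch(stateOrNull, false);
    }

    /** A finished subtask; its state has been discarded. */
    static SubtaskSnapshotSketch finishedSubtask() {
        return new SubtaskSnapshotSketch(null, true);
    }

    boolean hasState() {
        return state != null;
    }

    boolean isFinished() {
        return finished;
    }
}
```

With only `hasState()` available, `running(null)` and `finishedSubtask()` 
would look identical on restore, which is the ambiguity described above.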
   
   **Check the illegal modification**
   
   The check mainly guards against the following case for keyed state: after 
some tasks have finished and their keyed state has been discarded, the 
topology is modified so that a new predecessor is added and re-emits records 
with the discarded keys. The current logic is:
   
   1. The predecessors of a fully finished vertex must also be fully finished.
   2. The predecessors of a partly finished vertex connected via ALL_TO_ALL 
edges must be fully finished.
   3. The predecessors of a partly finished vertex connected via POINTWISE 
edges must be partly finished or fully finished.
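
   The three rules above can be sketched as a small predicate over a vertex 
and its input edges. This is only an illustration under assumed names 
(`FinishedVertexCheckSketch`, `FinishedState`, `EdgeType` and `InputEdge` are 
hypothetical and not taken from this PR); the actual check operates on Flink's 
job graph and checkpoint structures.

```java
import java.util.List;

/** Hypothetical sketch of the three rules; not the code from this PR. */
final class FinishedVertexCheckSketch {

    enum FinishedState { RUNNING, PARTLY_FINISHED, FULLY_FINISHED }

    enum EdgeType { ALL_TO_ALL, POINTWISE }

    /** One input edge, seen from the downstream vertex. */
    static final class InputEdge {
        final FinishedState upstreamState;
        final EdgeType type;

        InputEdge(FinishedState upstreamState, EdgeType type) {
            this.upstreamState = upstreamState;
            this.type = type;
        }
    }

    /** Returns true if every input edge of the vertex satisfies the rules. */
    static boolean isLegal(FinishedState vertexState, List<InputEdge> inputs) {
        for (InputEdge in : inputs) {
            if (!isLegal(vertexState, in)) {
                return false;
            }
        }
        return true;
    }

    private static boolean isLegal(FinishedState vertexState, InputEdge in) {
        switch (vertexState) {
            case FULLY_FINISHED:
                // Rule 1: predecessors must be fully finished.
                return in.upstreamState == FinishedState.FULLY_FINISHED;
            case PARTLY_FINISHED:
                if (in.type == EdgeType.ALL_TO_ALL) {
                    // Rule 2: ALL_TO_ALL predecessors must be fully finished.
                    return in.upstreamState == FinishedState.FULLY_FINISHED;
                }
                // Rule 3: POINTWISE predecessors must be at least partly finished.
                return in.upstreamState != FinishedState.RUNNING;
            default:
                // A vertex with no finished subtasks imposes no constraint.
                return true;
        }
    }
}
```

For example, a partly finished vertex with a running predecessor is rejected 
for both edge types, while a partly finished POINTWISE predecessor is accepted.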
   
   The check is designed to ensure that:
   1. If the topology is unchanged and the job is only rescaled, the check 
must pass.
   2. If a change to the topology would cause records to be emitted for the 
discarded keys, it must be forbidden. The check is conservative, however: a 
forbidden change might not actually cause records for the discarded keys. 
   
   Currently there is one known bad case: a graph composed of 
`source1 -> forward() -> map1` and `source2 -> forward with key -> map2[with 
keyed state]`, where `source2` and `map2` are connected via 
`DataStreamUtils#reinterpretAsKeyedStream()`. After all the vertices become 
partly finished, the user creates a new graph with `source1 -> forward with 
key -> map2[with keyed state]`. This situation is not easy to detect. However, 
since the probability of this case is extremely low and the API is intended 
only for sophisticated users, perhaps we could just point this out in the 
documentation for now. 
   
   ## Brief change log
   
   - e4992b55912beb45273b8555393bbd9709eb7529 includes the finished flag for 
finished subtasks in the checkpoint.
   - cafe182574694b1f708cb9e8318fd54d4ef5ce9f checks for illegal 
modifications when restoring with partly finished state.
   
   
   ## Verifying this change
   
   This change adds tests and can be verified with the added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**

