[ 
https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801976#comment-17801976
 ] 

Xintong Song commented on FLINK-33962:
--------------------------------------

Thanks for reaching out, [~Zhanghao Chen].

Just some quick responses for now; I need to look further into the related 
components before commenting in more detail.

Based on your description, in general I think it makes sense to make operator 
ID generation independent of chaining. However, as you have already 
mentioned, this is a breaking change that may result in state incompatibility. 
Therefore, I think it deserves a FLIP discussion and an official vote.

> Chaining-agnostic OperatorID generation for improved state compatibility on 
> parallelism change
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33962
>                 URL: https://issues.apache.org/jira/browse/FLINK-33962
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>            Reporter: Zhanghao Chen
>            Priority: Major
>
> *Background*
> Flink restores operator state from snapshots by matching operatorIDs. 
> Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID 
> generation when no user-set uid exists. The generated OperatorID is 
> deterministic with respect to:
>  * node-local properties (the traversal ID in the BFS over the stream graph)
>  * chained output nodes
>  * input node hashes
> *Problem*
> The chaining behavior affects state compatibility, because the OperatorID 
> generated for an operator depends on its chained output nodes. For 
> example, a simple source->sink DAG with source and sink chained together is 
> state-incompatible with an otherwise identical DAG with source and sink 
> unchained (either because the parallelisms of the two operators have been 
> changed to be unequal or because chaining is disabled). This greatly limits 
> the flexibility to perform chain-breaking/joining for performance tuning.
> *Proposal*
> Introduce {{StreamGraphHasherV3}} that is agnostic to the chaining behavior 
> of operators, which effectively just removes L227-235 of 
> [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
>  at master · apache/flink 
> (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>  
> This will not hurt the determinism of the ID generation across job 
> submissions as long as the stream graph topology doesn't change, and since new 
> versions of Flink have already adopted pure operator-level state recovery, 
> this will not break state recovery across job submissions as long as both 
> submissions use the same hasher.
> This will, however, break cross-version state compatibility. So we can 
> introduce a new option to enable using HasherV3 in v1.19 and consider making 
> it the default hasher in v2.0.
> Looking forward to suggestions on this.
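
To make the problem and the proposal in the quoted description concrete, here is a minimal toy sketch in Java. It is not the code of {{StreamGraphHasherV2}}. The class, the method names, the string tags, and the choice of SHA-256 are all assumptions made for illustration. It only models the three inputs the description lists (traversal ID, chained output nodes, input node hashes) and shows how dropping the chained-output contribution would keep the IDs of a source->sink job stable when the chain is broken.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Toy model only: NOT Flink's StreamGraphHasherV2. All names are hypothetical.
final class ToyOperatorIdHasher {

    // Derives an ID from the traversal ID, the number of chained outputs
    // (only when chainingAware is true), and the hashes of the input nodes.
    static String operatorId(int traversalId, int chainedOutputs,
                             List<String> inputHashes, boolean chainingAware) {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256"); // arbitrary choice for this sketch
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        update(digest, "node:" + traversalId);
        if (chainingAware) {
            // Chaining-aware mode: every chained output changes the resulting ID.
            for (int i = 0; i < chainedOutputs; i++) {
                update(digest, "chained-output");
            }
        }
        for (String inputHash : inputHashes) {
            update(digest, "input:" + inputHash);
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    private static void update(MessageDigest digest, String value) {
        digest.update(value.getBytes(StandardCharsets.UTF_8));
    }

    // Source of a two-node source->sink job; it has one chained output when chained.
    static String sourceId(boolean chained, boolean chainingAware) {
        return operatorId(0, chained ? 1 : 0, List.of(), chainingAware);
    }

    // Sink of the same job; its ID depends on the source's hash as an input hash.
    static String sinkId(boolean chained, boolean chainingAware) {
        return operatorId(1, 0, List.of(sourceId(chained, chainingAware)), chainingAware);
    }

    public static void main(String[] args) {
        // Chaining-aware: breaking the chain changes both IDs.
        System.out.println(sourceId(true, true).equals(sourceId(false, true)));   // false
        System.out.println(sinkId(true, true).equals(sinkId(false, true)));       // false
        // Chaining-agnostic: IDs are identical whether or not the operators are chained.
        System.out.println(sourceId(true, false).equals(sourceId(false, false))); // true
        System.out.println(sinkId(true, false).equals(sinkId(false, false)));     // true
    }
}
```

Note that in the chaining-aware mode of this toy model the sink ID changes as well, because it is derived from the source's hash. Changing one operator's ID can therefore ripple to its downstream operators.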



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
