[ https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-27849:
-------------------------------
    Affects Version/s: 2.1.0

> Harden correctness for non-deterministic updates present in the changelog 
> pipeline
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-27849
>                 URL: https://issues.apache.org/jira/browse/FLINK-27849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.1.0
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Updates (i.e., messages other than RowKind.INSERT) commonly exist in a
> streaming pipeline, and wrong results or errors may occur when some
> non-deterministic functions or operations are used.
> This has been a long-lived issue since the first day Flink SQL became
> available for streaming, and although some efforts have been made, it has
> still not been fully eliminated.
> We should detect all the non-deterministic operations in changelog
> pipelines, raise an error to tell users about the risk, and also add a
> mechanism that can handle such an issue if a user is willing to pay some
> cost (probably by introducing state).
> The non-deterministic operations include built-in temporal functions (NOW,
> CURRENT_TIMESTAMP, ...), UUID, RAND, ..., user-defined non-deterministic
> functions (those overriding isDeterministic to return false), a lookup join
> on a lookup source whose data may change over time, and a CDC source with
> metadata fields (described in FLINK-28242).
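To make the failure mode concrete, here is a minimal, self-contained Java simulation. It is not Flink code: every class and method name is hypothetical, and `Kind` only mirrors the four values of Flink's `RowKind`. A projection re-evaluates a stand-in for NOW()/RAND() on every message, so the UPDATE_BEFORE it emits no longer matches the row it inserted earlier, and a sink that retracts by full row content keeps a stale row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical simulation (not Flink code) of why re-evaluating a
// non-deterministic function on retraction messages breaks a changelog.
public class NonDeterministicRetraction {

    // Mirrors the four kinds of Flink's RowKind enum.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, int id, long value) {}

    // Projection "SELECT id, v + f() FROM src": f() is evaluated per message,
    // including for UPDATE_BEFORE/DELETE messages.
    static List<Change> project(List<Change> input, LongSupplier f) {
        List<Change> out = new ArrayList<>();
        for (Change c : input) {
            out.add(new Change(c.kind(), c.id(), c.value() + f.getAsLong()));
        }
        return out;
    }

    // A sink that materializes rows by full content and removes a row only
    // when a retraction matches it exactly.
    static List<String> materialize(List<Change> changes) {
        List<String> table = new ArrayList<>();
        for (Change c : changes) {
            String row = c.id() + ":" + c.value();
            switch (c.kind()) {
                case INSERT, UPDATE_AFTER -> table.add(row);
                case UPDATE_BEFORE, DELETE -> {
                    if (!table.remove(row)) {
                        // No exact match: the stale row survives.
                        System.out.println("unmatched retraction " + row);
                    }
                }
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> source = List.of(
            new Change(Kind.INSERT, 1, 10),
            new Change(Kind.UPDATE_BEFORE, 1, 10),
            new Change(Kind.UPDATE_AFTER, 1, 20));

        // Stand-in for NOW()/RAND(): returns a different value on every call.
        long[] clock = {0};
        System.out.println(materialize(project(source, () -> clock[0]++)));
    }
}
```

With the counter stand-in, the sink ends up holding both `1:10` and `1:22` for id 1; with a constant function it holds only `1:20`.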
>  
>  
> ====== Solution  ======
> We will introduce a physical plan checker to validate whether there are any
> non-deterministic updates that may cause wrong results, and also a physical
> plan rewriter to eliminate the non-determinism generated by lookup join
> nodes (which we think are commonly used in SQL and hard for users to solve
> themselves).
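A rough sketch of the checker's core idea, under heavy simplification: the plan is modelled as a tree of hypothetical `Node` records (not Flink's physical RelNodes), each flagged as producing updates and/or being non-deterministic, and any non-deterministic node whose input is an updating changelog is reported. The real resolver likely needs to be more precise (for example, by taking upsert keys into account); this only illustrates the shape of the check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of a non-deterministic-update check.
public class NdUpdateChecker {

    // A plan node; `producesUpdates` marks nodes such as CDC sources or
    // aggregations that emit UPDATE_BEFORE/DELETE messages themselves.
    record Node(String name, boolean producesUpdates, boolean nonDeterministic, List<Node> inputs) {
        static Node of(String name, boolean producesUpdates, boolean nonDeterministic, Node... inputs) {
            return new Node(name, producesUpdates, nonDeterministic, List.of(inputs));
        }
    }

    // Returns true if the output of `node` is an updating changelog, and
    // collects the names of risky nodes into `problems` (bottom-up walk).
    static boolean check(Node node, List<String> problems) {
        boolean inputUpdates = false;
        for (Node in : node.inputs()) {
            // Non-short-circuit OR so that every input subtree is visited.
            inputUpdates |= check(in, problems);
        }
        if (node.nonDeterministic() && inputUpdates) {
            problems.add(node.name());
        }
        return inputUpdates || node.producesUpdates();
    }

    static List<String> validate(Node root) {
        List<String> problems = new ArrayList<>();
        check(root, problems);
        return problems;
    }
}
```

For a CDC source feeding a lookup join, `validate` reports the lookup join; a NOW() projection over an append-only source is not reported, since no retraction of its output ever has to match an earlier row.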
> For the implementation, the main changes may include four parts:
>  # [preparing work] Adds an internal postOptimize method for physical DAG
> processing
>  # Introduces a `StreamNonDeterministicPlanResolver` to validate whether
> there are any non-deterministic updates that may cause wrong results, and to
> rewrite the lookup join node with materialization (to eliminate the
> non-determinism generated by the lookup join node)
>  # Implements a new lookup join operator (sync mode only) with state to
> eliminate the non-determinism
>  # [optimization] SinkUpsertMaterializer should be aware of the input
> upsertKey if it is not empty
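Part 3 can be illustrated with a minimal sketch, assuming each input row carries a unique key (for example, its upsert key) and that the stateful operator simply remembers the lookup result per row key. A `HashMap` stands in for Flink keyed state, and `MaterializedLookupJoin`, `process`, and `Joined` are hypothetical names, not the actual operator. On a retraction it replays the remembered result instead of querying the external table again, so the emitted -U/-D row matches the earlier +I/+U even if the table changed in between.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a lookup join that materializes lookup results, so a
// retraction re-emits exactly the joined value that was emitted before.
public class MaterializedLookupJoin {

    // Mirrors the four values of Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Joined(Kind kind, String rowKey, String dim) {}

    // Synchronous lookup into the external (possibly changing) table.
    private final Function<String, String> lookup;
    // Stand-in for keyed state: last lookup result per input row key.
    private final Map<String, String> state = new HashMap<>();

    MaterializedLookupJoin(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    Joined process(Kind kind, String rowKey, String joinKey) {
        if (kind == Kind.INSERT || kind == Kind.UPDATE_AFTER) {
            // Accumulate message: query the table and remember the result.
            String dim = lookup.apply(joinKey);
            state.put(rowKey, dim);
            return new Joined(kind, rowKey, dim);
        }
        // Retract message (UPDATE_BEFORE / DELETE): replay the remembered
        // result instead of looking up again; null if nothing was stored.
        return new Joined(kind, rowKey, state.remove(rowKey));
    }
}
```

The trade-off is the cost mentioned in the description: state that grows with the number of live input rows.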



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
