[ https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-27849:
-------------------------------
    Affects Version/s: 2.1.0

> Harden correctness for non-deterministic updates present in the changelog pipeline
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-27849
>                 URL: https://issues.apache.org/jira/browse/FLINK-27849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.1.0
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Updates (that is, messages other than RowKind.INSERT) are common in a
> streaming pipeline, and wrong results or errors may occur when
> non-deterministic functions or operations are applied to them.
> This is a long-standing issue that has existed since Flink SQL first became
> available for streaming, and it has still not been fully eliminated despite
> some earlier efforts.
> We should detect all non-deterministic operations in changelog pipelines,
> raise an error that tells users about the risk, and also add a mechanism
> that can handle such an issue if a user is willing to pay some cost (probably
> by introducing state).
> Non-deterministic operations include built-in temporal functions (now,
> current_timestamp...), UUID, RAND...
> or user-defined non-deterministic functions (those that override
> isDeterministic to return false)
> or a lookup join on a lookup source whose data may change over time
> or a CDC source with a metadata field (described in FLINK-28242)
>
>
> ====== Solution ======
> We will introduce a physical plan checker that validates whether there are any
> non-deterministic updates which may cause wrong results, and a physical plan
> rewriter that eliminates the non-determinism generated by the lookup join node
> (which we think is commonly used in SQL and hard for users to solve
> themselves).
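To make the failure mode concrete, here is a minimal sketch in plain Python (not Flink code) of a retract-style changelog passing through a projection and then being materialized. The helper names `project` and `materialize` are hypothetical and exist only for this illustration. It assumes that a retraction must match a previously emitted row exactly, so a column recomputed by a non-deterministic function on the retraction message leaves a stale row behind.

```python
import uuid
from collections import Counter

# Row kinds, written with the short strings Flink uses for RowKind.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def project(changelog, fn):
    """Append fn(row) as a computed column to every changelog message."""
    return [(kind, row + (fn(row),)) for kind, row in changelog]


def materialize(changelog):
    """Apply a retract-style changelog to a multiset of rows.

    Returns the surviving rows and the retractions that matched nothing.
    """
    rows = Counter()
    unmatched = []
    for kind, row in changelog:
        if kind in (INSERT, UPDATE_AFTER):
            rows[row] += 1
        elif rows[row] > 0:
            rows[row] -= 1
        else:
            # The retraction carries a row that was never emitted in this form.
            unmatched.append((kind, row))
    return +rows, unmatched  # unary plus drops rows whose count fell to zero


# One key is inserted and then updated from "a" to "b".
source = [(INSERT, (1, "a")), (UPDATE_BEFORE, (1, "a")), (UPDATE_AFTER, (1, "b"))]

# Deterministic column: the -U message reproduces the row emitted by +I.
det_rows, det_unmatched = materialize(project(source, lambda r: r[1].upper()))

# Non-deterministic column: every message gets a fresh UUID, so -U cannot match.
nd_rows, nd_unmatched = materialize(project(source, lambda r: uuid.uuid4().hex))
```

In the deterministic case only the updated row survives. In the non-deterministic case the old row is never retracted, so two rows remain for a single key, which is the kind of wrong result the proposed plan checker is meant to report.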
> The implementation may consist of 4 main parts:
> # [preparatory work] Add an internal postOptimize method for physical DAG
> processing
> # Introduce a `StreamNonDeterministicPlanResolver` that validates whether
> there are any non-deterministic updates which may cause wrong results, and
> that rewrites the lookup join node with materialization (to eliminate the
> non-determinism generated by the lookup join node)
> # Implement a new lookup join operator (sync mode only) with state to
> eliminate the non-determinism
> # [optimization] Make SinkUpsertMaterializer aware of the input upsertKey if
> it is not empty
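The idea behind the stateful lookup join in the steps above can be sketched in plain Python. This is not the actual Flink operator: the class name `MaterializedLookupJoin`, the use of the full input row as the state key, and the fallback for a retraction with no remembered value are all assumptions made for illustration. The sketch remembers the lookup result attached to each accumulate message and replays it on the matching retraction instead of querying the external table again.

```python
class MaterializedLookupJoin:
    """Hypothetical synchronous lookup join that keeps lookup results in state."""

    def __init__(self, lookup):
        self._lookup = lookup  # callable: input row -> value from the external table
        self._state = {}       # input row -> lookup values emitted and not yet retracted

    def process(self, kind, row):
        if kind in ("+I", "+U"):
            # Accumulate message: query the external table and remember the answer.
            value = self._lookup(row)
            self._state.setdefault(row, []).append(value)
        else:
            # Retract message: replay the remembered answer so that the output
            # retraction matches the row that was emitted earlier.
            remembered = self._state.get(row)
            if remembered:
                value = remembered.pop()
                if not remembered:
                    del self._state[row]
            else:
                # Assumption: with nothing remembered, fall back to a fresh lookup.
                value = self._lookup(row)
        return (kind, row + (value,))


# The external dimension table changes between the insert and its retraction.
dim = {1: "gold"}
join = MaterializedLookupJoin(lambda row: dim.get(row[0]))

out = [join.process("+I", (1, "a"))]
dim[1] = "silver"
out.append(join.process("-U", (1, "a")))
out.append(join.process("+U", (1, "b")))
```

Here the `-U` message still carries "gold" even though the table already says "silver", so a downstream operator can retract the earlier row. The cost is the extra state per input row, which matches the trade-off the issue describes.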