[ https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572773#comment-17572773 ]
lincoln lee commented on FLINK-27849:
-------------------------------------
Thanks to [~jark] [~godfrey] [~jinsong] for all your contributions and
valuable suggestions during the long offline discussion. I have written a
short doc describing the final solution: https://docs.google.com/document/d/1uFBYqyXJxuNhhB37ydGniQMF2JkS3DtGZMU4oBTJ03U
Before releasing 1.16, we also need to prepare formal user documentation
(FLINK-28738).
> Harden correctness for non-deterministic updates present in the changelog
> pipeline
> ----------------------------------------------------------------------------------
>
> Key: FLINK-27849
> URL: https://issues.apache.org/jira/browse/FLINK-27849
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Fix For: 1.16.0
>
>
> Updates (i.e., messages other than RowKind.INSERT) commonly exist in a
> streaming pipeline, and wrong results or errors may occur when
> non-deterministic functions or operations are applied in such a pipeline.
> This is a long-standing issue that has existed since Flink SQL first became
> available for streaming; although some efforts have been made, it has not yet
> been fully eliminated.
> We should detect all non-deterministic operations in changelog pipelines,
> raise an error to inform users of the risk, and also add a mechanism that can
> handle such issues if a user is willing to pay some cost (probably by
> introducing state).
> Non-deterministic operations include builtin temporal functions (now,
> current_timestamp, ...), UUID, RAND, ...,
> or user-defined non-deterministic functions (which override isDeterministic
> to return false),
> or a lookup join on a lookup source whose data may change over time,
> or a CDC source with metadata fields (described in FLINK-28242).
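To make the failure mode concrete, here is a minimal sketch in plain Java (JDK 16+ for records, no Flink dependency). `Kind`, `Row`, `NowProjection`, `RetractSink` and `NonDeterminismDemo` are hypothetical stand-ins, not Flink classes: a now()-like column is evaluated again when the -U message passes through the projection, so the retraction no longer matches the row that a sink without a primary key stored for the original +I.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified stand-in for a changelog row kind (mirrors the idea of Flink's RowKind).
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// A changelog row: kind plus (id, value, ts); ts is filled in by the projection.
record Row(Kind kind, int id, int value, long ts) {
    boolean isRetraction() {
        return kind == Kind.UPDATE_BEFORE || kind == Kind.DELETE;
    }
}

// Hypothetical projection appending a now()-like column. It is evaluated for
// every message, including retractions (-U / -D), so it may differ each time.
final class NowProjection {
    private final LongSupplier clock;
    NowProjection(LongSupplier clock) { this.clock = clock; }
    Row apply(Row in) {
        return new Row(in.kind(), in.id(), in.value(), clock.getAsLong());
    }
}

// Hypothetical retract sink without a primary key: it stores full rows and can
// only retract a row that exactly matches one it accumulated earlier.
final class RetractSink {
    final List<Row> rows = new ArrayList<>();
    int failedRetractions = 0;

    void accept(Row r) {
        Row payload = new Row(Kind.INSERT, r.id(), r.value(), r.ts());
        if (r.isRetraction()) {
            if (!rows.remove(payload)) {
                failedRetractions++; // no exact match: the old row stays behind
            }
        } else {
            rows.add(payload);
        }
    }
}

final class NonDeterminismDemo {
    static RetractSink run() {
        long[] tick = {0};
        // Each call returns a later "time", like now() evaluated at different moments.
        NowProjection proj = new NowProjection(() -> ++tick[0]);
        RetractSink sink = new RetractSink();
        List<Row> changelog = List.of(
                new Row(Kind.INSERT, 1, 10, 0),
                new Row(Kind.UPDATE_BEFORE, 1, 10, 0),
                new Row(Kind.UPDATE_AFTER, 1, 20, 0));
        for (Row r : changelog) {
            sink.accept(proj.apply(r));
        }
        return sink;
    }
}
```

Running `NonDeterminismDemo.run()` leaves two rows in the sink, (1, 10, ts=1) and (1, 20, ts=3), instead of only the updated one, and counts one failed retraction. A sink that rejected unmatched retractions would fail instead; these correspond to the "wrong results or errors" mentioned above.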
>
>
> ====== Solution ======
> We will introduce a physical plan checker to validate whether there are any
> non-deterministic updates that may cause wrong results, and also a physical
> plan rewriter to eliminate the non-determinism generated by lookup join nodes
> (which we think are commonly used in SQL and hard for users to resolve
> themselves).
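The checker idea can be sketched as a bottom-up walk over the plan that tracks whether each node's output may contain updates. This is a deliberately coarse, hypothetical model: `PlanNode` and `NonDeterminismChecker` are not Flink classes, and the real `StreamNonDeterministicPlanResolver` may well apply finer-grained rules (for example around upsert keys). Here a non-deterministic node on an update stream is reported as an error, except a lookup join, which is marked for rewriting with materialization instead.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified physical plan node (not Flink's RelNode/ExecNode).
final class PlanNode {
    final String name;
    final boolean nonDeterministic; // e.g. now()/UUID/RAND, a UDF with isDeterministic() == false, a lookup join
    final boolean generatesUpdates; // e.g. a non-windowed aggregation or a CDC source
    final boolean isLookupJoin;     // rewritten instead of rejected
    final List<PlanNode> inputs;
    boolean materialized = false;   // set by the checker when it "rewrites" a lookup join

    PlanNode(String name, boolean nonDeterministic, boolean generatesUpdates,
             boolean isLookupJoin, PlanNode... inputs) {
        this.name = name;
        this.nonDeterministic = nonDeterministic;
        this.generatesUpdates = generatesUpdates;
        this.isLookupJoin = isLookupJoin;
        this.inputs = List.of(inputs);
    }
}

final class NonDeterminismChecker {
    final List<String> errors = new ArrayList<>();
    // Memo keyed by node identity so shared sub-plans in a DAG are checked once.
    private final Map<PlanNode, Boolean> memo = new IdentityHashMap<>();

    // Returns true if the node's output may contain -U/+U/-D messages.
    boolean visit(PlanNode node) {
        Boolean cached = memo.get(node);
        if (cached != null) {
            return cached;
        }
        boolean inputHasUpdates = false;
        for (PlanNode in : node.inputs) {
            inputHasUpdates |= visit(in); // non-short-circuit: visit every input
        }
        boolean outputHasUpdates = inputHasUpdates || node.generatesUpdates;
        if (node.nonDeterministic && outputHasUpdates) {
            if (node.isLookupJoin) {
                node.materialized = true; // rewrite: use a stateful lookup join
            } else {
                errors.add(node.name + ": non-deterministic operation on an update stream");
            }
        }
        memo.put(node, outputHasUpdates);
        return outputHasUpdates;
    }
}
```

For example, source → aggregation → a projection calling now() is reported, a CDC source feeding a lookup join is rewritten rather than rejected, and a projection calling UUID on an append-only source passes, since there are no retractions that could mismatch.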
> The main implementation changes may include 4 parts:
> # [preparing work] Adds an internal postOptimize method for physical DAG
> processing
> # Introduces a `StreamNonDeterministicPlanResolver` to validate whether there
> are any non-deterministic updates that may cause wrong results, and to rewrite
> lookup join nodes with materialization (to eliminate the non-determinism
> generated by lookup join nodes)
> # Implements a new lookup join operator (sync mode only) with state to
> eliminate the non-determinism
> # [optimization] SinkUpsertMaterializer should be aware of the input
> upsertKey if it is not empty
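Part 3 can be illustrated with a stateful lookup join. The sketch below is hypothetical plain Java, not Flink's operator: it models only an inner join in sync mode and assumes the left row's `id` is its upsert key, used as the state key. On +I/+U it queries the dimension table and remembers the joined row; on -U/-D it replays the remembered row instead of querying again, so a retraction always matches what was emitted earlier even if the dimension table changed in between.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Left (probe) row: id is assumed to be its upsert key, dimKey is the lookup key.
record LeftRow(Kind kind, int id, int dimKey) {}

// Joined output row.
record JoinedRow(Kind kind, int id, int dimKey, String dimValue) {}

// Hypothetical stateful lookup join (inner join, synchronous lookups only).
final class StatefulLookupJoin {
    private final Map<Integer, String> dimTable;                    // external table, may change over time
    private final Map<Integer, JoinedRow> state = new HashMap<>(); // upsert key -> last emitted joined row

    StatefulLookupJoin(Map<Integer, String> dimTable) {
        this.dimTable = dimTable;
    }

    List<JoinedRow> process(LeftRow in) {
        List<JoinedRow> out = new ArrayList<>();
        if (in.kind() == Kind.UPDATE_BEFORE || in.kind() == Kind.DELETE) {
            // Retraction: replay the previously emitted row instead of looking up again.
            JoinedRow prev = state.remove(in.id());
            if (prev != null) {
                out.add(new JoinedRow(in.kind(), prev.id(), prev.dimKey(), prev.dimValue()));
            }
            // If nothing was emitted for this key, there is nothing to retract.
        } else {
            String dimValue = dimTable.get(in.dimKey());
            if (dimValue != null) { // inner join: rows without a match are dropped
                JoinedRow joined = new JoinedRow(in.kind(), in.id(), in.dimKey(), dimValue);
                state.put(in.id(), joined); // remember exactly what was emitted
                out.add(joined);
            }
        }
        return out;
    }
}
```

The price is the cost mentioned in the description: one joined row per key kept in state. If the dimension value for key 7 changes from "a" to "b" between +I(1, 7) and -U(1, 7), this operator still retracts the "a" row, and "b" appears only with the following +U.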
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)