[
https://issues.apache.org/jira/browse/FLINK-38740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041724#comment-18041724
]
lincoln lee commented on FLINK-38740:
-------------------------------------
[~dylanhz] Thanks for explaining!
Regarding the first point: IIUC, we don't need to guarantee this level of
compatibility because {{{}CompiledPlan{}}}[1] can be translated directly into a
JobGraph, bypassing the planner’s optimization phase (which is where the
incompatible changes are introduced).
As for the second point: is the difference simply due to improved accuracy? If
so, I think it’s acceptable.
[1].
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-Scoping
> STDDEV related functions may produce invalid results
> ----------------------------------------------------
>
> Key: FLINK-38740
> URL: https://issues.apache.org/jira/browse/FLINK-38740
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: dylanhz
> Assignee: dylanhz
> Priority: Major
>
> Currently, Flink uses Calcite's AggregateReduceFunctionsRule to reduce
> STDDEV_POP/STDDEV_SAMP/VAR_POP/VAR_SAMP to SUMs and COUNTs. For example:
> {code:java}
> STDDEV_POP(x) = SQRT((SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x))
> {code}
> This rewrite rule is known as a naive one-pass algorithm for variance
> calculation. However, it suffers from significant precision loss and may
> produce invalid results given the limited precision of Java double and float
> types. See [1] for reference. For example in Flink:
> {code:sql}
> > select stddev_pop(x) from (values (cast(0.27 as double)), (cast(0.27 as
> > double)), (cast(0.27 as double))) as T(x)
> -- NaN
> > select var_pop(x) from (values (cast(0.27 as double)), (cast(0.27 as
> > double)), (cast(0.27 as double))) as T(x)
> -- -9.25185853854297E-18
> > select stddev_pop(x) from (values (cast(100000 as int)), (cast(200000 as
> > int)), (cast(300000 as int))) as T(x)
> -- 0
> {code}
> Most systems such as Spark/PostgreSQL/MySQL have turned to use a more
> accurate and stable approach called Welford's online algorithm, you can find
> its definition in [1]. I'm considering introducing this into Flink as well.
> More specifically, we can introduce a new internal built-in function like
> WELFORD_M2 to calculate the m2 term in the algorithm, and then rewrite the
> functions as follows to avoid repeated calculations:
> {code:java}
> stddev_pop(x) ==> power(m2 / count(x), .5)
> stddev_samp(x) ==> power(m2 / nullif(count(x) - 1, 0), .5)
> var_pop(x) ==> m2 / count(x)
> var_samp(x) ==> m2 / nullif(count(x) - 1, 0)
> {code}
> Since this proposal will introduce a breaking change, I think we should
> discuss the following before I create the pull request:
> 1. Should we provide a configuration option to fall back to the legacy
> algorithm for backward compatibility?
> 2. What should be the intermediate result type?
> - Option A: Use decimal for decimal input and double for all other numeric
> types
> - Option B: Use double for all types
> - Option C: Other ideas/suggestions are welcome
> [1][variance
> calculation|https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)