[ 
https://issues.apache.org/jira/browse/FLINK-38740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042181#comment-18042181
 ] 

dylanhz commented on FLINK-38740:
---------------------------------

[~lincoln.86xy] Thanks for the clarification about compatibility! In this case, 
we don't have to worry about compatibility, and the legacy option is 
unnecessary as well. 

And for the second point, yes, the result differs due to improved accuracy.

There are still some precision issues with decimal computation, but I think we 
can discuss them further in the pull request.

> STDDEV related functions may produce invalid results
> ----------------------------------------------------
>
>                 Key: FLINK-38740
>                 URL: https://issues.apache.org/jira/browse/FLINK-38740
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: dylanhz
>            Assignee: dylanhz
>            Priority: Major
>
> Currently, Flink uses Calcite's AggregateReduceFunctionsRule to reduce 
> STDDEV_POP/STDDEV_SAMP/VAR_POP/VAR_SAMP to SUMs and COUNTs. For example:
> {code:java}
> STDDEV_POP(x) = SQRT((SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x))
> {code}
> This rewrite is the naive one-pass algorithm for variance calculation. It 
> suffers from significant precision loss and may produce invalid results 
> given the limited precision of Java double and float types. See [1] for 
> reference. For example, in Flink:
> {code:sql}
> > select stddev_pop(x) from (values (cast(0.27 as double)), (cast(0.27 as double)), (cast(0.27 as double))) as T(x)
> -- NaN
> > select var_pop(x) from (values (cast(0.27 as double)), (cast(0.27 as double)), (cast(0.27 as double))) as T(x)
> -- -9.25185853854297E-18
> > select stddev_pop(x) from (values (cast(100000 as int)), (cast(200000 as int)), (cast(300000 as int))) as T(x)
> -- 0
> {code}
> Most systems, such as Spark, PostgreSQL, and MySQL, have switched to a more 
> accurate and stable approach called Welford's online algorithm, which is 
> defined in [1]. I'm considering introducing it into Flink as well.
> More specifically, we can introduce a new internal built-in function like 
> WELFORD_M2 to calculate the m2 term in the algorithm, and then rewrite the 
> functions as follows to avoid repeated calculations:
> {code:java}
> stddev_pop(x)  ==> power(m2 / count(x), .5)
> stddev_samp(x) ==> power(m2 / nullif(count(x) - 1, 0), .5)
> var_pop(x)     ==> m2 / count(x)
> var_samp(x)    ==> m2 / nullif(count(x) - 1, 0)
> {code}
> Since this proposal will introduce a breaking change, I think we should 
> discuss the following before I create the pull request:
> 1. Should we provide a configuration option to fall back to the legacy 
> algorithm for backward compatibility?
> 2. What should be the intermediate result type?
>  - Option A: Use decimal for decimal input and double for all other numeric 
> types
>  - Option B: Use double for all types
>  - Option C: Other ideas/suggestions are welcome
> [1] [variance calculation|https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm]
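
For readers of the quoted description, here is a minimal Java sketch of the two approaches it contrasts: the naive SUM/COUNT rewrite and a Welford-style accumulator that keeps the m2 term. The class name `WelfordSketch`, its methods, and the `merge` step (the standard pairwise combination of partial aggregates) are illustrative assumptions, not the proposed WELFORD_M2 implementation or any Flink API. Returning `null` on an empty or too-small input mirrors the `nullif` guards in the proposed rewrites and is likewise an assumption about the intended semantics.

```java
// Hypothetical illustration only; not Flink code.
final class WelfordSketch {
    private long count;   // number of values seen so far
    private double mean;  // running mean
    private double m2;    // running sum of squared deviations from the mean

    // Welford's online update for a single value.
    void add(double x) {
        count++;
        double delta = x - mean;
        mean += delta / count;
        m2 += delta * (x - mean); // uses the already-updated mean
    }

    // Combine two partial accumulators (pairwise/parallel variant).
    void merge(WelfordSketch other) {
        if (other.count == 0) {
            return;
        }
        if (count == 0) {
            count = other.count;
            mean = other.mean;
            m2 = other.m2;
            return;
        }
        long total = count + other.count;
        double delta = other.mean - mean;
        m2 += other.m2 + delta * delta * ((double) count * other.count / total);
        mean += delta * other.count / total;
        count = total;
    }

    // var_pop(x) ==> m2 / count(x); null for empty input (assumed semantics).
    Double varPop() {
        return count == 0 ? null : m2 / count;
    }

    // var_samp(x) ==> m2 / nullif(count(x) - 1, 0)
    Double varSamp() {
        return count < 2 ? null : m2 / (count - 1);
    }

    // stddev_pop(x) ==> power(m2 / count(x), .5)
    Double stddevPop() {
        Double v = varPop();
        return v == null ? null : Math.sqrt(v);
    }

    // Naive one-pass formula from the issue: (SUM(x*x) - SUM(x)*SUM(x)/COUNT(x)) / COUNT(x).
    // Assumes xs is non-empty. The subtraction can cancel catastrophically and
    // may even yield a small negative value.
    static double naiveVarPop(double[] xs) {
        double sum = 0.0;
        double sumSq = 0.0;
        for (double x : xs) {
            sum += x;
            sumSq += x * x;
        }
        return (sumSq - sum * sum / xs.length) / xs.length;
    }
}
```

With three identical inputs, every Welford update has a zero delta, so m2 stays exactly 0 and the square root is well defined. The naive formula instead subtracts two nearly equal rounded sums, which is where the negative variance and the NaN reported above come from in Flink; whether this sketch reproduces those exact digits depends on evaluation order.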



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
