[ https://issues.apache.org/jira/browse/FLINK-18701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164293#comment-17164293 ]

Jark Wu commented on FLINK-18701:
---------------------------------

The root cause is that we have a StreamExecExpand node when aggregation split 
is enabled. StreamExecExpand expands one input row into several rows in which 
some columns are null. Previously, each input row was applied as one atomic 
update. Now it becomes several rows containing nulls, which are not applied 
atomically, so the intermediate state is visible to users.
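A simplified model can help picture the non-atomic behavior. This sketch is not Flink code. `ExpandDemo`, `Expanded` and `State` are hypothetical, and the two-branch expansion is an assumption. The model also collapses the partial and final aggregations of the split plan into a single step. It assumes Scala 2.13, which provides `minOption` and `maxOption`.

```scala
// Hypothetical toy model, NOT Flink's operators: one expanded row per aggregate branch.
final case class Expanded(b: Option[Long], a: Option[Int])

object ExpandDemo {
  // Mimics Expand: one input row (a, b) becomes one row per branch,
  // with the other branch's columns nulled out (None).
  def expand(a: Int, b: Long): Seq[Expanded] =
    Seq(
      Expanded(b = None, a = Some(a)), // branch feeding COUNT(DISTINCT a)
      Expanded(b = Some(b), a = None)  // branch feeding MIN(b) / MAX(b)
    )

  // Aggregate state of a single group; MIN/MAX skip nulls as in SQL.
  final case class State(min: Option[Long], max: Option[Long], distinctA: Set[Int])

  def accumulate(s: State, r: Expanded): State = State(
    min = (s.min.toList ++ r.b.toList).minOption,
    max = (s.max.toList ++ r.b.toList).maxOption,
    distinctA = s.distinctA ++ r.a.toList
  )

  // Emits (MIN, MAX, COUNT DISTINCT) after every expanded row, i.e. each
  // expanded row is a separate, individually visible update.
  def run(a: Int, b: Long): Seq[(Option[Long], Option[Long], Int)] =
    expand(a, b)
      .scanLeft(State(None, None, Set.empty))(accumulate)
      .tail
      .map(s => (s.min, s.max, s.distinctA.size))

  def main(args: Array[String]): Unit =
    run(a = 1, b = 2L).foreach(println)
}
```

Running `main` prints `(None,None,1)` and then `(Some(2),Some(2),1)`. The first line is the intermediate state a downstream consumer can observe. It resembles the `(true,1,null,null,1)` row in the issue.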

The Javadoc of {{SplitAggregateRule}} explains more about this.

[~twalthr] suggests emitting a default value for MIN/MAX when they are declared NOT 
NULL. This would solve this case, but I'm not sure whether there is a better solution. 
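Here is a rough sketch of that suggestion. It assumes the final aggregate knows whether its result column is declared NOT NULL. `NotNullMinMax` and the default value `0L` are hypothetical, because the suggestion does not say which default to use. SQL NULL is modelled as `None`.

```scala
object NotNullMinMax {
  // Hypothetical default for a BIGINT NOT NULL result; not specified in the issue.
  val DefaultBigint: Long = 0L

  // Value a MIN/MAX aggregate would emit: if the column is declared NOT NULL
  // and no non-null input has been seen yet, emit the default instead of null.
  def emit(acc: Option[Long], declaredNotNull: Boolean): Option[Long] =
    if (declaredNotNull) Some(acc.getOrElse(DefaultBigint)) else acc

  def main(args: Array[String]): Unit = {
    println(emit(None, declaredNotNull = true))     // intermediate state, NOT NULL column
    println(emit(Some(5L), declaredNotNull = true)) // a real value passes through
    println(emit(None, declaredNotNull = false))    // nullable column keeps null
  }
}
```

Under this assumption, the first row of the test would presumably become `(true,1,0,0,1)` instead of `(true,1,null,null,1)`. The constraint then holds, but the intermediate value is still artificial and is retracted later.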

> NOT NULL constraint is not guaranteed when aggregation split is enabled
> -----------------------------------------------------------------------
>
>                 Key: FLINK-18701
>                 URL: https://issues.apache.org/jira/browse/FLINK-18701
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Major
>
> Take the following test: 
> {{org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase#testMinMaxWithRetraction}}
> {code:scala}
>     val t1 = tEnv.sqlQuery(
>       s"""
>          |SELECT
>          |  c, MIN(b), MAX(b), COUNT(DISTINCT a)
>          |FROM(
>          |  SELECT
>          |    a, COUNT(DISTINCT b) as b, MAX(b) as c
>          |  FROM T
>          |  GROUP BY a
>          |) GROUP BY c
>        """.stripMargin)
>     val sink = new TestingRetractSink
>     t1.toRetractStream[Row].addSink(sink)
>     env.execute()
>     println(sink.getRawResults)
> {code}
> The query schema is
> {code:java}
> root
>  |-- c: INT
>  |-- EXPR$1: BIGINT NOT NULL
>  |-- EXPR$2: BIGINT NOT NULL
>  |-- EXPR$3: BIGINT NOT NULL
> {code}
> This schema should be correct: the count is never null, so MIN/MAX over it are 
> never null either. However, the sink still receives nulls:
> {code}
> List((true,1,null,null,1), (true,2,2,2,1), (false,1,null,null,1), 
> (true,6,2,2,1), (true,5,1,1,0), (false,5,1,1,0), (true,5,1,1,2), 
> (true,4,2,2,0), (false,5,1,1,2), (true,5,1,3,2), (false,4,2,2,0), 
> (false,5,1,3,2), (true,5,1,4,2))
> {code}
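To confirm the violation, you can scan the raw results for nulls in the columns declared NOT NULL. This is a minimal sketch with some assumptions. Each result is taken to be laid out as `(isAccumulate, c, EXPR$1, EXPR$2, EXPR$3)`, as printed above. SQL NULL is modelled as `None`, and `NotNullCheck` is a hypothetical helper. The rows are copied from the output in the issue.

```scala
object NotNullCheck {
  // (isAccumulate, c, EXPR$1 = MIN, EXPR$2 = MAX, EXPR$3 = COUNT DISTINCT)
  type RawRow = (Boolean, Int, Option[Long], Option[Long], Long)

  // Raw sink results from the issue, with NULL written as None.
  val rows: List[RawRow] = List(
    (true, 1, None, None, 1L), (true, 2, Some(2L), Some(2L), 1L),
    (false, 1, None, None, 1L), (true, 6, Some(2L), Some(2L), 1L),
    (true, 5, Some(1L), Some(1L), 0L), (false, 5, Some(1L), Some(1L), 0L),
    (true, 5, Some(1L), Some(1L), 2L), (true, 4, Some(2L), Some(2L), 0L),
    (false, 5, Some(1L), Some(1L), 2L), (true, 5, Some(1L), Some(3L), 2L),
    (false, 4, Some(2L), Some(2L), 0L), (false, 5, Some(1L), Some(3L), 2L),
    (true, 5, Some(1L), Some(4L), 2L)
  )

  // Positions of rows whose NOT NULL columns EXPR$1 / EXPR$2 are null.
  def violations(rs: List[RawRow]): List[Int] =
    rs.zipWithIndex.collect {
      case ((_, _, min, max, _), i) if min.isEmpty || max.isEmpty => i
    }

  def main(args: Array[String]): Unit =
    println(violations(rows))
}
```

On these rows it returns `List(0, 2)`. These are the accumulate message for `c = 1` and its retraction.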



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
