[ https://issues.apache.org/jira/browse/FLINK-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014876#comment-17014876 ]
Kurt Young commented on FLINK-15577:
------------------------------------
[~b.hanotte] Thanks for the fix. It looks like the Blink planner also has this issue;
could you fix both?
> WindowAggregate RelNodes missing Window specs in digest
> -------------------------------------------------------
>
> Key: FLINK-15577
> URL: https://issues.apache.org/jira/browse/FLINK-15577
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Legacy Planner
> Affects Versions: 1.9.1
> Reporter: Benoit Hanotte
> Priority: Critical
>
> The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType,
> is used by the Calcite HepPlanner to avoid adding duplicate vertices to the
> graph. If an equivalent vertex is already present in the graph, that
> vertex is used in place of the newly generated one:
> https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828
> This means that *the digest needs to contain all the information necessary to
> identify a vertex and distinguish it from vertices that are similar but not
> equivalent*.
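The deduplication rule described above can be modelled with a small toy graph. This is a minimal sketch under stated assumptions, not Calcite's HepPlanner: `ToyRelNode` and `ToyHepGraph` are hypothetical names, the digest is a string built from the node name, its attributes and its inputs' digests, and the row type is a tuple of field names standing in for RelDataType. It only shows that a node whose (digest, row type) key is already registered is replaced by the existing vertex.

```python
from dataclasses import dataclass, field


# Hypothetical toy node; NOT Calcite's RelNode.
@dataclass
class ToyRelNode:
    name: str        # e.g. "Agg"
    attrs: dict      # attributes that are supposed to identify the node
    row_type: tuple  # stand-in for RelDataType: output field names
    inputs: list = field(default_factory=list)

    def digest(self) -> str:
        # Digest = name + sorted attributes + digests of the inputs.
        parts = [f"{k}={v}" for k, v in sorted(self.attrs.items())]
        parts += [child.digest() for child in self.inputs]
        return f"{self.name}({', '.join(parts)})"


class ToyHepGraph:
    """Hypothetical graph keyed by (digest, row type), mimicking vertex reuse."""

    def __init__(self):
        self._vertices = {}  # (digest, row_type) -> ToyRelNode

    def add(self, node: ToyRelNode) -> ToyRelNode:
        key = (node.digest(), node.row_type)
        # If an equivalent vertex exists, return it instead of the new node.
        return self._vertices.setdefault(key, node)

    def __len__(self) -> int:
        return len(self._vertices)


if __name__ == "__main__":
    scan = ToyRelNode("Scan", {"table": "my_table"}, ("timestamp",))
    graph = ToyHepGraph()
    first = graph.add(ToyRelNode("Agg", {"group": "$0"}, ("cnt",), [scan]))
    second = graph.add(ToyRelNode("Agg", {"group": "$0"}, ("cnt",), [scan]))
    print(first is second, len(graph))  # the second node is replaced by the first
```

Anything left out of `attrs` cannot influence the key, which is exactly why a missing attribute makes distinct nodes collapse.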
> In the case of `LogicalWindowAggregate` and
> `FlinkLogicalWindowAggregate`, the window specs are currently not part of the
> digest, so two aggregations with the same signature and expressions but
> different windows are considered equivalent by the planner. This is
> incorrect and leads to an invalid physical plan.
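The effect of leaving the window out of the digest can be illustrated with two toy window aggregates that differ only in their window. This is a hedged sketch: `ToyWindowAggregate`, `SlidingWindow` and `distinct_vertices` are hypothetical names, not Flink classes. In Calcite the digest is computed from the attributes a node writes in `explainTerms`, so a real fix would most likely add the window there; the sketch only models the outcome with an `include_window` switch.

```python
from dataclasses import dataclass


# Hypothetical window spec; field names are illustrative.
@dataclass(frozen=True)
class SlidingWindow:
    size_ms: int
    slide_ms: int


# Hypothetical stand-in for a window aggregate RelNode.
@dataclass(frozen=True)
class ToyWindowAggregate:
    agg_calls: tuple
    window: SlidingWindow
    input_digest: str

    def digest(self, include_window: bool) -> str:
        text = f"WindowAggregate(aggs={list(self.agg_calls)}"
        if include_window:
            # The fix: the window spec becomes part of the node's identity.
            text += (f", window=Sliding(size={self.window.size_ms}, "
                     f"slide={self.window.slide_ms})")
        return text + f", input={self.input_digest})"


def distinct_vertices(nodes, include_window: bool) -> int:
    # Number of vertices a digest-keyed planner would keep.
    return len({node.digest(include_window) for node in nodes})


if __name__ == "__main__":
    scan = "Scan(my_table)"
    agg_1h = ToyWindowAggregate(("rowtime",), SlidingWindow(3_600_000, 3_600_000), scan)
    agg_2h = ToyWindowAggregate(("rowtime",), SlidingWindow(7_200_000, 3_600_000), scan)
    print(distinct_vertices([agg_1h, agg_2h], include_window=False))  # 1: wrongly merged
    print(distinct_vertices([agg_1h, agg_2h], include_window=True))   # 2: kept apart
```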
> For instance, the following query would give an invalid plan:
> {code}
> WITH window_1h AS (
>     SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) AS `timestamp`
>     FROM my_table
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
> ),
> window_2h AS (
>     SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) AS `timestamp`
>     FROM my_table
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> )
> (SELECT * FROM window_1h)
> UNION ALL
> (SELECT * FROM window_2h)
> {code}
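To see why the two CTEs must not be merged, it helps to compute which hopping windows a single record falls into. In Flink SQL, `HOP(time, slide, size)` takes the hop interval first and the window size second, so `window_1h` uses a 1 h slide with a 1 h size and `window_2h` a 1 h slide with a 2 h size. The sketch below is a hypothetical helper (`hop_windows` is not a Flink API), modelled on the start-alignment loop of Flink's sliding window assigner with a zero offset.

```python
def hop_windows(ts_ms: int, slide_ms: int, size_ms: int) -> list:
    """Return the [start, end) windows, latest first, that contain ts_ms.

    Assumes a zero window offset; Python's % keeps the result non-negative.
    """
    last_start = ts_ms - (ts_ms % slide_ms)  # align to the slide
    windows = []
    start = last_start
    while start > ts_ms - size_ms:           # window must still cover ts_ms
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


if __name__ == "__main__":
    HOUR = 3_600_000
    ts = 90 * 60 * 1000  # a record at 01:30
    print(hop_windows(ts, HOUR, HOUR))      # [(3600000, 7200000)]
    print(hop_windows(ts, HOUR, 2 * HOUR))  # [(3600000, 10800000), (0, 7200000)]
```

The same record lands in one window for `window_1h` and in two for `window_2h`, so the two aggregations cannot share a vertex.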
> The planner generates the following invalid plan (*note that the windows in
> the two DataStreamGroupWindowAggregate nodes are identical when they should
> differ*):
> {code}
> DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
> {code}
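A regression test for this bug could compare the window specs that appear in the explained plan with the windows the query asked for. The sketch below is hypothetical: `window_specs` and the regex are not part of Flink, and they assume the `SlidingGroupWindow(alias, timeField, size.millis, slide.millis)` text format shown in the plan. The plan string contains only the two window specs copied from that output.

```python
import re

# Hypothetical pattern for the explain format shown in the plan:
# SlidingGroupWindow('w$, 'timestamp, <size>.millis, <slide>.millis)
WINDOW_RE = re.compile(
    r"SlidingGroupWindow\([^,]+, [^,]+, (\d+)\.millis, (\d+)\.millis\)"
)


def window_specs(plan: str) -> list:
    # (size_ms, slide_ms) for every sliding window in the plan text.
    return [(int(size), int(slide)) for size, slide in WINDOW_RE.findall(plan)]


HOUR_MS = 3_600_000
# Windows requested by the query: 1 h slide with 1 h size, 1 h slide with 2 h size.
expected = {(HOUR_MS, HOUR_MS), (2 * HOUR_MS, HOUR_MS)}

# Window specs copied from the invalid plan.
plan = """
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
"""

missing = expected - set(window_specs(plan))

if __name__ == "__main__":
    print(missing)  # the 1 h / 1 h window never reaches the physical plan
```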