[
https://issues.apache.org/jira/browse/FLINK-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther closed FLINK-6260.
-------------------------------
Resolution: Duplicate
This issue is a duplicate of FLINK-6335 and FLINK-6373.
> Distinct Aggregates for Group By Windows
> ----------------------------------------
>
> Key: FLINK-6260
> URL: https://issues.apache.org/jira/browse/FLINK-6260
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: radu
> Priority: Major
> Labels: features
>
> Time target: ProcTime/EventTime
> SQL targeted query examples:
> ------------
> Q1. Boundaries are expressed in GROUP BY clause and distinct is applied for
> the elements of the aggregate(s)
> `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime()
> TO HOUR)`
> Q2. Distinct is applied to the collection of outputs to be selected.
> `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY
> FLOOR(procTime() TO DAY)`
> => The DISTINCT operation makes sense only within windows or other bounded
> structures. Otherwise the operator would have to keep an unbounded amount of
> state to ensure uniqueness, and some functions (e.g. aggregates) would never
> trigger.
> => We can follow the same design/implementation as for JIRA FLINK-6249
> (supporting Distinct Aggregates for OVER Windows)
> => The implementation of DISTINCT for SELECT clauses can be tracked as a
> sub-issue of this JIRA.
> => Aggregations over distinct elements without any boundary (i.e.
> within the SELECT clause) do not make sense, just as aggregations do not
> make sense without groupings or windows.
> If distinct is applied to the group elements, as in the Q1 example, then we
> either define a new implementation (if the selection is general) or extend
> the current implementation of grouped aggregates with distinct group
> aggregates.
> If distinct is applied to all selected elements, as in the Q2 example, then a
> new implementation needs to be defined. It would run over a specific
> window / ProcessFunction, and the processing function itself would enforce
> the uniqueness of the results. This happens separately for each partition.
> The data structure used to track distinct elements is reset with each new
> window (i.e., group-by scope).
>
> Examples
> ------------
> `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
> `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
> ||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||
> ||10:00:01| (ab,1)| | |
> ||10:05:00| (aa,2)| | |
> ||11:00:00| | ab,aa | 2 |
> ||11:03:00| (aa,2)| | |
> ||11:09:00| (aa,2)| | |
> ||12:00:00| | aa | 1 |
> |...|
> Implementation option
> ---------------------
> Because the behavior is similar to the one implemented for OVER windows
> (with the difference that the distinction is reset for each group scope),
> the implementation will reuse the existing implementation of the OVER
> window functions. Distinction can be achieved within the aggregate itself or
> within the window/ProcessFunction logic that computes the aggregates. As
> multiple aggregates that require distinction can be computed at the same
> time, the preferred option is to create the distinction within the process
> logic. For the case of selecting distinct outputs (i.e., not aggregates) we
> can follow the same implementation design: support distinction in the
> aggregation and then emit only one output per element seen (instead of
> calling the aggregate method of the aggregates).
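The process-logic option described in the quoted issue can be illustrated with a minimal Python sketch. It is not Flink code and does not use any Flink API: `hour_window`, `DistinctWindowProcessor`, `process`, and `flush` are hypothetical names chosen for illustration, and the window is assumed to fire when the first element of a later hour arrives, rather than on a timer. The sketch keeps one distinct-tracking structure per window, resets it when the window closes, and derives both the distinct output and the distinct count from that shared structure.

```python
def hour_window(ts):
    """Map an 'HH:MM:SS' timestamp to the hour of its tumbling window."""
    return int(ts.split(":")[0])


class DistinctWindowProcessor:
    """Hypothetical stand-in for the window/process logic (not a Flink class)."""

    def __init__(self):
        self.window = None   # hour of the window currently being filled
        self.seen = {}       # insertion-ordered distinct values of that window
        self.emitted = []    # (window hour, distinct values, distinct count)

    def process(self, ts, b):
        w = hour_window(ts)
        # Assumption: an element from a different hour closes the open window.
        if self.window is not None and w != self.window:
            self.flush()
        self.window = w
        # Distinction happens here, once, for every consumer of the window.
        self.seen.setdefault(b, None)

    def flush(self):
        """Emit the result of the open window and reset the distinct state."""
        if self.window is not None:
            values = list(self.seen)
            self.emitted.append((self.window, values, len(values)))
        self.seen = {}
        self.window = None


# Input rows taken from the example table in the issue (time, b).
proc = DistinctWindowProcessor()
for ts, b in [("10:00:01", "ab"), ("10:05:00", "aa"),
              ("11:03:00", "aa"), ("11:09:00", "aa")]:
    proc.process(ts, b)
proc.flush()  # close the last open window
```

With the rows from the example table, `proc.emitted` holds the distinct values and the distinct count for the 10:00 and 11:00 windows, which correspond to the Q1 and Q2 columns of the table.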
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)