radu created FLINK-6260:
---------------------------
Summary: Distinct Aggregates for Group By Windows
Key: FLINK-6260
URL: https://issues.apache.org/jira/browse/FLINK-6260
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: radu
Time target: ProcTime/EventTime
Targeted SQL query examples:
------------
Q1. Boundaries are expressed in the GROUP BY clause and DISTINCT is applied to the
elements of the aggregate(s):
`SELECT MIN(DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`
Q2. DISTINCT is applied to the collection of outputs to be selected:
`SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`
=> The DISTINCT operation makes sense only within the context of windows or some
other bounded structure. Otherwise the operation would have to keep an unbounded
amount of data to ensure uniqueness, and some functions (e.g., aggregates) would
never emit a result.
=> We can follow the same design/implementation as for FLINK-6249 (supporting
distinct aggregates for OVER windows).
=> The implementation of DISTINCT for SELECT clauses can be handled as a separate
sub-task of this JIRA issue.
=> Aggregations over distinct elements without any boundary (i.e., specified only
within the SELECT clause) do not make sense, just as aggregations do not make
sense without groupings or windows.
If DISTINCT is applied to the group elements as in the Q1 example, then we either
define a new implementation (if the selection is general) or extend the current
implementation of grouped aggregates with distinct group aggregates.
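A minimal sketch of the second option (extending grouped aggregates with distinct variants), written in plain Java rather than against Flink's actual aggregate interfaces. `SimpleAggregate`, `CountAgg` and `DistinctAgg` are hypothetical names; the wrapper simply drops values it has already accumulated for the current group.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical minimal aggregate contract; NOT Flink's AggregateFunction API.
interface SimpleAggregate<T, R> {
    void accumulate(T value);
    R getValue();
}

// Plain COUNT over the input values.
final class CountAgg<T> implements SimpleAggregate<T, Long> {
    private long count = 0L;

    @Override
    public void accumulate(T value) { count++; }

    @Override
    public Long getValue() { return count; }
}

// Turns any aggregate into its DISTINCT variant by forwarding only values that
// have not been seen before in the current group, e.g. COUNT(DISTINCT b).
final class DistinctAgg<T, R> implements SimpleAggregate<T, R> {
    private final SimpleAggregate<T, R> inner;
    // Distinct values seen so far; its size is bounded by the group/window scope.
    private final Set<T> seen = new HashSet<>();

    DistinctAgg(SimpleAggregate<T, R> inner) { this.inner = inner; }

    @Override
    public void accumulate(T value) {
        // Set.add returns false for a duplicate, which is then skipped.
        if (seen.add(value)) {
            inner.accumulate(value);
        }
    }

    @Override
    public R getValue() { return inner.getValue(); }
}
```

In this sketch one `DistinctAgg` instance would be created per group and window and discarded when the window closes, so the `seen` set never outlives its group scope.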
If DISTINCT is applied to all selected elements as in the Q2 example, then a new
implementation needs to be defined. It would work over a specific window /
ProcessFunction, and the uniqueness of the results would be enforced within the
processing function, separately for each partition. The data structure used to
track distinct elements is reset with each new window (i.e., group by scope).
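The per-partition tracking and reset described above could look roughly like the following sketch. `WindowedDistinct`, `processElement` and `onWindowEnd` are hypothetical names (this is not Flink's ProcessFunction API), and the window-end timer is modeled as an explicit call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition operator for SELECT STREAM DISTINCT b ... GROUP BY
// FLOOR(PROCTIME() TO HOUR). It mimics the ProcessFunction idea but is NOT the
// Flink API: the window-end timer is modeled by an explicit onWindowEnd call.
final class WindowedDistinct<K, V> {
    // Per partition key: distinct values seen in the current window, in arrival order.
    private final Map<K, LinkedHashSet<V>> distinctState = new HashMap<>();

    // Records a value for its partition; a duplicate within the same window is
    // ignored because the set already contains it.
    void processElement(K partitionKey, V value) {
        distinctState.computeIfAbsent(partitionKey, k -> new LinkedHashSet<>()).add(value);
    }

    // Called when the current window of a partition closes: returns every
    // distinct value exactly once and drops the state, so the next window
    // starts from an empty set.
    List<V> onWindowEnd(K partitionKey) {
        LinkedHashSet<V> seen = distinctState.remove(partitionKey);
        if (seen == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(seen);
    }
}
```

Removing the set at window end (rather than keeping it forever) is what keeps the state bounded, which is the reason DISTINCT is only meaningful within a window.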
Examples
------------
`Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
`Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||
|10:00:01| |(ab,1)| | |
|10:05:00| |(aa,2)| | |
|11:00:00| | |ab,aa|2|
|11:03:00| |(aa,2)| | |
|11:09:00| |(aa,2)| | |
|12:00:00| | |aa|1|
|...| | | | |
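The table above can be replayed with a small standalone Java program. This assumes that `b` is the first field of each Stream1 tuple and that the windows are hourly tumbling windows on processing time; `HourlyDistinctExample` and `Row` are hypothetical names used only for this illustration. Each window's distinct set gives the Q1 output and its size gives the Q2 output, both emitted at the end of the hour.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of the example table (not Flink code): hourly tumbling
// windows on processing time, assuming b is the first field of each tuple.
final class HourlyDistinctExample {

    // One input row: its processing time and the value of column b.
    static final class Row {
        final LocalTime procTime;
        final String b;

        Row(String procTime, String b) {
            this.procTime = LocalTime.parse(procTime);
            this.b = b;
        }
    }

    // Groups rows by FLOOR(procTime TO HOUR) and keeps the distinct b values of
    // each window in arrival order; every window starts with an empty set.
    static Map<LocalTime, LinkedHashSet<String>> distinctPerHour(List<Row> rows) {
        Map<LocalTime, LinkedHashSet<String>> windows = new TreeMap<>();
        for (Row row : rows) {
            LocalTime windowStart = row.procTime.truncatedTo(ChronoUnit.HOURS);
            windows.computeIfAbsent(windowStart, w -> new LinkedHashSet<>()).add(row.b);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Row> stream1 = Arrays.asList(
                new Row("10:00:01", "ab"),
                new Row("10:05:00", "aa"),
                new Row("11:03:00", "aa"),
                new Row("11:09:00", "aa"));
        for (Map.Entry<LocalTime, LinkedHashSet<String>> window : distinctPerHour(stream1).entrySet()) {
            LocalTime emitAt = window.getKey().plusHours(1); // results fire at window end
            String q1 = String.join(",", window.getValue()); // SELECT STREAM DISTINCT b
            int q2 = window.getValue().size();               // COUNT(DISTINCT b)
            System.out.println(emitAt + " | Q1: " + q1 + " | Q2: " + q2);
        }
    }
}
```

Running `main` prints one line per window, emitted at 11:00 and 12:00, matching the Q1 and Q2 columns of the table.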
Implementation option
---------------------
Considering that the behavior is similar to the one implemented for OVER windows
(with the difference that the distinction is reset for each group scope), the
implementation will reuse the existing implementation of the OVER window
functions. Distinction can be achieved either within the aggregate itself or
within the window/ProcessFunction logic that computes the aggregates. As
multiple aggregates that require distinction can be computed at the same time,
the preferred option is to handle distinction within the process logic. For the
case of selecting distinct outputs (i.e., not aggregates) we can follow the same
implementation design: support distinction in the aggregation and then emit only
one output per distinct element seen (instead of calling the aggregate method of
the aggregates).
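A sketch of the preferred option, where distinction lives in the window processing logic and a single membership check feeds several DISTINCT aggregates at once. The class name, the hard-coded `COUNT(DISTINCT b)`, `SUM(DISTINCT b)` and `COUNT(b)` aggregates, and the `emitted()` collector are assumptions made for illustration, not existing Flink code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical window processing logic (NOT a Flink class): distinction is
// handled once here, and only first occurrences reach the DISTINCT aggregates.
final class DistinctWindowLogic {
    // Distinct values of field b in the current window, shared by all DISTINCT aggregates.
    private final Set<Long> seen = new HashSet<>();
    private long countDistinct = 0L; // COUNT(DISTINCT b)
    private long sumDistinct = 0L;   // SUM(DISTINCT b)
    private long countAll = 0L;      // COUNT(b): non-distinct, sees every row
    // Records emitted for SELECT DISTINCT b; kept across windows like an output collector.
    private final List<Long> emitted = new ArrayList<>();

    void processElement(long b) {
        countAll++;
        // A single membership check serves every DISTINCT aggregate over b.
        if (seen.add(b)) {
            countDistinct++;
            sumDistinct += b;
            // Distinct-output mode: one output per distinct element instead of an aggregate update.
            emitted.add(b);
        }
    }

    // Fires the window: returns {COUNT(DISTINCT b), SUM(DISTINCT b), COUNT(b)}
    // and resets the per-window state for the next group scope.
    long[] onWindowEnd() {
        long[] result = {countDistinct, sumDistinct, countAll};
        seen.clear();
        countDistinct = 0L;
        sumDistinct = 0L;
        countAll = 0L;
        return result;
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

Doing the check once in the process logic avoids keeping a separate set per DISTINCT aggregate when several aggregates are computed over the same field in the same window.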