[
https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Hueske closed FLINK-6249.
--------------------------------
Resolution: Duplicate
Fix Version/s: 1.6.0
Implemented via FLINK-8689
> Distinct Aggregates for OVER window
> -----------------------------------
>
> Key: FLINK-6249
> URL: https://issues.apache.org/jira/browse/FLINK-6249
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: radu
> Priority: Major
> Labels: features, patch
> Fix For: 1.6.0
>
>
> Time target: ProcTime/EventTime
> SQL targeted query examples:
> ----------------------------
> Q1. Boundaries are expressed in the window definition and determine which
> elements are aggregated
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN
> INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN
> INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> General comments:
> - The DISTINCT operation makes sense only within the context of windows or
> other bounded structures. Otherwise the operation would have to keep an
> unbounded amount of data to ensure uniqueness and would never trigger for
> certain functions (e.g. aggregates).
> - We can consider as a sub-JIRA issue the implementation of DISTINCT for
> UNBOUNDED sliding windows. However, there would be no control over the size
> of the data structure that keeps the seen data (to check that it is not
> re-processed). -> We need to decide whether we want to support this (and
> create the appropriate JIRA issues).
> => We will open sub-JIRA issues to extend the current functionality of
> aggregates for the DISTINCT case.
> => Aggregations over distinct elements without any boundary (i.e. within a
> plain SELECT clause) do not make sense, just as aggregations do not make
> sense without groupings or windows.
> Description:
> ------------
> The DISTINCT operator requires processing the elements to ensure uniqueness.
> Whether the operation is used to SELECT all distinct elements or to apply
> typical aggregation functions over a set of elements, a collection of
> elements must be formed first.
> This brings the need for windows or grouping methods. Therefore the
> distinct function will be implemented within windows. Depending on the type
> of window definition there are several options:
> - Main Scope: If distinct is applied to window aggregations as in the Q1
> examples, then either we extend the aggregate implementations with distinct
> aggregates (less preferred) or we extend the sliding window aggregates
> implementation in the ProcessFunction with support for identifying distinct
> elements (preferred). The latter option is preferred because a query can
> carry multiple aggregates, including several that have the DISTINCT keyword
> set. Implementing the distinction between elements in the ProcessFunction
> avoids duplicating the data structure that marks what was seen across
> multiple aggregates. It also makes the implementation more robust and
> resilient, as we can keep the data structure for marking the seen elements
> in state (MapState).
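
The preferred option above, one shared structure of seen elements driving several distinct aggregates, can be sketched as follows. This is a minimal illustration in plain Python, not Flink code: the class name `DistinctRowsOverWindow` and its methods are hypothetical, and an in-memory dict stands in for keyed MapState. The sketch stores occurrence counts rather than binary flags so that values leaving a sliding window can be retracted; that is a choice of this example, not something the issue specifies.

```python
from collections import deque


class DistinctRowsOverWindow:
    """Sliding ROWS window sharing one 'seen' map across distinct aggregates.

    Hypothetical sketch: a dict stands in for Flink's MapState.
    """

    def __init__(self, preceding):
        self.size = preceding + 1   # N PRECEDING plus CURRENT ROW
        self.rows = deque()         # values currently inside the window
        self.seen = {}              # value -> occurrences inside the window
        self.distinct_sum = 0       # accumulator for SUM(DISTINCT x)
        self.distinct_count = 0     # accumulator for COUNT(DISTINCT x)

    def _accumulate(self, value):
        self.seen[value] = self.seen.get(value, 0) + 1
        if self.seen[value] == 1:   # first occurrence: update every aggregate
            self.distinct_sum += value
            self.distinct_count += 1

    def _retract(self, value):
        self.seen[value] -= 1
        if self.seen[value] == 0:   # last occurrence has left the window
            del self.seen[value]
            self.distinct_sum -= value
            self.distinct_count -= 1

    def process(self, value):
        """Add one row and return (SUM(DISTINCT), COUNT(DISTINCT))."""
        self.rows.append(value)
        self._accumulate(value)
        if len(self.rows) > self.size:
            self._retract(self.rows.popleft())
        return self.distinct_sum, self.distinct_count
```

Both aggregates are updated from the same `seen` lookup, so adding a further distinct aggregate over the same column would not require a second map.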
> Functionality example
> ---------------------
> The example below illustrates the behavior of a DISTINCT aggregate over an
> OVER window when working with streams.
> Query: `SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> ||Proctime||IngestionTime(Event)||Stream1||Q3||
> ||10:00:01| (ab,1)| 1 |
> ||10:05:00| (aa,2)| 3 |
> ||11:03:00| (aa,2)| 3 |
> ||11:09:00| (aa,2)| 2 |
> |...|
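
The result column of the table can be reproduced with a small brute-force check. This is only a reference sketch in plain Python: the function name `sum_distinct_rows_over` is made up for illustration, and it assumes the aggregated column `a` is the numeric second field of each event.

```python
def sum_distinct_rows_over(values, preceding):
    """Recompute SUM(DISTINCT v) for every row over ROWS BETWEEN
    `preceding` PRECEDING AND CURRENT ROW."""
    results = []
    for i in range(len(values)):
        window = values[max(0, i - preceding): i + 1]
        results.append(sum(set(window)))  # set() drops duplicates in the window
    return results


# Events from the table, in arrival order (assumed layout: (key, a)).
events = [("ab", 1), ("aa", 2), ("aa", 2), ("aa", 2)]
print(sum_distinct_rows_over([a for _, a in events], preceding=2))
# prints [1, 3, 3, 2]
```

The last row yields 2 because the event (ab,1) has dropped out of the three-row window, leaving only the value 2.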
> Implementation option
> ---------------------
> Because the behavior depends on the OVER window behavior, the
> implementation will reuse the existing implementation of the OVER window
> functions, which is based on ProcessFunction. As mentioned in the
> description section, there are 2 options to consider:
> 1) Handling distinct within the aggregates by extending the current
> aggregates in Flink with distinct variants. For this we would define
> additional JIRA issues, one for each aggregate implementation that has to
> support the DISTINCT keyword.
> 2) Handling distinct as a selection step within the process logic that calls
> the aggregates. This requires a new implementation of the ProcessFunction
> used for computing the aggregates. The ProcessFunction will also carry the
> logic for taking each element only once. For this, 2 sub-options are
> possible. Option 1 (to be used within the ProcessFunction) trades memory: it
> requires a hash map (e.g. MapState) with binary values to mark whether an
> event was seen before. The map is created once per window and reused across
> multiple distinct aggregates. Option 2 trades computation: it requires
> sorting the window contents and eliminating identical elements. The sorting
> can be based on hash values in case the events are non-numeric or composite,
> or do not possess an id to mark uniqueness.
> Option 2 is not preferred for incremental aggregates and should be
> considered only if certain aggregates require a window implementation that
> recomputes everything from scratch.
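
Option 2 above (sort the window contents, then eliminate identical elements) might look roughly like the following. This is a hedged sketch in plain Python, not the Flink implementation; `sort_key`, `distinct_by_sorting` and `sum_distinct_recompute` are hypothetical names, and the hash-based ordering assumes that equal elements have identical `repr` output.

```python
import hashlib


def sort_key(element):
    """Hash-based ordering for elements without a natural order.

    Assumption of this sketch: equal elements share the same repr().
    The repr is kept as a tie-breaker so equal elements end up adjacent
    even if two different elements had the same digest.
    """
    text = repr(element)
    return hashlib.sha256(text.encode("utf-8")).hexdigest(), text


def distinct_by_sorting(window):
    """Sort the window contents, then drop adjacent duplicates."""
    unique = []
    for element in sorted(window, key=sort_key):
        if not unique or unique[-1] != element:
            unique.append(element)
    return unique


def sum_distinct_recompute(window, field):
    """Recompute SUM(DISTINCT field) from scratch for one window."""
    return sum(distinct_by_sorting([row[field] for row in window]))
```

Each call costs a full sort of the window, which is why this variant only fits aggregates that already recompute everything per window rather than incremental ones.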