[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955419#comment-15955419 ]

radu commented on FLINK-6249:
-----------------------------

[~fhueske] [~twalthr]

It seems that the aggregates passed to the window do not directly carry a 
distinct marker. We will look into how we can solve this. We have also written 
to the Calcite community.

> Distinct Aggregates for OVER window
> -----------------------------------
>
>                 Key: FLINK-6249
>                 URL: https://issues.apache.org/jira/browse/FLINK-6249
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: radu
>              Labels: features, patch
>
> Time target: ProcTime/EventTime
> SQL targeted query examples:
> ----------------------------
> Q1. Boundaries are expressed by the window definition and delimit the elements 
> to be aggregated
> Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.3. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.4. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
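A minimal sketch of the RANGE semantics of Q1.2/Q1.4, written in Python purely for illustration (this is not Flink code). It assumes strictly increasing timestamps in seconds and an inclusive lower bound of `ts - interval`; the function name `range_distinct_sum` is hypothetical. Keeping a count per value, rather than a plain flag, lets a value be retracted from the sum only when its last copy leaves the range.

```python
from collections import Counter, deque
from typing import Deque, Iterable, List, Tuple


def range_distinct_sum(events: Iterable[Tuple[int, int]], interval: int) -> List[int]:
    """Hypothetical sketch: SUM(DISTINCT b) per event over all events whose
    timestamp lies in [ts - interval, ts]. Assumes strictly increasing ts."""
    window: Deque[Tuple[int, int]] = deque()  # (timestamp, b) currently in range
    counts: Counter = Counter()               # multiplicity of each b in range
    distinct_sum = 0                          # sum of the keys present in counts
    out: List[int] = []
    for ts, b in events:
        # Evict events that fell below the RANGE lower bound.
        while window and window[0][0] < ts - interval:
            _, old = window.popleft()
            counts[old] -= 1
            if counts[old] == 0:  # last copy left the range: retract it
                del counts[old]
                distinct_sum -= old
        window.append((ts, b))
        if counts[b] == 0:        # first copy in range: add it once
            distinct_sum += b
        counts[b] += 1
        out.append(distinct_sum)
    return out


# One-hour range, timestamps in seconds.
print(range_distinct_sum([(0, 1), (1800, 2), (3600, 2), (7300, 1)], 3600))  # [1, 3, 3, 1]
```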
> General comments:
> -   The DISTINCT operation makes sense only within the context of windows or 
> other bounded structures. Otherwise the operation would have to keep an 
> unbounded amount of data to ensure uniqueness, and certain functions (e.g. 
> aggregates) would never trigger.
> -   The implementation of DISTINCT for UNBOUNDED sliding windows can be 
> treated as a sub-JIRA issue. However, there would be no control over the 
> size of the data structure that keeps the already-seen data (used to check 
> that an element is not re-processed). -> We need to decide whether we want 
> to support it (and create the appropriate JIRA issues).
> => We will open sub-JIRA issues to extend the current functionality of 
> aggregates for the DISTINCT case.
> => Aggregations over distinct elements without any boundary (i.e., within a 
> plain SELECT clause) do not make sense, just as aggregations do not make 
> sense without groupings or windows.
> Description:
> ------------
> The DISTINCT operator requires processing the elements to ensure uniqueness. 
> Whether the operation is used to SELECT all distinct elements or to apply 
> typical aggregation functions over a set of elements, a collection of 
> elements must be formed first.
> This requires windows or grouping methods. Therefore, the distinct function 
> will be implemented within windows. Depending on the type of window 
> definition, there are several options:
> -   Main Scope: If distinct is applied to window aggregations as in the Q1 
> examples, then we either extend the implementation with distinct aggregates 
> (less preferred) or extend the sliding window aggregates implementation in 
> the processFunction with support for identifying distinct elements 
> (preferred). The latter option is preferred because a query can carry 
> multiple aggregates, including several aggregates that have the DISTINCT 
> keyword set. Implementing the distinction between elements in the process 
> function avoids duplicating the data structure that marks what was seen 
> across multiple aggregates. It also makes the implementation more robust and 
> resilient, as we can keep the data structure for marking the seen elements 
> in state (MapState).
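The preferred option can be sketched as follows (Python, illustrative only; `DistinctRowsOverWindow` and its methods are hypothetical names, and a plain dict stands in for a Flink MapState). Two DISTINCT aggregates over the same field share one map from value to multiplicity inside a ROWS window. This sketch stores counts instead of binary flags so that a value is retracted only when its last copy leaves the window; DISTINCT aggregates over different fields would still need one map per field.

```python
from collections import deque
from typing import Deque, Dict, Tuple


class DistinctRowsOverWindow:
    """Hypothetical sketch: ROWS BETWEEN n PRECEDING AND CURRENT ROW with
    SUM(DISTINCT v) and COUNT(DISTINCT v) sharing a single seen-map.
    A plain dict stands in for Flink's MapState."""

    def __init__(self, preceding: int) -> None:
        self.preceding = preceding
        self.rows: Deque[int] = deque()  # values of the rows in the frame
        self.seen: Dict[int, int] = {}   # value -> multiplicity in the frame
        self.sum_distinct = 0
        self.count_distinct = 0

    def _add(self, v: int) -> None:
        # Only the first copy of a value reaches the aggregates.
        if self.seen.get(v, 0) == 0:
            self.sum_distinct += v
            self.count_distinct += 1
        self.seen[v] = self.seen.get(v, 0) + 1

    def _retract(self, v: int) -> None:
        # Only removing the last copy retracts the value from the aggregates.
        self.seen[v] -= 1
        if self.seen[v] == 0:
            del self.seen[v]
            self.sum_distinct -= v
            self.count_distinct -= 1

    def process_element(self, v: int) -> Tuple[int, int]:
        self.rows.append(v)
        self._add(v)
        if len(self.rows) > self.preceding + 1:  # frame overflow: evict oldest
            self._retract(self.rows.popleft())
        return (self.sum_distinct, self.count_distinct)


w = DistinctRowsOverWindow(preceding=2)
print([w.process_element(v) for v in [1, 2, 2, 2]])  # [(1, 1), (3, 2), (3, 2), (2, 1)]
```

Every aggregate reads from the same `seen` map, so adding a further DISTINCT aggregate on the same field costs one accumulator, not another map.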
> Functionality example
> ---------------------
> We exemplify below the functionality of DISTINCT when working with streams 
> (the aggregated value is the numeric second field of each Stream1 record).
> Query: `SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> ||Proctime||Stream1||Result||
> |10:00:01|(ab,1)|1|
> |10:05:00|(aa,2)|3|
> |11:03:00|(aa,2)|3|
> |11:09:00|(aa,2)|2|
> |...|...|...|
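To make the example easy to check, here is a naive reference (Python, illustrative; `rows_distinct_sum` is a hypothetical name) that recomputes each row's frame from scratch. It assumes the aggregated value is the numeric second field of each Stream1 record, which is what the results in the example correspond to.

```python
from typing import List, Sequence, Tuple


def rows_distinct_sum(records: Sequence[Tuple[str, int]], preceding: int) -> List[int]:
    """Hypothetical reference: SUM(DISTINCT value) over
    ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW, recomputed per row."""
    results: List[int] = []
    for i in range(len(records)):
        frame = records[max(0, i - preceding): i + 1]        # current row plus up to n preceding
        results.append(sum({value for _, value in frame}))  # a set removes duplicates
    return results


stream1 = [("ab", 1), ("aa", 2), ("aa", 2), ("aa", 2)]
print(rows_distinct_sum(stream1, preceding=2))  # [1, 3, 3, 2]
```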
> Implementation option
> ---------------------
> Since the behavior depends on the OVER window behavior, the implementation 
> will reuse the existing implementation of the OVER window functions, which is 
> based on processFunction. As mentioned in the description section, there are 
> 2 options to consider:
> 1)  Using distinct within the aggregates implementation, by extending the 
> current aggregates in Flink with distinct aggregate implementations. For 
> this, we define additional JIRA issues for each implementation to support the 
> DISTINCT keyword.
> 2)  Using distinct for selection within the process logic when calling the 
> aggregates. This requires a new implementation of the processFunction used 
> for computing the aggregates. The processFunction will also carry the logic 
> of taking each element once. For this, 2 variants are possible. Variant (a) 
> (to be used within the processFunction) trades memory: it would require 
> creating a hashmap (e.g. MapState) with binary values to mark whether the 
> event was seen before. This map will be created once per window and reused 
> across multiple distinct aggregates. Variant (b) trades computation: it would 
> require sorting the window contents and eliminating identical elements. The 
> sorting can be done based on hash values in case the events are non-numeric 
> or composite, or do not possess an id to mark their uniqueness. 
> Variant (b) is not preferred for incremental aggregates and should be 
> considered only if certain aggregates require a window implementation that 
> recomputes everything from scratch.
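The sort-based option (trading computation for memory) might look like the sketch below (Python, illustrative; `distinct_by_sorting` is a hypothetical name). It recomputes the distinct elements of a window from scratch by sorting on hash values, as suggested for non-numeric or composite events. Equal elements always share a hash, so duplicates land in the same run; an equality check inside each run keeps hash collisions from merging unequal elements. The O(n log n) sort on every evaluation is why this fits from-scratch recomputation rather than incremental aggregates.

```python
from itertools import groupby
from typing import Hashable, List, Sequence


def distinct_by_sorting(window: Sequence[Hashable]) -> List[Hashable]:
    """Hypothetical sketch: distinct elements of a window via hash sorting."""
    result: List[Hashable] = []
    # After sorting by hash, all elements with the same hash are adjacent.
    for _, run in groupby(sorted(window, key=hash), key=hash):
        unique_in_run: List[Hashable] = []
        for element in run:
            if element not in unique_in_run:  # equality check guards against collisions
                unique_in_run.append(element)
        result.extend(unique_in_run)
    return result


print(sum(distinct_by_sorting([1, 2, 2])))  # 3
```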



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
