[ 
https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radu updated FLINK-6249:
------------------------
    Description: 
Time target: ProcTime/EventTime

SQL targeted query examples:
----------------------------

Q1. Boundaries are expressed as windows and determine which elements are 
aggregated

Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
PRECEDING AND CURRENT ROW) FROM stream1`

Q1.2. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() RANGE BETWEEN 
INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`

Q1.3. `SELECT SUM( DISTINCT  b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 
PRECEDING AND CURRENT ROW) FROM stream1`

Q1.4. `SELECT SUM( DISTINCT  b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL 
'1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
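
To make the RANGE variants (Q1.2, Q1.4) concrete, here is a minimal sketch in plain Python rather than Flink code. The function name, the integer minute timestamps and the sample rows are all hypothetical; it only illustrates which elements fall into the interval before duplicates are dropped.

```python
def sum_distinct_range(rows, span):
    """For each (timestamp, value) row, sum the distinct values whose
    timestamp lies in [timestamp - span, timestamp].

    `rows` must be ordered by timestamp. Timestamps here are plain minutes
    (an assumption made for the sketch, not a Flink type).
    """
    results = []
    for i, (ts, _) in enumerate(rows):
        lower = ts - span
        # Only rows up to the current one can be in the window.
        in_window = {v for t, v in rows[: i + 1] if t >= lower}
        results.append(sum(in_window))
    return results


# Hypothetical input: (minute, b)
rows = [(0, 5), (20, 5), (50, 7), (130, 7)]
range_sums = sum_distinct_range(rows, span=60)
print(range_sums)  # [5, 5, 12, 7]
```

The duplicate value 5 at minute 20 is counted once, and at minute 130 only the last row is still inside the one-hour range.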

General comments:

-   A DISTINCT operation makes sense only within the context of windows or 
other bounded structures. Otherwise the operation would have to keep an 
unbounded amount of data to ensure uniqueness, and certain functions (e.g. 
aggregates) would never trigger.

-   We can consider the implementation of DISTINCT for unbounded sliding 
windows as a sub-JIRA issue. However, there would be no control over the size 
of the data structure that keeps the seen data (to check that an element is 
not re-processed). -> We need to decide whether we want to support this (and 
create the appropriate JIRA issues).
=> We will open sub-JIRA issues to extend the current functionality of 
aggregates for the DISTINCT case.

=>   Aggregations over distinct elements without any boundary (i.e. directly 
within the SELECT clause) do not make sense, just as aggregations do not make 
sense without groupings or windows.



Description:
------------

The DISTINCT operator requires processing the elements to ensure uniqueness. 
Whether the operation is used to select all distinct elements or to apply 
typical aggregation functions over a set of elements, a collection of elements 
must be formed first.
This brings the need for windows or grouping methods. Therefore the 
distinct function will be implemented within windows. Depending on the type of 
window definition there are several options:
-   Main Scope: If distinct is applied to window aggregations as in the Q1 
examples, then we either extend the aggregates implementation with distinct 
aggregates (less preferred) or extend the sliding window aggregates 
implementation in the processFunction with support for identifying distinct 
elements (preferred). The latter option is preferred because a query can carry 
multiple aggregates, including several that have the DISTINCT keyword set. 
Identifying distinct elements in the process function avoids duplicating, for 
each aggregate, the data structure that marks what was seen. It also makes the 
implementation more robust and resilient, as we can keep the data structure 
for marking the seen elements in state (e.g. MapState).
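
The preferred approach can be sketched as follows, in plain Python rather than against the Flink API. The class name and its fields are hypothetical. One assumption departs from the wording above: the map stores occurrence counts instead of binary flags, so that a value can be retracted correctly when one of several duplicates slides out of the window. A single map serves both a SUM(DISTINCT) and a COUNT(DISTINCT).

```python
from collections import deque


class DistinctRowsWindow:
    """Sliding ROWS window with one shared occurrence map that feeds
    several DISTINCT aggregates at once (hypothetical sketch)."""

    def __init__(self, size):
        self.size = size        # rows per window, e.g. 3 for "2 PRECEDING AND CURRENT ROW"
        self.rows = deque()     # window contents in arrival order
        self.occurrences = {}   # value -> number of copies currently in the window
        self.distinct_sum = 0
        self.distinct_count = 0

    def add(self, value):
        """Add a row, evict the oldest one if needed, return (sum, count)."""
        self.rows.append(value)
        seen = self.occurrences.get(value, 0)
        if seen == 0:
            # First copy in the window: both aggregates accumulate it once.
            self.distinct_sum += value
            self.distinct_count += 1
        self.occurrences[value] = seen + 1
        if len(self.rows) > self.size:
            self._retract(self.rows.popleft())
        return self.distinct_sum, self.distinct_count

    def _retract(self, value):
        remaining = self.occurrences[value] - 1
        if remaining == 0:
            # Last copy left the window: retract it from both aggregates.
            del self.occurrences[value]
            self.distinct_sum -= value
            self.distinct_count -= 1
        else:
            self.occurrences[value] = remaining


window = DistinctRowsWindow(size=3)
outputs = [window.add(v) for v in [4, 4, 9, 9, 9]]
print(outputs)  # [(4, 1), (4, 1), (13, 2), (13, 2), (9, 1)]
```

In an actual processFunction the `occurrences` dictionary would presumably live in keyed state so that it survives failures; that mapping is not shown here.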


Functionality example
---------------------

The example below illustrates the behavior of a DISTINCT aggregate over an 
OVER window when working with streams.


Query: `SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 
PRECEDING AND CURRENT ROW) FROM stream1`



||Proctime / IngestionTime(Event)||Stream1||Q3||
|10:00:01|(ab,1)|1|
|10:05:00|(aa,2)|3|
|11:03:00|(aa,2)|3|
|11:09:00|(aa,2)|2|
|...|
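
The result column can be checked with a short recomputation in plain Python (not Flink code). The function name is hypothetical, and the sketch assumes that the second field of each event is the column `a` being aggregated.

```python
def sum_distinct_rows(values, preceding):
    """For each position, sum the distinct values among the current row
    and up to `preceding` rows before it."""
    results = []
    for i in range(len(values)):
        window = values[max(0, i - preceding): i + 1]
        results.append(sum(set(window)))
    return results


# Events from the table above; the second field is assumed to be column a.
events = [("ab", 1), ("aa", 2), ("aa", 2), ("aa", 2)]
q3 = sum_distinct_rows([a for _, a in events], preceding=2)
print(q3)  # [1, 3, 3, 2]
```

The third row still sees the value 1 from the first event, so the sum stays at 3. By the fourth row the window holds only copies of 2, which is counted once.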



Implementation option
---------------------

Because the behavior depends on the OVER window behavior, the implementation 
will reuse the existing implementation of the OVER window functions, which is 
based on processFunction. As mentioned in the description section, there are 
2 options to consider:

1)  Using distinct within the aggregates implementation, by extending the 
current aggregates in Flink with distinct variants. For this we define 
additional JIRA issues, one for each aggregate implementation that must 
support the distinct keyword.

2)  Using distinct for selection within the process logic when calling the 
aggregates. This requires a new implementation of the processFunction used for 
computing the aggregates. The processFunction will also carry the logic of 
taking each element only once. For this, 2 options are possible:
-   Option 1 (to be used within the processFunction) spends memory to save 
computation: it requires creating a hashmap (e.g. MapState) with binary values 
that mark whether an event was seen before. The map is created once per window 
and is reused across multiple distinct aggregates.
-   Option 2 spends computation to save memory: it requires sorting the window 
contents and eliminating identical elements. The sorting can be based on hash 
values if the events are non-numeric or composite, or do not possess an id 
that marks uniqueness. Option 2 is not preferred for incremental aggregates 
and should be considered only if certain aggregates require a window 
implementation that recomputes everything from scratch.
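
Option 2 can be sketched as below, again in plain Python with a hypothetical function name. Elements are sorted by hash value and duplicates are dropped within each run of equal hashes. The comparison inside a run is an added safeguard, not something stated above: two different elements may share a hash value, so checking only the adjacent neighbor would not be enough.

```python
def distinct_by_sorting(window):
    """Return the distinct elements of `window` by sorting on hash value
    and removing duplicates inside each run of equal hashes."""
    ordered = sorted(window, key=hash)
    unique = []
    run_start = 0      # index in `unique` where the current equal-hash run begins
    prev_hash = None
    for element in ordered:
        h = hash(element)
        if h != prev_hash:
            run_start = len(unique)
            prev_hash = h
        # Equal elements always share a hash, so only the current run is checked.
        if element not in unique[run_start:]:
            unique.append(element)
    return unique


# Composite events with no natural ordering, recomputed from scratch per window.
window = [("ab", 1), ("aa", 2), ("aa", 2)]
distinct_events = distinct_by_sorting(window)
distinct_sum = sum(a for _, a in distinct_events)
print(distinct_sum)  # 3
```

Each evaluation costs a sort of the whole window, which is why this variant only fits aggregates that recompute from scratch anyway.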




> Distinct Aggregates for OVER window
> -----------------------------------
>
>                 Key: FLINK-6249
>                 URL: https://issues.apache.org/jira/browse/FLINK-6249
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: radu
>              Labels: features, patch



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
