[ 
https://issues.apache.org/jira/browse/FLINK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21109:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL 
> API
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-21109
>                 URL: https://issues.apache.org/jira/browse/FLINK-21109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>            Reporter: Jark Wu
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> *Motivation*
> The motivation is to improve the performance of hopping (sliding) windows.
> Currently, we have paned (or called sliced) optimization for the hopping 
> windows in Table/SQL. 
> That each element will be accumulated into a single pane. And once a window 
> is fired,
> we will merge multiple panes to get the window result. 
> For example, HOP(size=10s, slide=2s), a window [0, 10) consists of 5 panes 
> [0, 2), [2, 4), [4, 6), [6, 8), [8, 10).
> And each element will fall into a single pane, e.g. element with timestamp 3 
> will fall into pane [2, 4).
> However, currently, the merging panes happen on JVM heap memory. For example, 
> when window [0, 10) is going to be fired,
> we will retrieve the accumulators of the 5 panes and merge them into an 
> in-memory accumulator. 
> The performance is not good, because the number of panes may be very large 
> when the slide is small, e.g. 8640 panes when HOP(1day, 10s).
> And the memory may OOM when the accumulator is very large, e.g. containing 
> count distinct. 
> Thus, I would like to introduce a "retractAccumulators()" method which is an 
> inverse method of "merge()".
> With the "retractAccumulators()" method, we can reduce the time complexity 
> from O(N) to O(1).
> For example, when window [10, 20) is going to be fired, then we only need to 
> retract accumulator of pane [8, 10) 
> and merge the accumulator of pane [18, 20) into the state of the last window 
> [8, 18). 
> This will be a great performance improvement to make the hopping window have 
> similar performance 
> with the tumbling window, no matter how small the slide is. 
> *Public Interface*
> We will introduce a contract method "retractAccumulators" which is similar to 
> the "merge" method.
> {code}
> Retracts a group of accumulator instances from one accumulator instance. This 
> method is optional, 
> but implementing this method can greatly improve the performance of hopping 
> window aggregates.
> Therefore, it is recommended to implement this method when using with hopping 
> windows. 
> param: accumulator the accumulator which will keep the retracted aggregate 
> results. It should
>                    be noted that the accumulator may contain the previous 
> aggregated
>                    results. Therefore users should not replace or clean this 
> instance in the
>                    custom retractAccumulators method.
> param: retractAccs an java.lang.Iterable pointed to a group of accumulators 
> that will be
>                    retracted.
> public void retractAccumulators(ACC accumulator, java.lang.Iterable<ACC> 
> retractAccs)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to