[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776082#comment-17776082
 ] 

tanjialiang commented on FLINK-29692:
-------------------------------------

Hi everyone, I noticed that there is no solution for this yet, so I would like 
to share my thoughts on this feature in case they help.
 
I think supporting early-fire may not be the best solution for the current 
window functions, because every window trigger is expensive and early-fire is 
still not a real-time trigger.

For example
{code:sql}
SET table.exec.emit.early-fire.enabled = true;
SET table.exec.emit.early-fire.delay = 1min;

SELECT user_id,
       COUNT(*) AS total,
       HOP_START(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_start,
       HOP_END(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_end
FROM user_click
GROUP BY user_id, HOP(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR);
{code}
1. Whether we use HOP/TUMBLE/CUMULATE windows or enable early-fire, there is 
still a time delay, so the results are not real-time enough.

2. When the cardinality of user_id is large, every window trigger is very 
expensive, which makes the job unstable and prone to checkpoint timeouts.

3. Every early-fire triggers the windows of all user_ids, but perhaps only a 
small part of the data actually changed during that early-fire interval, 
which may cause unnecessary write pressure on the sink.
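
To put rough numbers on point 3, here is a back-of-the-envelope sketch. It takes the behavior described above at face value (every early-fire emits every open window of every user_id). The key counts are hypothetical, chosen only for illustration, and the function name is my own.

```python
# Hypothetical illustration of point 3; the numbers are made up, not measured.
FIRES_PER_DAY = 24 * 60        # early-fire delay of 1 min
OPEN_WINDOWS_PER_KEY = 48 // 24  # HOP size 48h / slide 24h -> 2 overlapping windows


def rows_per_fire(num_keys, changed_keys, open_windows=OPEN_WINDOWS_PER_KEY):
    """Rows written to the sink in one early-fire interval.

    Returns (rows if every key's windows are emitted,
             rows if only keys that received new records are emitted).
    """
    return num_keys * open_windows, changed_keys * open_windows


# Assumed workload: 1,000,000 user_ids, 10,000 of which changed in the last minute.
all_rows, changed_rows = rows_per_fire(num_keys=1_000_000, changed_keys=10_000)
print(all_rows, changed_rows)          # rows per fire
print(all_rows * FIRES_PER_DAY)        # rows per day when everything is re-emitted
```

Under these assumed numbers, emitting only the changed keys would write about 1% of the rows per interval.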

 
At my company, I have added window TVFs for this case, named HOPv2/TUMBLEv2 
(the names may not be a good fit for the community).
{code:sql}
SELECT user_id,
       COUNT(*) AS total,
       window_start,
       window_time, -- the record rowtime
       window_end
FROM TABLE(
    HOPV2(
        DATA => TABLE user_click,
        TIMECOL => DESCRIPTOR(rowtime),
        SLIDE => INTERVAL '24' HOUR,
        SIZE => INTERVAL '48' HOUR,
        ALLOWED_LATENESS => true))
GROUP BY user_id, window_start, window_time, window_end;
{code}
1. Similar to an OVER window, we accumulate and output the result when a record 
arrives (the output is actually driven by a timer trigger), so the trigger is 
real-time and there is not much write pressure on the sink.
2. window_time is the record's rowtime, which represents the current progress.
3. Similar to a HOP window, we fire and purge when window_end is reached.
4. The allowedLateness option is supported: when a late event is processed and 
its window has not been purged yet, it is accumulated without emitting a result.
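
To make the four points above concrete, here is a minimal single-process sketch in Python. It is not our actual operator or any Flink API. The class and method names are hypothetical, timestamps are plain integer seconds, a record counts as late when its rowtime is at or behind the watermark, and results are emitted directly on arrival instead of through a timer.

```python
HOUR = 3600
SIZE = 48 * HOUR   # window size
SLIDE = 24 * HOUR  # window slide


def assign_windows(ts, size=SIZE, slide=SLIDE):
    """Return (start, end) of every hopping window that contains ts."""
    last_start = ts - ts % slide
    return [(s, s + size) for s in range(last_start, ts - size, -slide)]


class HopV2Sketch:
    """Hypothetical model of the HOPv2 semantics for COUNT(*) per user_id."""

    def __init__(self, allowed_lateness=True):
        self.allowed_lateness = allowed_lateness
        self.counts = {}                # (user_id, start, end) -> running count
        self.watermark = float("-inf")

    def on_record(self, user_id, ts):
        """Accumulate one record; emit updated rows unless the record is late."""
        late = ts <= self.watermark     # assumption: simple lateness rule
        if late and not self.allowed_lateness:
            return []
        out = []
        for start, end in assign_windows(ts):
            if end <= self.watermark:
                continue                # window already fired and purged
            key = (user_id, start, end)
            self.counts[key] = self.counts.get(key, 0) + 1
            if not late:
                # (user_id, total, window_start, window_time, window_end)
                out.append((user_id, self.counts[key], start, ts, end))
        return out

    def on_watermark(self, watermark):
        """Fire and purge every window whose end has been reached."""
        self.watermark = max(self.watermark, watermark)
        fired = []
        for key in [k for k in self.counts if k[2] <= self.watermark]:
            user_id, start, end = key
            fired.append((user_id, self.counts.pop(key), start, end))
        return fired


op = HopV2Sketch()
first = op.on_record("u1", 1 * HOUR)     # emits one row per containing window
second = op.on_record("u1", 2 * HOUR)    # emits the updated counts
fired = op.on_watermark(24 * HOUR)       # fires and purges the window ending at 24h
late = op.on_record("u1", 23 * HOUR)     # late: accumulated, nothing emitted
```

Only the windows touched by the incoming record produce output, which is why the write pressure on the sink stays proportional to the data that actually changed.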

 
I would like to contribute it, but it may need more discussion and some help, 
because I am still new to contributing to Flink.
 

> Support early/late fires for Windowing TVFs
> -------------------------------------------
>
>                 Key: FLINK-29692
>                 URL: https://issues.apache.org/jira/browse/FLINK-29692
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.3
>            Reporter: Canope Nerda
>            Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late-arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication, which is based on Windowing TVFs, and according to 
> the source code, early/late fires are not supported yet in Windowing TVFs.
> Actually, 1) contradicts 2). Without early/late fires, we have to 
> compromise: either live with fresh but incorrect data or tolerate excess 
> latency for correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
