[ 
https://issues.apache.org/jira/browse/FLINK-35899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868862#comment-17868862
 ] 

Zakelly Lan commented on FLINK-35899:
-------------------------------------

[~pnowojski] Would FLINK-20217 or FLIP-443 help with this?

> Accumulated TumblingProcessingTimeWindows in the state for a few days
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35899
>                 URL: https://issues.apache.org/jira/browse/FLINK-35899
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Runtime / Checkpointing
>    Affects Versions: 1.13.5
>            Reporter: Sergey Anokhovskiy
>            Priority: Major
>         Attachments: dedup_window_intializing_slowly.png, 
> dedup_window_processes_old_windows.png, ids_destribution.txt, 
> sub_task_state_size.txt, taskmanager_exception.txt, timers.txt
>
>
> One sub-task out of 40 of the TumblingProcessingTimeWindows operator 
> accumulated windows for over a day. The next restart of the job caused it to 
> process the accumulated windows, which caused the checkpointing timeout. Once 
> the sub-task has processed the old windows (which can take several hours), it 
> works normally again. *Could you please suggest ideas about what might 
> cause the window operator sub-task to accumulate old windows for days until 
> the next restart?*
>  
> Here is more context:
> At Yelp we built a database connector based on Flink. To reduce the load on 
> the database, we introduced a time window with a reduce function, since only 
> the latest version of a document matters to us. Here is the configuration of 
> the window: 
>  
>  
> {code:scala}
> private def windowedStream(input: DataStream[FieldsToIndex]) = {
>     input.keyBy(f => f.id)
>       .window(TumblingProcessingTimeWindows.of(seconds(elasticPipeJobConfig.deduplicationWindowTimeInSec)))
>       .reduce(
>         (e1, e2) => {
>           if (e1.messageTimestamp > e2.messageTimestamp) {
>             e1
>           } else {
>             e2
>           }
>         }
>       )
>   }
> {code}
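>
> For completeness, below is a minimal, self-contained sketch of how a snippet like the one above can be wired up. The FieldsToIndex fields, the 60-second window size, and the bounded example source are illustrative assumptions only; they stand in for our real job configuration (elasticPipeJobConfig is not shown):
>
> {code:scala}
> // Sketch only: FieldsToIndex, the 60s window and the bounded source are stand-ins
> // for the real job; in production the input is a continuous stream.
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
>
> case class FieldsToIndex(id: String, messageTimestamp: Long, payload: String)
>
> object DedupWindowSketch {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val input: DataStream[FieldsToIndex] = env.fromElements(
>       FieldsToIndex("doc-1", 1L, "v1"),
>       FieldsToIndex("doc-1", 2L, "v2"),
>       FieldsToIndex("doc-2", 1L, "v1"))
>
>     input
>       .keyBy(_.id)
>       .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
>       // keep only the latest version of each document seen within the window
>       .reduce((e1, e2) => if (e1.messageTimestamp > e2.messageTimestamp) e1 else e2)
>       .print()
>
>     // note: with a short bounded source the job can finish before the
>     // processing-time window fires, so nothing may be printed
>     env.execute("dedup-window-sketch")
>   }
> }
> {code}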
>  
>  
> It works as expected most of the time, but a few times per year one sub-task 
> of the dedup_window operator gets stuck and causes checkpointing to fail. We 
> took a look at the state data, added extra logging to the custom trigger, 
> and here is what we found:
>  # It turned out that the state of the 17th (a different number in every incident) 
> sub-task is more than 100 times bigger than that of the others (see the 
> sub_task_state_size.txt file). This caused the job, and that particular sub-task, 
> to initialize slowly (see dedup_window_initializing_slowly.png)
>  # Statistics of the RocksDB tables:
> _timer_state/processing_window-timers was ~8 MB,
> _timer_state/event_window-timers was 0,
> window-contents was ~20 GB with ~960k entries and ~14k unique message ids. 
> The distribution of counts per id is in ids_destribution.txt
>  # Each window-contents value has an associated timer entry in 
> _timer_state/processing_window-timers. The timers accumulated gradually; 
> counts bucketed by time (Pacific) are in timers.txt
>  # The earliest entries are from 9:23 am Pacific on June 26th, over a day 
> before the incident. The Flink log showed that a taskmanager went away at 9:28 am, 
> forcing a job restart (see taskmanager_exception.txt). The job came back up 
> at ~9:41 am.
>  # Debug logs in the custom trigger's onClear/onProcessingTime/onElement/onEventTime 
> callbacks confirmed that the job was busy processing the old windows 
> (a sketch of such a trigger follows this list)
>  # It seems that the sub-task was in a bad state after the restart. It is 
> unclear whether it was stuck not processing any window events, or whether it was 
> just not removing them from the state. The next time the job restarted, it had to 
> churn through a day's worth of writes, causing the delay.
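>
> For reference, the trigger logging mentioned in point 5 was added to the trigger callbacks. Below is a minimal sketch of such an instrumented trigger; the class name and log messages are illustrative, and the firing behaviour mirrors Flink's built-in ProcessingTimeTrigger rather than our actual custom trigger:
>
> {code:scala}
> // Sketch only: behaves like the default ProcessingTimeTrigger with debug logging
> // added; our real custom trigger differs, this just shows where it was instrumented.
> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.slf4j.LoggerFactory
>
> class LoggingProcessingTimeTrigger[T] extends Trigger[T, TimeWindow] {
>
>   @transient private lazy val log = LoggerFactory.getLogger(getClass)
>
>   override def onElement(element: T, timestamp: Long,
>                          window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>     log.debug(s"onElement: window ending at ${window.maxTimestamp()}")
>     // one processing-time timer per window; these are the entries that pile up
>     // in _timer_state/processing_window-timers
>     ctx.registerProcessingTimeTimer(window.maxTimestamp())
>     TriggerResult.CONTINUE
>   }
>
>   override def onProcessingTime(time: Long, window: TimeWindow,
>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>     log.debug(s"onProcessingTime: firing window ending at ${window.maxTimestamp()}")
>     TriggerResult.FIRE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow,
>                            ctx: Trigger.TriggerContext): TriggerResult = {
>     log.debug("onEventTime: not used for processing-time windows")
>     TriggerResult.CONTINUE
>   }
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
>     log.debug(s"clear: window ending at ${window.maxTimestamp()}")
>     ctx.deleteProcessingTimeTimer(window.maxTimestamp())
>   }
> }
> {code}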



--
This message was sent by Atlassian Jira
(v8.20.10#820010)