[
https://issues.apache.org/jira/browse/FLINK-35899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868932#comment-17868932
]
Piotr Nowojski commented on FLINK-35899:
----------------------------------------
Partially. FLIP-443/FLINK-20217 would help with this part:
{quote}
(...) to process the accumulated windows, which caused the checkpointing
timeout. Once the sub-task has processed the old windows (might take several
hours) it works normally again
{quote}
as checkpointing would work fine, even if we have a backlog of timers to fire
(there are a couple of new metrics {{numFiredTimers}} and
{{numFiredTimersPerSecond}} that can be used for debugging this kind of issue,
but they were introduced relatively recently).
However, why were all of those windows accumulated? I don't know; that is some
unrelated problem. Either watermarking is not working (assuming event time is
used instead of processing time, but that seems NOT to be the case), or there
is some data skew/backpressure, an unhealthy TM, or any of a multitude of
other reasons.
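
For Flink versions that predate those metrics (such as the 1.13.x release
affected here), a comparable signal can be approximated by counting fired
timers inside the custom trigger that is already in place. This is only a
minimal sketch, assuming a tumbling processing-time window; the class name and
the {{customNumFiredTimers}} metric name are illustrative, not existing Flink
API:
{code:java}
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * Delegates to the default processing-time trigger and counts every fired
 * processing-time timer, so a backlog of pending timers shows up as a counter
 * that keeps climbing long after the job should have caught up.
 */
class CountingProcessingTimeTrigger[T <: AnyRef] extends Trigger[T, TimeWindow] {

  private val delegate = ProcessingTimeTrigger.create()
  @transient private var firedTimers: Counter = _

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    delegate.onElement(element, timestamp, window, ctx)

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    if (firedTimers == null) {
      // registered lazily because the metric group is only available at runtime
      firedTimers = ctx.getMetricGroup.counter("customNumFiredTimers")
    }
    firedTimers.inc()
    delegate.onProcessingTime(time, window, ctx)
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    delegate.onEventTime(time, window, ctx)

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    delegate.clear(window, ctx)
}
{code}
Since it delegates to {{ProcessingTimeTrigger}}, attaching it via
{{.trigger(...)}} on the windowed stream does not change the firing semantics;
it only exposes how many processing-time timers actually fire per sub-task.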
> Accumulated TumblingProcessingTimeWindows in the state for a few days
> ---------------------------------------------------------------------
>
> Key: FLINK-35899
> URL: https://issues.apache.org/jira/browse/FLINK-35899
> Project: Flink
> Issue Type: Bug
> Components: API / Core, Runtime / Checkpointing
> Affects Versions: 1.13.5
> Reporter: Sergey Anokhovskiy
> Priority: Major
> Attachments: dedup_window_intializing_slowly.png,
> dedup_window_processes_old_windows.png, ids_destribution.txt,
> sub_task_state_size.txt, taskmanager_exception.txt, timers.txt
>
>
> One of the 40 sub-tasks of the TumblingProcessingTimeWindows operator
> accumulated windows for over a day (unaligned checkpointing was disabled). The
> next restart of the job caused it to process the accumulated windows, which
> caused the checkpointing timeout. Once the sub-task has processed the old
> windows (might take several hours) it works normally again. *Could you please
> come up with ideas of what might cause the window operator sub-task to
> accumulate old windows for days until the next restart?*
>
> Here is more context:
> At Yelp we built a database connector based on Flink. We aimed to reduce the
> load on the database, so a time window with a reduce function was introduced,
> since only the latest version of a document matters to us. Here is the
> configuration of the window:
>
>
> {code:java}
> private def windowedStream(input: DataStream[FieldsToIndex]) = {
>   input.keyBy(f => f.id)
>     .window(TumblingProcessingTimeWindows.of(
>       seconds(elasticPipeJobConfig.deduplicationWindowTimeInSec)))
>     .reduce((e1, e2) =>
>       if (e1.messageTimestamp > e2.messageTimestamp) e1 else e2
>     )
> }
> {code}
>
>
> It works as expected most of the time, but a few times per year one sub-task
> of the dedup_window operator gets stuck and causes checkpointing to fail. We
> took a look at the state data, added extra logging to the custom trigger, and
> here is what we found:
> # It turned out that the state of the 17th sub-task (a different number in
> every incident) is more than 100 times bigger than that of the others (see the
> sub_task_state_size.txt file). This caused the job, and that particular
> sub-task, to initialize slowly (see dedup_window_initializing_slowly.png)
> # Statistics of the RocksDB tables (see the sketch after this list for one
> way such numbers can be collected):
> _timer_state/processing_window-timers was ~8MB,
> _timer_state/event_window-timers was 0, and
> window-contents was ~20GB with ~960k entries and ~14k unique message ids.
> Counts by id were distributed as shown in ids_destribution.txt.
> # Each window-contents value has an associated timer entry in
> _timer_state/processing_window-timers. The timers accumulated gradually;
> timers.txt shows the counts bucketed by time (Pacific).
> # The earliest entries are from 9:23am Pacific on June 26th, over a day
> before the incident. The Flink logs showed that a taskmanager went away at
> 9:28am, forcing a job restart (see taskmanager_exception.txt). The job came
> back up at ~9:41am.
> # Debug logs in the custom trigger's
> onClear/onProcessingTime/onElement/onEventTime functions confirmed that the
> job was busy processing the old windows.
> # It seems that the sub-task was in a bad state after the restart. It is
> unclear if it was stuck not processing any window events, or if it was just
> not removing them from the state. The next time the job restarted it had to
> churn through a day's worth of writes, causing the delay.
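>
> For reference, here is one way the RocksDB table statistics above could be
> collected: open the recovered RocksDB instance of the affected sub-task
> read-only and query the size/key-count properties of each column family
> (Flink keeps each state, e.g. window-contents and the timer states, in its
> own column family). This is only a minimal sketch; the path handling and the
> object name are illustrative, and the estimate properties are approximations
> reported by RocksDB itself:
> {code:java}
> import org.rocksdb.{ColumnFamilyDescriptor, ColumnFamilyHandle, DBOptions, Options, RocksDB}
> import scala.collection.JavaConverters._
>
> object RocksDbStateStats {
>   def main(args: Array[String]): Unit = {
>     RocksDB.loadLibrary()
>     // Path to the local RocksDB working directory of the affected sub-task
>     // (placeholder, adjust to the actual state backend directory).
>     val dbPath = args(0)
>
>     // Discover all column families (one per Flink state, plus "default").
>     val cfNames = RocksDB.listColumnFamilies(new Options(), dbPath).asScala
>     val descriptors = cfNames.map(new ColumnFamilyDescriptor(_)).asJava
>     val handles = new java.util.ArrayList[ColumnFamilyHandle]()
>
>     val db = RocksDB.openReadOnly(new DBOptions(), dbPath, descriptors, handles)
>     try {
>       handles.asScala.foreach { handle =>
>         val name = new String(handle.getName, "UTF-8")
>         val keys = db.getProperty(handle, "rocksdb.estimate-num-keys")
>         val bytes = db.getProperty(handle, "rocksdb.estimate-live-data-size")
>         println(s"$name: ~$keys keys, ~$bytes bytes")
>       }
>     } finally {
>       handles.asScala.foreach(_.close())
>       db.close()
>     }
>   }
> }
> {code}
> For exact counts one would have to iterate the column families, and the
> column-family options (e.g. merge operators used by some Flink state types)
> may need to match the backend's configuration for full reads.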
--
This message was sent by Atlassian Jira
(v8.20.10#820010)