Sergey Anokhovskiy created FLINK-35899:
------------------------------------------
Summary: Accumulated TumblingProcessingTimeWindows in the state
for a few days
Key: FLINK-35899
URL: https://issues.apache.org/jira/browse/FLINK-35899
Project: Flink
Issue Type: Bug
Components: API / Core, Runtime / Checkpointing
Affects Versions: 1.13.5
Reporter: Sergey Anokhovskiy
One of the 40 sub-tasks of the TumblingProcessingTimeWindows operator
accumulated windows for over a day. The next restart of the job caused it to
process the accumulated windows, which made checkpointing time out. Once
the sub-task has processed the old windows (which might take several hours) it
works normally again. *Could you please suggest ideas about what might cause
a window operator sub-task to accumulate old windows for days?*
Here is more context:
At Yelp we built a Flink-based connector to the database. To reduce the load
on the database, we introduced a time window with a reduce function, since
only the latest version of a document matters to us.
Here is the configuration of the window:
{code:scala}
private def windowedStream(input: DataStream[FieldsToIndex]) = {
  input.keyBy(f => f.id)
    .window(TumblingProcessingTimeWindows.of(seconds(elasticPipeJobConfig.deduplicationWindowTimeInSec)))
    .reduce(
      (e1, e2) => {
        if (e1.messageTimestamp > e2.messageTimestamp) {
          e1
        } else {
          e2
        }
      })
}
{code}
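For reference, the same wiring as a self-contained sketch; the FieldsToIndex
case class, its fields, and the hard-coded 60-second window are assumptions
for illustration only, not the actual Yelp classes or configuration:
{code:scala}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type standing in for the real FieldsToIndex.
case class FieldsToIndex(id: String, messageTimestamp: Long, payload: String)

object DedupWindowSketch {
  // Same shape as windowedStream(...) above: within each processing-time
  // window, keep only the record with the newest messageTimestamp per id,
  // so at most one element per key survives each window firing.
  def windowedStream(input: DataStream[FieldsToIndex]): DataStream[FieldsToIndex] =
    input
      .keyBy(_.id)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .reduce((e1, e2) => if (e1.messageTimestamp > e2.messageTimestamp) e1 else e2)
}
{code}
Because the reduce function is applied incrementally, window-contents should
normally hold at most one element per (key, window) pair, so its size tracks
the number of live windows rather than the number of raw input records.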
It works as expected most of the time, but a few times per year one sub-task
of the dedup_window operator got stuck and caused checkpointing to fail. We
took a look at the state data and added extra logging to the custom trigger,
and here is what we found:
# It turned out that the state of the 17th sub-task (a different number in
every incident) is more than 100 times bigger than that of the others (see the
attached txt file). This caused the job, and that particular sub-task, to
initialize slowly (see the screenshot)
# Statistics of the RocksDB tables:
_timer_state/processing_window-timers was ~8MB,
_timer_state/event_window-timers was 0, and
window-contents was ~20GB with ~960k entries and ~14k unique message ids.
The distribution of counts by id is in ids_destribution.txt
# Each window-contents value has an associated timer entry in
_timer_state/processing_window-timers. The timers accumulated gradually; the
counts per time bucket (Pacific time) are in timers.txt
# The earliest entries are from 9:23am Pacific on June 26th, over a day before
the incident. The Flink log showed that a taskmanager went away at 9:28am,
forcing a job restart (see taskmanager_exception.txt). The job came back up at
~9:41am.
# Debug logs in the custom trigger's
onClear/onProcessingTime/onElement/onEventTime functions confirmed that the
job is busy processing the old windows (a sketch of such a logging trigger
follows this list)
# It seems that the sub-task was in a bad state after the restart. It is
unclear whether it was stuck not processing any window events, or whether it
was just not removing them from the state. The next time the job restarted, it
had to churn through a day's worth of writes, causing the delay.
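For completeness, here is a minimal sketch of the kind of logging trigger
referred to above. The actual custom trigger is not shown in this ticket, so
this is only an assumption: it mirrors Flink's built-in ProcessingTimeTrigger
and adds debug logging in each callback, which is enough to see which windows
a sub-task is working through after a restart:
{code:scala}
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.slf4j.LoggerFactory

// Hypothetical stand-in for the custom trigger: same behaviour as the
// built-in ProcessingTimeTrigger, plus debug logging per callback.
class LoggingProcessingTimeTrigger[T] extends Trigger[T, TimeWindow] {

  @transient private lazy val log =
    LoggerFactory.getLogger(classOf[LoggingProcessingTimeTrigger[_]])

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    log.debug(s"onElement for window [${window.getStart} - ${window.getEnd}]")
    // One processing-time timer per window; these are the entries that
    // show up in _timer_state/processing_window-timers.
    ctx.registerProcessingTimeTimer(window.maxTimestamp())
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    log.debug(s"onProcessingTime $time for window [${window.getStart} - ${window.getEnd}]")
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE // processing-time trigger: event-time timers are ignored

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    log.debug(s"clear for window [${window.getStart} - ${window.getEnd}]")
    ctx.deleteProcessingTimeTimer(window.maxTimestamp())
  }
}
{code}
It could be plugged in with .trigger(new LoggingProcessingTimeTrigger[FieldsToIndex])
right after the .window(...) call; functionally it behaves like the default
trigger of TumblingProcessingTimeWindows.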