Hi,
Hemant, could you please share the pipeline code? Do you use side
inputs? In addition to what Kenn already described:
> 2. When is the state information cleared on the WindowDoFn (TUMBLE
> windows) on window closure? When will global states and timers get
> cleared?
The state and timers for windows are cleared by the cleaner created in
the createWrappingDoFnRunner method [1]. The only exception is the
global window, where state and timers are cleared only on the final
watermark, using [2]. The reason is that otherwise Flink would
accumulate window GC timers for every key ever seen in the global
window.
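
To make the difference concrete, here is a toy model in plain Java. It
is only a sketch, not the FlinkRunner code: the class, its methods and
the GLOBAL marker are hypothetical, and it assumes a window's GC time is
its end plus the allowed lateness. Non-global windows register one GC
timer per key and window; the global window registers none and is
dropped only when the final watermark arrives.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of window state GC; hypothetical, not Beam's or Flink's code. */
class WindowGcSketch {
    /** Hypothetical marker for the end of the global window. */
    static final long GLOBAL = Long.MAX_VALUE;

    private final long allowedLatenessMs;
    // windowEnd -> (key -> accumulated value)
    private final TreeMap<Long, Map<String, Integer>> stateByWindowEnd = new TreeMap<>();
    // gcTime -> keys holding a GC timer at that time
    private final TreeMap<Long, Set<String>> gcTimers = new TreeMap<>();

    WindowGcSketch(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    void add(String key, long windowEnd, int value) {
        stateByWindowEnd.computeIfAbsent(windowEnd, w -> new HashMap<>())
                .merge(key, value, Integer::sum);
        if (windowEnd != GLOBAL) {
            // One GC timer per (key, window); skipped for the global window.
            gcTimers.computeIfAbsent(windowEnd + allowedLatenessMs, t -> new HashSet<>())
                    .add(key);
        }
    }

    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<Long, Set<String>>> it = gcTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<String>> timer = it.next();
            if (timer.getKey() > watermark) {
                break; // timers are sorted by time; the rest are not due yet
            }
            long windowEnd = timer.getKey() - allowedLatenessMs;
            Map<String, Integer> perKey = stateByWindowEnd.get(windowEnd);
            if (perKey != null) {
                perKey.keySet().removeAll(timer.getValue());
                if (perKey.isEmpty()) {
                    stateByWindowEnd.remove(windowEnd);
                }
            }
            it.remove();
        }
        if (watermark >= GLOBAL) {
            // Final watermark: the only point where global-window state goes away.
            stateByWindowEnd.remove(GLOBAL);
        }
    }

    int stateEntries() {
        int n = 0;
        for (Map<String, Integer> perKey : stateByWindowEnd.values()) {
            n += perKey.size();
        }
        return n;
    }

    int pendingGcTimers() {
        int n = 0;
        for (Set<String> keys : gcTimers.values()) {
            n += keys.size();
        }
        return n;
    }
}
```

In this model, state written to the global window stays around for as
long as the job runs, so a stateful DoFn in the global window has to
clear its own state if the key space is unbounded.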
> 3. Is clearing timer and keyed-state information as follows
> enough to avoid ever-increasing memory or checkpoints?
AFAIK, state and timers are correctly cleared at window GC time. Since
you see state increasing over time in the code path that corresponds to
side inputs, I would suppose that your side input is what grows over
time. Can you verify that? If that is the problem, you can try
switching the state backend so that you don't have to keep it all in
memory (RocksDB), or consider a different technique for joining
(merging) the two input streams (flatten them and apply the joining
logic yourself, including buffering).
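
A rough illustration of the "join it yourself" idea, in plain Java
rather than Beam API: after flattening, each element is either a "left"
or a "right" record for some key; left records are buffered until a
right value for that key shows up, and everything is dropped explicitly
when the window expires. All names here are hypothetical. In a real
pipeline this would be a stateful DoFn with per-key state and a cleanup
timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy per-key join over a flattened stream; all names are hypothetical. */
class ManualJoinSketch {
    private final Map<String, String> latestRight = new HashMap<>();
    private final Map<String, List<String>> bufferedLeft = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    /** A record from the (former) main input. */
    void onLeft(String key, String value) {
        String right = latestRight.get(key);
        if (right != null) {
            joined.add(key + ":" + value + "+" + right);
        } else {
            // No right-hand value yet: buffer until one arrives.
            bufferedLeft.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    /** A record from the (former) side input. */
    void onRight(String key, String value) {
        latestRight.put(key, value);
        List<String> pending = bufferedLeft.remove(key);
        if (pending != null) {
            for (String left : pending) {
                joined.add(key + ":" + left + "+" + value);
            }
        }
    }

    /** Explicit cleanup, so nothing outlives the window. */
    void onWindowExpired() {
        latestRight.clear();
        bufferedLeft.clear();
    }

    int bufferedSize() {
        int n = 0;
        for (List<String> values : bufferedLeft.values()) {
            n += values.size();
        }
        return n;
    }

    List<String> joined() {
        return joined;
    }
}
```

The point of doing it this way is that the buffer and its lifetime are
under your control, so you can see exactly what is kept and when it is
released.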
Best,
Jan
[1]
https://github.com/apache/beam/blob/79f46e00184fc5fcea7c9c4a85e2ed8467ef1a71/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L351
[2]
https://github.com/apache/beam/blob/34024902746af90e7bf41e28729ec031dbab58d2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L212
On 9/19/23 15:56, Kenneth Knowles wrote:
Caveat: it has been a long time and I don't really know the details of
the FlinkRunner. But I can answer a couple questions.
On Fri, Sep 15, 2023 at 7:07 PM Hemant Kumar via dev
<dev@beam.apache.org> wrote:
Hi Team,
I am facing an issue running a stateful Beam job on Flink.
*Problem Statement:*
A stateful Beam application with TUMBLE windows running on the Flink
Runner, whose checkpoint size increases consistently over time.
*Observation:*
Memory usage keeps increasing over time until the Kubernetes pods
are OOM-killed (exit code 137).
*Version:*
Beam version 2.32, Flink version 1.13.6, State backend -
EmbeddedRocksDB (com.ververica - frocksdbjni - 6.20.3-ververica-2.0)
*Assumption:*
State is never cleared in the state backend, even when the window is
closed.
*Questions:*
1. What is the significance of currentSideInputWatermark in
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
and how does it affect an application without side inputs?
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767
If you have a main input and a side input, each main input window is
buffered until the side input is "ready" for that window to be
processed. That particular line is about flushing all the rest of
the data when the side input is fully ready and you are guaranteed to
never see more data on the side input.
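
As a rough picture of that behaviour, here is a toy model in plain Java.
It is a sketch under simplifying assumptions, not the actual
DoFnOperator: the class and method names are hypothetical, windows are
identified only by their end timestamp, and MAX_TS stands in for
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of side-input push-back; hypothetical, not the FlinkRunner code. */
class PushBackSketch {
    /** Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(). */
    static final long MAX_TS = Long.MAX_VALUE;

    // windowEnd -> main-input elements waiting for the side input
    private final Map<Long, List<String>> pushedBack = new TreeMap<>();
    private final Set<Long> readyWindows = new HashSet<>();
    private final List<String> output = new ArrayList<>();

    void processMainElement(long windowEnd, String element) {
        if (readyWindows.contains(windowEnd)) {
            output.add(element);
        } else {
            // Side input not ready for this window yet: push the element back.
            pushedBack.computeIfAbsent(windowEnd, w -> new ArrayList<>()).add(element);
        }
    }

    void sideInputReady(long windowEnd) {
        readyWindows.add(windowEnd);
        List<String> buffered = pushedBack.remove(windowEnd);
        if (buffered != null) {
            output.addAll(buffered);
        }
    }

    void processSideInputWatermark(long watermark) {
        if (watermark >= MAX_TS) {
            // No more side-input data can ever arrive: flush whatever is left.
            for (List<String> buffered : pushedBack.values()) {
                output.addAll(buffered);
            }
            pushedBack.clear();
        }
    }

    int bufferedCount() {
        int n = 0;
        for (List<String> buffered : pushedBack.values()) {
            n += buffered.size();
        }
        return n;
    }

    List<String> output() {
        return output;
    }
}
```

In this model, elements for a window whose side input never becomes
ready stay buffered until the side-input watermark reaches its maximum.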
As for the rest of the questions, I don't know when the
under-the-hood stuff is cleared out.
Kenn
Removing the check
if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
and calling emitAllPushedBackData() on every processWatermark call
reduces the checkpoint size, which otherwise keeps increasing.
2. When is the state information cleared on the WindowDoFn
(TUMBLE windows) on window closure? When will global states and
timers get cleared?
3. Is clearing timer and keyed-state information as follows
enough to avoid ever-increasing memory or checkpoints?
*Flush on watermark:*
pushedBackElementsHandler.clear();
*Timer removal:*
keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());
*Global removal:*
keyedStateInternals.clearGlobalState();
-Hemant