Hi,

Hemant, can you please share the code of the Pipeline? Do you use side inputs? Besides what Kenn already described:

> 2.  When is the state information cleared on the WindowDoFn (TUMBLE windows)  on window closure ? When will global states and timers get cleared?

The state and timers for windows are cleared by the cleaner created in the createWrappingDoFnRunner method [1]. The only exception is the global window, where state and timers are cleared only on the final watermark, using [2]. The reason is that otherwise Flink would accumulate window GC timers for every key ever seen in the global window.
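
To make the two cleanup paths concrete, here is a toy model in plain Java. It is only a sketch under assumptions, not the FlinkRunner code: the class WindowGcSketch, its methods and the FINAL_WATERMARK constant are made up for illustration, and the GC time is assumed to be simply the window end plus the allowed lateness.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of window state cleanup; hypothetical, not the FlinkRunner implementation. */
final class WindowGcSketch {
  /** Stand-in for the final watermark (assumption; the runner has its own constant). */
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  // State of bounded (e.g. tumbling) windows: GC time -> key -> buffered values.
  private final TreeMap<Long, Map<String, List<String>>> windowedState = new TreeMap<>();
  // State held in the global window: key -> buffered values.
  private final Map<String, List<String>> globalState = new HashMap<>();

  void addToWindow(long windowEndMillis, long allowedLatenessMillis, String key, String value) {
    // Assumed GC time: end of window plus allowed lateness.
    long gcTime = windowEndMillis + allowedLatenessMillis;
    windowedState
        .computeIfAbsent(gcTime, t -> new HashMap<>())
        .computeIfAbsent(key, k -> new ArrayList<>())
        .add(value);
  }

  void addToGlobalWindow(String key, String value) {
    globalState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  void advanceWatermark(long watermarkMillis) {
    // Bounded windows: drop every window whose GC time is at or before the watermark.
    windowedState.headMap(watermarkMillis, true).clear();
    // Global window: no per-key GC timers, state is dropped only on the final watermark.
    if (watermarkMillis >= FINAL_WATERMARK) {
      globalState.clear();
    }
  }

  int liveWindows() {
    return windowedState.size();
  }

  int liveGlobalKeys() {
    return globalState.size();
  }
}
```

In this model, tumbling-window state shrinks as the watermark advances, while anything kept in the global window stays until the final watermark arrives.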

> 3.  Is timer and keystate information clearance by the following enough to not have ever increasing memory or checkpoints?

AFAIK, state and timers are correctly cleared at window GC time. Because you see state increasing over time in the code path that corresponds to side inputs, I would suppose that your side input is what grows over time. Can you verify that? If that is the problem, you can try switching the state backend so that you don't have to keep it all in memory (RocksDB), or consider a different technique for joining (merging) the two input streams (flatten them and apply the joining logic yourself, including buffering).
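
A rough sketch of what "flatten and join yourself" could look like, written as plain Java rather than Beam code so that it stays self-contained. Everything here is hypothetical (FlattenJoinSketch, Tagged, Source, the "+" join format); it assumes both streams are keyed the same way and that only the latest value of the second stream matters per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a manual join over one flattened, tagged stream (hypothetical names). */
final class FlattenJoinSketch {
  enum Source { MAIN, REFERENCE }

  /** An element of the flattened stream, tagged with the input it came from. */
  static final class Tagged {
    final Source source;
    final String key;
    final String value;

    Tagged(Source source, String key, String value) {
      this.source = source;
      this.key = key;
      this.value = value;
    }
  }

  // Latest reference value per key; overwritten in place, so one entry per key.
  private final Map<String, String> latestReference = new HashMap<>();
  // Main elements waiting for the first reference value of their key.
  private final Map<String, List<String>> bufferedMain = new HashMap<>();

  /** Processes one element and returns the joined results it produces, if any. */
  List<String> process(Tagged e) {
    List<String> out = new ArrayList<>();
    if (e.source == Source.REFERENCE) {
      latestReference.put(e.key, e.value);
      // Release and drop everything that was waiting for this key.
      List<String> waiting = bufferedMain.remove(e.key);
      if (waiting != null) {
        for (String main : waiting) {
          out.add(main + "+" + e.value);
        }
      }
    } else {
      String reference = latestReference.get(e.key);
      if (reference != null) {
        out.add(e.value + "+" + reference);
      } else {
        bufferedMain.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.value);
      }
    }
    return out;
  }

  int bufferedKeys() {
    return bufferedMain.size();
  }
}
```

The point of the design is that you control the buffer: it is emptied as soon as the matching value arrives, and in a real pipeline you would also put an expiry timer on it.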

Best,

 Jan

[1] https://github.com/apache/beam/blob/79f46e00184fc5fcea7c9c4a85e2ed8467ef1a71/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L351

[2] https://github.com/apache/beam/blob/34024902746af90e7bf41e28729ec031dbab58d2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L212

On 9/19/23 15:56, Kenneth Knowles wrote:
Caveat: it has been a long time and I don't really know the details of the FlinkRunner. But I can answer a couple questions.

On Fri, Sep 15, 2023 at 7:07 PM Hemant Kumar via dev <dev@beam.apache.org> wrote:

    Hi Team,

    I am facing an issue running a stateful Beam job on Flink.

    *Problem Statement:*
        Stateful Beam application with TUMBLE windows running on the Flink
    Runner, whose checkpoint size consistently increases over time.

    *Observation:*
       Memory usage keeps increasing over time, and the Kubernetes pods
    get OOM-killed (exit code 137).

    *Version:*
        Beam version 2.32, Flink version 1.13.6, State backend -
    EmbeddedRocksDB (com.ververica -  frocksdbjni - 6.20.3-ververica-2.0)

    *Assumption:*
       State is never cleared in the state backend, even when the window
    is closed.

    *Questions:*
      1. What is the significance of currentSideInputWatermark in
    /org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator/
    and how does it affect an application without side inputs?
    
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767


If you have a main input and a side input, each main input window is buffered until the side input is "ready" for that window to be processed. That particular line is about flushing all the rest of the data when the side input is fully ready and you are guaranteed to never see more data on the side input. As for the rest of the questions, I don't know when the under-the-hood stuff is cleared out.
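
The buffering described above can be pictured with a small toy model in plain Java. This is a sketch under assumptions, not the DoFnOperator code: PushBackSketch, Element, sideInputReady and FINAL_WATERMARK are hypothetical names, and windows are identified only by their end timestamp.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Toy model of pushing back main-input elements until the side input is ready. */
final class PushBackSketch {
  /** Stand-in for the final watermark (assumption; the runner has its own constant). */
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  /** A main-input element together with the end of the window it belongs to. */
  static final class Element {
    final long windowEndMillis;
    final String value;

    Element(long windowEndMillis, String value) {
      this.windowEndMillis = windowEndMillis;
      this.value = value;
    }
  }

  private final Set<Long> readySideInputWindows = new HashSet<>();
  private final List<Element> pushedBack = new ArrayList<>();
  private final List<String> emitted = new ArrayList<>();

  void processMainInput(Element e) {
    if (readySideInputWindows.contains(e.windowEndMillis)) {
      emitted.add(e.value); // side input is ready: process right away
    } else {
      pushedBack.add(e); // otherwise buffer the element
    }
  }

  void sideInputReady(long windowEndMillis) {
    readySideInputWindows.add(windowEndMillis);
    // Release only the buffered elements that belong to the window that became ready.
    Iterator<Element> it = pushedBack.iterator();
    while (it.hasNext()) {
      Element e = it.next();
      if (e.windowEndMillis == windowEndMillis) {
        emitted.add(e.value);
        it.remove();
      }
    }
  }

  void onSideInputWatermark(long watermarkMillis) {
    // No more side-input data can arrive: flush everything that is still buffered.
    if (watermarkMillis >= FINAL_WATERMARK) {
      for (Element e : pushedBack) {
        emitted.add(e.value);
      }
      pushedBack.clear();
    }
  }

  int bufferedCount() {
    return pushedBack.size();
  }

  List<String> emitted() {
    return emitted;
  }
}
```

In this model, elements whose window never becomes ready stay in the buffer until the side-input watermark reaches its final value.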

Kenn

        Removing the check /if (currentSideInputWatermark >=
    BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())/ and calling
    /emitAllPushedBackData();/ on every processWatermark call reduces the
    checkpoint size, which otherwise keeps increasing.

      2.  When is the state information cleared on the WindowDoFn
    (TUMBLE windows)  on window closure ? When will global states and
    timers get cleared?

      3.  Is timer and keystate information clearance by the following
    enough to not have ever increasing memory or checkpoints?

        *Flush on watermark:*


            pushedBackElementsHandler.clear();


            *Timer removal:*


            
            keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());


            *Global removal:*


            keyedStateInternals.clearGlobalState();


      -Hemant


