Hi Team,

I am facing an issue running a stateful Beam job on Flink.

*Problem Statement:*
    A stateful Beam application with TUMBLE windows running on the Flink
runner, whose checkpoint size consistently increases over time.

*Observation:*
    Memory usage keeps increasing over time, and the Kubernetes pods
eventually get OOM-killed (exit code 137).

*Version:*
    Beam 2.32, Flink 1.13.6, state backend: EmbeddedRocksDB
(com.ververica : frocksdbjni : 6.20.3-ververica-2.0)

*Assumption:*
    State is never cleared in the state backend, even after the window
closes.


*Questions:*
  1. What is the significance of currentSideInputWatermark in
*org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator*,
and how does it affect an application without side inputs?

https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767
    Removing the check *if (currentSideInputWatermark >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())* and calling
*emitAllPushedBackData()* on every processWatermark reduces the
checkpoint size, which otherwise keeps increasing.
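To make the question concrete, here is a toy, self-contained model of how I read that gate (this is NOT the actual DoFnOperator code; the class, field, and constant names below are stand-ins, and TIMESTAMP_MAX is a stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()). If the side-input watermark never advances to MAX, the pushed-back buffer is never drained on processWatermark, which would match the checkpoint growth I see:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the watermark gate as I understand it -- not Beam code.
public class WatermarkGateSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis().
  static final long TIMESTAMP_MAX = Long.MAX_VALUE;

  long currentSideInputWatermark = Long.MIN_VALUE;
  List<String> pushedBack = new ArrayList<>();

  // Simplified analogue of the check in DoFnOperator#processWatermark:
  // pushed-back data is flushed only once the side-input watermark
  // has reached TIMESTAMP_MAX.
  List<String> processWatermark(long watermark) {
    List<String> emitted = new ArrayList<>();
    if (currentSideInputWatermark >= TIMESTAMP_MAX) {
      // emitAllPushedBackData() analogue: drain the buffer.
      emitted.addAll(pushedBack);
      pushedBack.clear();
    }
    return emitted;
  }

  public static void main(String[] args) {
    WatermarkGateSketch op = new WatermarkGateSketch();
    op.pushedBack.add("element");
    // Gate closed: nothing emitted, buffered state keeps accumulating.
    System.out.println(op.processWatermark(1000L).size());
    op.currentSideInputWatermark = TIMESTAMP_MAX;
    // Gate open: buffer is drained.
    System.out.println(op.processWatermark(2000L).size());
  }
}
```

My change effectively removed the gate, i.e. drained the buffer on every processWatermark, and the checkpoint size stopped growing.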

  2. When is state information cleared in the WindowDoFn (TUMBLE
windows) on window closure? When do global state and timers get
cleared?

  3. Is clearing the timer and keyed-state information as follows enough
to prevent ever-increasing memory and checkpoint sizes?


    *Flush on watermark:*
        pushedBackElementsHandler.clear();

    *Timer removal:*
        keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());

    *Global removal:*
        keyedStateInternals.clearGlobalState();
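For reference, this is the cleanup behaviour I would expect on window closure, as a toy, self-contained model (hypothetical code, not the Beam or Flink API; window size and lateness values are assumptions): once the watermark passes a window's end plus allowed lateness, that window's keyed state is dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of TUMBLE-window state cleanup -- not Beam/Flink code.
public class WindowCleanupSketch {
  static final long WINDOW_SIZE_MS = 60_000L;   // assumed 1-minute TUMBLE window
  static final long ALLOWED_LATENESS_MS = 0L;   // assumed zero allowed lateness

  // Per-window keyed state, keyed by window end timestamp.
  Map<Long, String> windowState = new HashMap<>();

  // End of the tumbling window containing elementTs.
  long windowEndFor(long elementTs) {
    return (elementTs / WINDOW_SIZE_MS + 1) * WINDOW_SIZE_MS;
  }

  void onElement(long ts, String value) {
    windowState.merge(windowEndFor(ts), value, String::concat);
  }

  // Cleanup on watermark advance: drop state for every expired window.
  // This is the garbage collection I expected to happen automatically,
  // but my observations suggest it never fires in my pipeline.
  void onWatermark(long watermark) {
    windowState.keySet().removeIf(end -> end + ALLOWED_LATENESS_MS <= watermark);
  }
}
```

If the runner performs an equivalent cleanup on window closure, I would expect checkpoint size to plateau rather than grow without bound.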

  -Hemant
