Hi Team, I am facing an issue running a stateful Beam job on Flink.
*Problem Statement:* A stateful Beam application with TUMBLE windows running on the Flink Runner, whose checkpoint size grows consistently over time.

*Observation:* Memory usage keeps increasing over time and the Kubernetes pods are OOM-killed (exit code 137).

*Version:* Beam 2.32, Flink 1.13.6, state backend EmbeddedRocksDB (com.ververica - frocksdbjni - 6.20.3-ververica-2.0)

*Assumption:* State is never cleared in the state backend, even after the window closes.

*Questions:*

1. What is the significance of currentSideInputWatermark in *org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator*, and how does it affect an application without side inputs? https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767 Removing the check *if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())* and calling *emitAllPushedBackData()* on every processWatermark reduces the checkpoint size, which otherwise keeps increasing.

2. When is state cleared in WindowDoFn (TUMBLE windows) on window closure? When are global state and timers cleared?

3. Is clearing timer and key-state information via the following enough to avoid ever-increasing memory and checkpoint size?
*Flush on watermark:* pushedBackElementsHandler.clear();
*Timer removal:* keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());
*Global removal:* keyedStateInternals.clearGlobalState();

-Hemant
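P.S. For context, here is my mental model of when tumbling-window state should become garbage-collectible. This is a hedged sketch of the expected semantics, not Beam's actual implementation; the window size and allowed lateness below are assumed values, not from my job:

```python
# Sketch (my assumption, not Beam's actual cleanup code) of how I expect
# tumbling-window state cleanup to be scheduled: each element maps to exactly
# one fixed window, and that window's state and timers should be eligible for
# GC once the watermark passes window max timestamp + allowed lateness.

WINDOW_SIZE_MS = 60_000      # 1-minute TUMBLE window (assumed)
ALLOWED_LATENESS_MS = 0      # no allowed lateness configured (assumed)

def window_for(timestamp_ms: int) -> tuple[int, int]:
    """Return the [start, end) tumbling window containing timestamp_ms."""
    start = timestamp_ms - (timestamp_ms % WINDOW_SIZE_MS)
    return start, start + WINDOW_SIZE_MS

def gc_watermark(window_end_ms: int) -> int:
    """Watermark at/after which the window's state and timers should be cleared."""
    # Max timestamp of a [start, end) window is end - 1.
    return window_end_ms - 1 + ALLOWED_LATENESS_MS

start, end = window_for(90_000)   # element at t = 90s
print(start, end)                 # -> 60000 120000
print(gc_watermark(end))          # -> 119999
```

If the runner never fires this cleanup for closed windows (which is what my assumption above suggests), RocksDB state and checkpoints would grow without bound, matching what I observe.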