This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch pr-37886 in repository https://gitbox.apache.org/repos/asf/beam.git
commit d3a64cac7bdbd4d7fa93321053737621adb61616
Author: Yi Hu <[email protected]>
AuthorDate: Thu Mar 19 20:01:46 2026 -0400

    fix state leak
---
 .../apache/beam/runners/samza/runtime/SamzaDoFnRunners.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 4e3e5285f7c..680ddd93601 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -501,13 +501,13 @@ public class SamzaDoFnRunners {
       // SDF checkpoint timers are handled by loading the stored residual and re-processing it.
       if (BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerId)) {
         StateInternals stateInternals = nonKeyedStateInternalsFactory.stateInternalsForKey(null);
-        WindowedValue<InT> residual =
-            stateInternals
-                .state(
-                    StateNamespaces.window(windowCoder, window),
-                    StateTags.value(timerId, windowedValueCoder))
-                .read();
+        org.apache.beam.sdk.state.ValueState<WindowedValue<InT>> residualState =
+            stateInternals.state(
+                StateNamespaces.window(windowCoder, window),
+                StateTags.value(timerId, windowedValueCoder));
+        WindowedValue<InT> residual = residualState.read();
         if (residual != null) {
+          residualState.clear();
           processElement(residual);
         }
         return;
