This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch pr-37886
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d3a64cac7bdbd4d7fa93321053737621adb61616
Author: Yi Hu <[email protected]>
AuthorDate: Thu Mar 19 20:01:46 2026 -0400

    fix state leak
---
 .../apache/beam/runners/samza/runtime/SamzaDoFnRunners.java  | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 4e3e5285f7c..680ddd93601 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -501,13 +501,13 @@ public class SamzaDoFnRunners {
       // SDF checkpoint timers are handled by loading the stored residual and 
re-processing it.
       if 
(BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerId))
 {
         StateInternals stateInternals = 
nonKeyedStateInternalsFactory.stateInternalsForKey(null);
-        WindowedValue<InT> residual =
-            stateInternals
-                .state(
-                    StateNamespaces.window(windowCoder, window),
-                    StateTags.value(timerId, windowedValueCoder))
-                .read();
+        org.apache.beam.sdk.state.ValueState<WindowedValue<InT>> residualState 
=
+            stateInternals.state(
+                StateNamespaces.window(windowCoder, window),
+                StateTags.value(timerId, windowedValueCoder));
+        WindowedValue<InT> residual = residualState.read();
         if (residual != null) {
+          residualState.clear();
           processElement(residual);
         }
         return;

Reply via email to