This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new a224b50aa321 [FLINK-37168][python] Clean up TimerRegistrationAction in unregisteredTimers list after timers are registered (#26011)
a224b50aa321 is described below

commit a224b50aa3215c7fb6e85c42070599454197c316
Author: Shuyi Chen <suez1...@users.noreply.github.com>
AuthorDate: Sun Jan 19 21:39:49 2025 -0800

    [FLINK-37168][python] Clean up TimerRegistrationAction in unregisteredTimers list after timers are registered (#26011)
---
 .../process/timer/TimerRegistrationAction.java      | 21 ++++++++++++++++++++-
 .../python/beam/BeamPythonFunctionRunner.java       | 16 ++++++++++------
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java
index 8a11779b61fe..609f55006716 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.api.operators.python.process.timer;
 
+import java.util.List;
+
+/** {@link TimerRegistrationAction} used to register Timer. */
 public class TimerRegistrationAction {
 
     private final TimerRegistration timerRegistration;
@@ -26,17 +29,33 @@ public class TimerRegistrationAction {
 
     private boolean isRegistered;
 
+    private final List<TimerRegistrationAction> containingList;
+
     public TimerRegistrationAction(
-            TimerRegistration timerRegistration, byte[] serializedTimerData) {
+            TimerRegistration timerRegistration,
+            byte[] serializedTimerData,
+            List<TimerRegistrationAction> containingList) {
         this.timerRegistration = timerRegistration;
         this.serializedTimerData = serializedTimerData;
+        this.containingList = containingList;
         this.isRegistered = false;
     }
 
     public void run() {
+        registerTimer();
+        cleanup();
+    }
+
+    public void registerTimer() {
         if (!isRegistered) {
             timerRegistration.setTimer(serializedTimerData);
             isRegistered = true;
         }
     }
+
+    private void cleanup() {
+        if (isRegistered) {
+            containingList.remove(this);
+        }
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 4e52378f7a69..727d86af1764 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -195,7 +195,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     private transient Environment environment;
 
-    private transient List<TimerRegistrationAction> unregisteredTimers;
+    private transient volatile List<TimerRegistrationAction> unregisteredTimers;
 
     public BeamPythonFunctionRunner(
             Environment environment,
@@ -311,7 +311,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                 ShutdownHookUtil.addShutdownHook(
                         this, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
 
-        unregisteredTimers = new LinkedList<>();
+        unregisteredTimers = Collections.synchronizedList(new LinkedList<>());
     }
 
     @Override
@@ -352,10 +352,12 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     @Override
     public void drainUnregisteredTimers() {
-        for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) {
-            timerRegistrationAction.run();
+        synchronized (unregisteredTimers) {
+            for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) {
+                timerRegistrationAction.registerTimer();
+            }
+            unregisteredTimers.clear();
         }
-        unregisteredTimers.clear();
     }
 
     @Override
@@ -703,7 +705,9 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                 (timer, timerData) -> {
                     TimerRegistrationAction timerRegistrationAction =
                             new TimerRegistrationAction(
-                                    timerRegistration, (byte[]) timer.getUserKey());
+                                    timerRegistration,
+                                    (byte[]) timer.getUserKey(),
+                                    unregisteredTimers);
                     unregisteredTimers.add(timerRegistrationAction);
                     environment
                             .getMainMailboxExecutor()

Reply via email to