This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 775fdb95c91e [FLINK-37168][python] Clean up TimerRegistrationAction in unregisteredTimers list after timers are registered (#26011) 775fdb95c91e is described below commit 775fdb95c91e204701ca0d31f2c13d013d281fe9 Author: Shuyi Chen <suez1...@users.noreply.github.com> AuthorDate: Sun Jan 19 21:39:49 2025 -0800 [FLINK-37168][python] Clean up TimerRegistrationAction in unregisteredTimers list after timers are registered (#26011) --- .../process/timer/TimerRegistrationAction.java | 21 ++++++++++++++++++++- .../python/beam/BeamPythonFunctionRunner.java | 16 ++++++++++------ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java index 8a11779b61fe..609f55006716 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java @@ -18,6 +18,9 @@ package org.apache.flink.streaming.api.operators.python.process.timer; +import java.util.List; + +/** {@link TimerRegistrationAction} used to register Timer. */ public class TimerRegistrationAction { private final TimerRegistration timerRegistration; @@ -26,17 +29,33 @@ public class TimerRegistrationAction { private boolean isRegistered; + private final List<TimerRegistrationAction> containingList; + public TimerRegistrationAction( - TimerRegistration timerRegistration, byte[] serializedTimerData) { + TimerRegistration timerRegistration, + byte[] serializedTimerData, + List<TimerRegistrationAction> containingList) { this.timerRegistration = timerRegistration; this.serializedTimerData = serializedTimerData; + this.containingList = containingList; this.isRegistered = false; } public void run() { + registerTimer(); + cleanup(); + } + + public void registerTimer() { if (!isRegistered) { timerRegistration.setTimer(serializedTimerData); isRegistered = true; } } + + private void cleanup() { + if (isRegistered) { + containingList.remove(this); + } + } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 4e52378f7a69..727d86af1764 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -195,7 +195,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { private transient Environment environment; - private transient List<TimerRegistrationAction> unregisteredTimers; + private transient volatile List<TimerRegistrationAction> unregisteredTimers; public BeamPythonFunctionRunner( Environment environment, @@ -311,7 +311,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { ShutdownHookUtil.addShutdownHook( this, BeamPythonFunctionRunner.class.getSimpleName(), LOG); - unregisteredTimers = new LinkedList<>(); + unregisteredTimers = Collections.synchronizedList(new LinkedList<>()); } @Override @@ -352,10 +352,12 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { @Override public void drainUnregisteredTimers() { - for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) { - timerRegistrationAction.run(); + synchronized (unregisteredTimers) { + for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) { + timerRegistrationAction.registerTimer(); + } + unregisteredTimers.clear(); } - unregisteredTimers.clear(); } @Override @@ -703,7 +705,9 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { (timer, timerData) -> { TimerRegistrationAction timerRegistrationAction = new TimerRegistrationAction( - timerRegistration, (byte[]) timer.getUserKey()); + timerRegistration, + (byte[]) timer.getUserKey(), + unregisteredTimers); unregisteredTimers.add(timerRegistrationAction); environment .getMainMailboxExecutor()