This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 5fe0280bae1 [FLINK-37084][python] Fix TimerRegistration concurrency 
issue in PyFlink (#26004)
5fe0280bae1 is described below

commit 5fe0280bae13203ac65b3c1534a167b72534e91b
Author: Shuyi Chen <suez1...@users.noreply.github.com>
AuthorDate: Fri Jan 17 15:56:54 2025 -0800

    [FLINK-37084][python] Fix TimerRegistration concurrency issue in PyFlink 
(#26004)
---
 .../apache/flink/python/PythonFunctionRunner.java  |  2 ++
 .../python/AbstractPythonFunctionOperator.java     |  8 +++++
 .../AbstractExternalPythonFunctionOperator.java    |  5 +++
 .../process/ExternalPythonCoProcessOperator.java   |  1 +
 .../ExternalPythonKeyedCoProcessOperator.java      |  1 +
 .../ExternalPythonKeyedProcessOperator.java        |  1 +
 .../process/ExternalPythonProcessOperator.java     |  1 +
 .../process/timer/TimerRegistrationAction.java     | 42 ++++++++++++++++++++++
 .../beam/BeamDataStreamPythonFunctionRunner.java   |  3 ++
 .../python/beam/BeamPythonFunctionRunner.java      | 29 ++++++++++++++-
 .../python/AbstractStatelessFunctionOperator.java  |  1 +
 .../AbstractPythonStreamAggregateOperator.java     |  1 +
 .../python/beam/BeamTablePythonFunctionRunner.java |  7 ++++
 ...ghPythonStreamGroupWindowAggregateOperator.java |  1 +
 .../PythonStreamGroupAggregateOperatorTest.java    |  1 +
 ...ythonStreamGroupTableAggregateOperatorTest.java |  1 +
 ...owPythonGroupAggregateFunctionOperatorTest.java |  1 +
 ...onGroupWindowAggregateFunctionOperatorTest.java |  1 +
 ...honOverWindowAggregateFunctionOperatorTest.java |  1 +
 ...onGroupWindowAggregateFunctionOperatorTest.java |  1 +
 ...rrowPythonProcTimeBoundedRangeOperatorTest.java |  1 +
 ...ArrowPythonProcTimeBoundedRowsOperatorTest.java |  1 +
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java |  1 +
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java |  1 +
 .../scalar/PythonScalarFunctionOperatorTest.java   |  1 +
 .../ArrowPythonScalarFunctionOperatorTest.java     |  1 +
 .../table/PythonTableFunctionOperatorTest.java     |  1 +
 .../PassThroughPythonAggregateFunctionRunner.java  |  3 ++
 .../PassThroughPythonScalarFunctionRunner.java     |  3 ++
 .../PassThroughPythonTableFunctionRunner.java      |  3 ++
 ...ThroughStreamAggregatePythonFunctionRunner.java |  3 ++
 ...amGroupWindowAggregatePythonFunctionRunner.java |  3 ++
 ...ghStreamTableAggregatePythonFunctionRunner.java |  3 ++
 33 files changed, 133 insertions(+), 1 deletion(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java 
b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java
index 6ef6a70b59e..c637c0e659c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java
@@ -44,6 +44,8 @@ public interface PythonFunctionRunner extends AutoCloseable {
     /** Send the triggered timer to the Python function. */
     void processTimer(byte[] timerData) throws Exception;
 
+    void drainUnregisteredTimers();
+
     /**
      * Retrieves the Python function result.
      *
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index abd92e9eac6..37f0c4540f7 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -270,6 +270,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
 
     protected abstract PythonEnvironmentManager 
createPythonEnvironmentManager();
 
+    protected void drainUnregisteredTimers() {}
+
     /**
      * Advances the watermark of all managed timer services, potentially 
firing event time timers.
      * It also ensures that the fired timers are processed in the Python 
user-defined functions.
@@ -277,10 +279,16 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
     private void advanceWatermark(Watermark watermark) throws Exception {
         if (getTimeServiceManager().isPresent()) {
             InternalTimeServiceManager<?> timeServiceManager = 
getTimeServiceManager().get();
+            // make sure the registered timers are processed before advancing 
the watermark to
+            // ensure the timers can be triggered
+            drainUnregisteredTimers();
             timeServiceManager.advanceWatermark(watermark);
 
             while (!isBundleFinished()) {
                 invokeFinishBundle();
+                // make sure the registered timers are processed before 
advancing the watermark to
+                // ensure the timers can be triggered
+                drainUnregisteredTimers();
                 timeServiceManager.advanceWatermark(watermark);
             }
         }
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
index 90991b57c4d..da50b5335f2 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
@@ -136,6 +136,11 @@ public abstract class 
AbstractExternalPythonFunctionOperator<OUT>
         }
     }
 
+    @Override
+    protected void drainUnregisteredTimers() {
+        pythonFunctionRunner.drainUnregisteredTimers();
+    }
+
     protected void emitResults() throws Exception {
         Tuple3<String, byte[], Integer> resultTuple;
         while ((resultTuple = pythonFunctionRunner.pollResult()) != null && 
resultTuple.f2 != 0) {
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java
index 05b2166cf62..71193ceed7c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java
@@ -72,6 +72,7 @@ public class ExternalPythonCoProcessOperator<IN1, IN2, OUT>
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return new BeamDataStreamPythonFunctionRunner(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 createPythonEnvironmentManager(),
                 STATELESS_FUNCTION_URN,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java
index 6ce86e5a408..dc13aafae19 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java
@@ -113,6 +113,7 @@ public class ExternalPythonKeyedCoProcessOperator<OUT>
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return new BeamDataStreamPythonFunctionRunner(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 createPythonEnvironmentManager(),
                 STATEFUL_FUNCTION_URN,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java
index 2f2a785334a..e92301b6bf3 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java
@@ -139,6 +139,7 @@ public class ExternalPythonKeyedProcessOperator<OUT>
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return new BeamDataStreamPythonFunctionRunner(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 createPythonEnvironmentManager(),
                 STATEFUL_FUNCTION_URN,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java
index 863edf7357b..07dc668a352 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java
@@ -68,6 +68,7 @@ public class ExternalPythonProcessOperator<IN, OUT>
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return new BeamDataStreamPythonFunctionRunner(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 createPythonEnvironmentManager(),
                 STATELESS_FUNCTION_URN,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java
new file mode 100644
index 00000000000..8a11779b61f
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python.process.timer;
+
+public class TimerRegistrationAction {
+
+    private final TimerRegistration timerRegistration;
+
+    private final byte[] serializedTimerData;
+
+    private boolean isRegistered;
+
+    public TimerRegistrationAction(
+            TimerRegistration timerRegistration, byte[] serializedTimerData) {
+        this.timerRegistration = timerRegistration;
+        this.serializedTimerData = serializedTimerData;
+        this.isRegistered = false;
+    }
+
+    public void run() {
+        if (!isRegistered) {
+            timerRegistration.setTimer(serializedTimerData);
+            isRegistered = true;
+        }
+    }
+}
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index 71bd715c470..823e91ccfc5 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.python.util.ProtoUtils;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -69,6 +70,7 @@ public class BeamDataStreamPythonFunctionRunner extends 
BeamPythonFunctionRunner
     private final List<FlinkFnApi.UserDefinedDataStreamFunction> 
userDefinedDataStreamFunctions;
 
     public BeamDataStreamPythonFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String headOperatorFunctionUrn,
@@ -86,6 +88,7 @@ public class BeamDataStreamPythonFunctionRunner extends 
BeamPythonFunctionRunner
             @Nullable FlinkFnApi.CoderInfoDescriptor timerCoderDescriptor,
             Map<String, FlinkFnApi.CoderInfoDescriptor> 
sideOutputCoderDescriptors) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 flinkMetricContainer,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 0029db937d4..4e52378f7a6 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -31,11 +31,13 @@ import org.apache.flink.python.env.PythonEnvironment;
 import org.apache.flink.python.env.process.ProcessPythonEnvironment;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import 
org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration;
+import 
org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistrationAction;
 import 
org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -85,6 +87,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -190,7 +193,12 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
 
     private transient Thread shutdownHook;
 
+    private transient Environment environment;
+
+    private transient List<TimerRegistrationAction> unregisteredTimers;
+
     public BeamPythonFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             @Nullable FlinkMetricContainer flinkMetricContainer,
@@ -204,6 +212,7 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
             FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor,
             Map<String, FlinkFnApi.CoderInfoDescriptor> 
sideOutputCoderDescriptors) {
+        this.environment = environment;
         this.taskName = Preconditions.checkNotNull(taskName);
         this.environmentManager = 
Preconditions.checkNotNull(environmentManager);
         this.flinkMetricContainer = flinkMetricContainer;
@@ -301,6 +310,8 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
         shutdownHook =
                 ShutdownHookUtil.addShutdownHook(
                         this, BeamPythonFunctionRunner.class.getSimpleName(), 
LOG);
+
+        unregisteredTimers = new LinkedList<>();
     }
 
     @Override
@@ -339,6 +350,14 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
         mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(data));
     }
 
+    @Override
+    public void drainUnregisteredTimers() {
+        for (TimerRegistrationAction timerRegistrationAction : 
unregisteredTimers) {
+            timerRegistrationAction.run();
+        }
+        unregisteredTimers.clear();
+    }
+
     @Override
     public void processTimer(byte[] timerData) throws Exception {
         if (timerInputReceiver == null) {
@@ -681,7 +700,15 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
 
     private TimerReceiverFactory createTimerReceiverFactory() {
         BiConsumer<Timer<?>, TimerInternals.TimerData> timerDataConsumer =
-                (timer, timerData) -> timerRegistration.setTimer((byte[]) 
timer.getUserKey());
+                (timer, timerData) -> {
+                    TimerRegistrationAction timerRegistrationAction =
+                            new TimerRegistrationAction(
+                                    timerRegistration, (byte[]) 
timer.getUserKey());
+                    unregisteredTimers.add(timerRegistrationAction);
+                    environment
+                            .getMainMailboxExecutor()
+                            .execute(timerRegistrationAction::run, 
"PythonTimerRegistration");
+                };
         return new TimerReceiverFactory(stageBundleFactory, timerDataConsumer, 
null);
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
index 4b5c95701a9..9b53ca7550b 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
@@ -105,6 +105,7 @@ public abstract class AbstractStatelessFunctionOperator<IN, 
OUT, UDFIN>
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws 
IOException {
         return BeamTablePythonFunctionRunner.stateless(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 createPythonEnvironmentManager(),
                 getFunctionUrn(),
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 5ee074cbfaf..64381c06582 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -172,6 +172,7 @@ public abstract class AbstractPythonStreamAggregateOperator
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return BeamTablePythonFunctionRunner.stateful(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 createPythonEnvironmentManager(),
                 getFunctionUrn(),
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index 60158397e62..50a2e16111c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
@@ -52,6 +53,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
     private final GeneratedMessageV3 userDefinedFunctionProto;
 
     public BeamTablePythonFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
@@ -65,6 +67,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
             FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 flinkMetricContainer,
@@ -117,6 +120,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
     }
 
     public static BeamTablePythonFunctionRunner stateless(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
@@ -127,6 +131,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
             FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) {
         return new BeamTablePythonFunctionRunner(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
@@ -142,6 +147,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
     }
 
     public static BeamTablePythonFunctionRunner stateful(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
@@ -155,6 +161,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
             FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) {
         return new BeamTablePythonFunctionRunner(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
index 5eee5f69581..0f0df0691a3 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
@@ -196,6 +196,7 @@ public class 
PassThroughPythonStreamGroupWindowAggregateOperator<K>
     @Override
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
+                getContainingTask().getEnvironment(),
                 getRuntimeContext().getTaskInfo().getTaskName(),
                 PythonTestUtils.createTestProcessEnvironmentManager(),
                 userDefinedFunctionInputType,
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
index 8924e4e2a92..f3a8c51f894 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
@@ -254,6 +254,7 @@ class PythonStreamGroupAggregateOperatorTest extends 
AbstractPythonStreamAggrega
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughStreamAggregatePythonFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     userDefinedFunctionInputType,
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
index b5e688fedb1..198f5b23eff 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
@@ -268,6 +268,7 @@ class PythonStreamGroupTableAggregateOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughStreamTableAggregatePythonFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     userDefinedFunctionInputType,
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
index 8ec9f5f750f..a1da0c17785 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
@@ -236,6 +236,7 @@ class BatchArrowPythonGroupAggregateFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index 571aee90e11..3f8ad32a98f 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -405,6 +405,7 @@ class 
BatchArrowPythonGroupWindowAggregateFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
index 1c383016499..fc0b1055982 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
@@ -306,6 +306,7 @@ class 
BatchArrowPythonOverWindowAggregateFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index b9da904e4b8..1e85d81e444 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -433,6 +433,7 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
index a3f6ac85f77..079848d3135 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
@@ -162,6 +162,7 @@ class StreamArrowPythonProcTimeBoundedRangeOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
index 0fa82b65ee8..c468e1bd471 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
@@ -163,6 +163,7 @@ class StreamArrowPythonProcTimeBoundedRowsOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index ccf8bea34a6..52d2f77f77a 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -281,6 +281,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index e644596162b..84c421ee4ec 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -247,6 +247,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
index 5e22a51d31c..2bb218b208b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
@@ -148,6 +148,7 @@ public class PythonScalarFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
             return new PassThroughPythonScalarFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
index 0f926a342f9..5aa7dd76e65 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
@@ -147,6 +147,7 @@ public class ArrowPythonScalarFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
             return new PassThroughPythonScalarFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index a9b30714dd6..6f26bd82a3f 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -124,6 +124,7 @@ public class PythonTableFunctionOperatorTest
         @Override
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonTableFunctionRunner(
+                    getContainingTask().getEnvironment(),
                     getRuntimeContext().getTaskInfo().getTaskName(),
                     PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 6eb8202a31e..af8c70a64e0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
@@ -68,6 +69,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun
     private transient ByteArrayOutputStreamWithPos baos;
 
     public PassThroughPythonAggregateFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
@@ -77,6 +79,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun
             FlinkMetricContainer flinkMetricContainer,
             boolean isBatchOverWindow) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index e84ee4d1d65..721fb9f2e4c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -44,6 +45,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti
     private final List<byte[]> buffer;
 
     public PassThroughPythonScalarFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
@@ -52,6 +54,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti
             FlinkFnApi.UserDefinedFunctions userDefinedFunctions,
             FlinkMetricContainer flinkMetricContainer) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index b94e765e413..0c8f41a6423 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -46,6 +47,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio
     private final List<byte[]> buffer;
 
     public PassThroughPythonTableFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
@@ -54,6 +56,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio
             FlinkFnApi.UserDefinedFunctions userDefinedFunctions,
             FlinkMetricContainer flinkMetricContainer) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index 7d970e2a529..6455cc52ef4 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
@@ -50,6 +51,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt
     private final Function<byte[], byte[]> processFunction;
 
     public PassThroughStreamAggregatePythonFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
@@ -61,6 +63,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt
             TypeSerializer keySerializer,
             Function<byte[], byte[]> processFunction) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index 48c56cd434d..b41676141d7 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
@@ -42,6 +43,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner
     private final PassThroughPythonStreamGroupWindowAggregateOperator operator;
 
     public PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
@@ -53,6 +55,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner
             TypeSerializer keySerializer,
             PassThroughPythonStreamGroupWindowAggregateOperator operator) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index d962463d680..d6e2986cd3d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.process.FlinkMetricContainer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
@@ -52,6 +53,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner
     private final Function<byte[], byte[][]> processFunction;
 
     public PassThroughStreamTableAggregatePythonFunctionRunner(
+            Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
@@ -63,6 +65,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner
             TypeSerializer keySerializer,
             Function<byte[], byte[][]> processFunction) {
         super(
+                environment,
                 taskName,
                 environmentManager,
                 functionUrn,

Reply via email to