This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 5fe0280bae1 [FLINK-37084][python] Fix TimerRegistration concurrency issue in PyFlink (#26004) 5fe0280bae1 is described below commit 5fe0280bae13203ac65b3c1534a167b72534e91b Author: Shuyi Chen <suez1...@users.noreply.github.com> AuthorDate: Fri Jan 17 15:56:54 2025 -0800 [FLINK-37084][python] Fix TimerRegistration concurrency issue in PyFlink (#26004) --- .../apache/flink/python/PythonFunctionRunner.java | 2 ++ .../python/AbstractPythonFunctionOperator.java | 8 +++++ .../AbstractExternalPythonFunctionOperator.java | 5 +++ .../process/ExternalPythonCoProcessOperator.java | 1 + .../ExternalPythonKeyedCoProcessOperator.java | 1 + .../ExternalPythonKeyedProcessOperator.java | 1 + .../process/ExternalPythonProcessOperator.java | 1 + .../process/timer/TimerRegistrationAction.java | 42 ++++++++++++++++++++++ .../beam/BeamDataStreamPythonFunctionRunner.java | 3 ++ .../python/beam/BeamPythonFunctionRunner.java | 29 ++++++++++++++- .../python/AbstractStatelessFunctionOperator.java | 1 + .../AbstractPythonStreamAggregateOperator.java | 1 + .../python/beam/BeamTablePythonFunctionRunner.java | 7 ++++ ...ghPythonStreamGroupWindowAggregateOperator.java | 1 + .../PythonStreamGroupAggregateOperatorTest.java | 1 + ...ythonStreamGroupTableAggregateOperatorTest.java | 1 + ...owPythonGroupAggregateFunctionOperatorTest.java | 1 + ...onGroupWindowAggregateFunctionOperatorTest.java | 1 + ...honOverWindowAggregateFunctionOperatorTest.java | 1 + ...onGroupWindowAggregateFunctionOperatorTest.java | 1 + ...rrowPythonProcTimeBoundedRangeOperatorTest.java | 1 + ...ArrowPythonProcTimeBoundedRowsOperatorTest.java | 1 + ...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 1 + ...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 1 + .../scalar/PythonScalarFunctionOperatorTest.java | 1 + .../ArrowPythonScalarFunctionOperatorTest.java | 1 + .../table/PythonTableFunctionOperatorTest.java | 1 + .../PassThroughPythonAggregateFunctionRunner.java | 3 ++ .../PassThroughPythonScalarFunctionRunner.java | 3 ++ .../PassThroughPythonTableFunctionRunner.java | 3 ++ ...ThroughStreamAggregatePythonFunctionRunner.java | 3 ++ ...amGroupWindowAggregatePythonFunctionRunner.java | 3 ++ ...ghStreamTableAggregatePythonFunctionRunner.java | 3 ++ 33 files changed, 133 insertions(+), 1 deletion(-) diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java index 6ef6a70b59e..c637c0e659c 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java @@ -44,6 +44,8 @@ public interface PythonFunctionRunner extends AutoCloseable { /** Send the triggered timer to the Python function. */ void processTimer(byte[] timerData) throws Exception; + void drainUnregisteredTimers(); + /** * Retrieves the Python function result. * diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index abd92e9eac6..37f0c4540f7 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -270,6 +270,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream protected abstract PythonEnvironmentManager createPythonEnvironmentManager(); + protected void drainUnregisteredTimers() {} + /** * Advances the watermark of all managed timer services, potentially firing event time timers. * It also ensures that the fired timers are processed in the Python user-defined functions. @@ -277,10 +279,16 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream private void advanceWatermark(Watermark watermark) throws Exception { if (getTimeServiceManager().isPresent()) { InternalTimeServiceManager<?> timeServiceManager = getTimeServiceManager().get(); + // make sure the registered timer are processed before advancing the watermark to + // ensure the timers could be triggered + drainUnregisteredTimers(); timeServiceManager.advanceWatermark(watermark); while (!isBundleFinished()) { invokeFinishBundle(); + // make sure the registered timer are processed before advancing the watermark to + // ensure the timers could be triggered + drainUnregisteredTimers(); timeServiceManager.advanceWatermark(watermark); } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java index 90991b57c4d..da50b5335f2 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java @@ -136,6 +136,11 @@ public abstract class AbstractExternalPythonFunctionOperator<OUT> } } + @Override + protected void drainUnregisteredTimers() { + pythonFunctionRunner.drainUnregisteredTimers(); + } + protected void emitResults() throws Exception { Tuple3<String, byte[], Integer> resultTuple; while ((resultTuple = pythonFunctionRunner.pollResult()) != null && resultTuple.f2 != 0) { diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java index 05b2166cf62..71193ceed7c 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java @@ -72,6 +72,7 @@ public class ExternalPythonCoProcessOperator<IN1, IN2, OUT> @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATELESS_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java index 6ce86e5a408..dc13aafae19 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java @@ -113,6 +113,7 @@ public class ExternalPythonKeyedCoProcessOperator<OUT> @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATEFUL_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java index 2f2a785334a..e92301b6bf3 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java @@ -139,6 +139,7 @@ public class ExternalPythonKeyedProcessOperator<OUT> @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATEFUL_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java index 863edf7357b..07dc668a352 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java @@ -68,6 +68,7 @@ public class ExternalPythonProcessOperator<IN, OUT> @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATELESS_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java new file mode 100644 index 00000000000..8a11779b61f --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.python.process.timer; + +public class TimerRegistrationAction { + + private final TimerRegistration timerRegistration; + + private final byte[] serializedTimerData; + + private boolean isRegistered; + + public TimerRegistrationAction( + TimerRegistration timerRegistration, byte[] serializedTimerData) { + this.timerRegistration = timerRegistration; + this.serializedTimerData = serializedTimerData; + this.isRegistered = false; + } + + public void run() { + if (!isRegistered) { + timerRegistration.setTimer(serializedTimerData); + isRegistered = true; + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java index 71bd715c470..823e91ccfc5 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java @@ -24,6 +24,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; import org.apache.flink.python.util.ProtoUtils; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -69,6 +70,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner private final List<FlinkFnApi.UserDefinedDataStreamFunction> userDefinedDataStreamFunctions; public BeamDataStreamPythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String headOperatorFunctionUrn, @@ -86,6 +88,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner @Nullable FlinkFnApi.CoderInfoDescriptor timerCoderDescriptor, Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) { super( + environment, taskName, environmentManager, flinkMetricContainer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 0029db937d4..4e52378f7a6 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -31,11 +31,13 @@ import org.apache.flink.python.env.PythonEnvironment; import org.apache.flink.python.env.process.ProcessPythonEnvironment; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration; +import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistrationAction; import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; @@ -85,6 +87,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -190,7 +193,12 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { private transient Thread shutdownHook; + private transient Environment environment; + + private transient List<TimerRegistrationAction> unregisteredTimers; + public BeamPythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, @Nullable FlinkMetricContainer flinkMetricContainer, @@ -204,6 +212,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor, Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) { + this.environment = environment; this.taskName = Preconditions.checkNotNull(taskName); this.environmentManager = Preconditions.checkNotNull(environmentManager); this.flinkMetricContainer = flinkMetricContainer; @@ -301,6 +310,8 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { shutdownHook = ShutdownHookUtil.addShutdownHook( this, BeamPythonFunctionRunner.class.getSimpleName(), LOG); + + unregisteredTimers = new LinkedList<>(); } @Override @@ -339,6 +350,14 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(data)); } + @Override + public void drainUnregisteredTimers() { + for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) { + timerRegistrationAction.run(); + } + unregisteredTimers.clear(); + } + @Override public void processTimer(byte[] timerData) throws Exception { if (timerInputReceiver == null) { @@ -681,7 +700,15 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { private TimerReceiverFactory createTimerReceiverFactory() { BiConsumer<Timer<?>, TimerInternals.TimerData> timerDataConsumer = - (timer, timerData) -> timerRegistration.setTimer((byte[]) timer.getUserKey()); + (timer, timerData) -> { + TimerRegistrationAction timerRegistrationAction = + new TimerRegistrationAction( + timerRegistration, (byte[]) timer.getUserKey()); + unregisteredTimers.add(timerRegistrationAction); + environment + .getMainMailboxExecutor() + .execute(timerRegistrationAction::run, "PythonTimerRegistration"); + }; return new TimerReceiverFactory(stageBundleFactory, timerDataConsumer, null); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java index 4b5c95701a9..9b53ca7550b 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java @@ -105,6 +105,7 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> @Override public PythonFunctionRunner createPythonFunctionRunner() throws IOException { return BeamTablePythonFunctionRunner.stateless( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), getFunctionUrn(), diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java index 5ee074cbfaf..64381c06582 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java @@ -172,6 +172,7 @@ public abstract class AbstractPythonStreamAggregateOperator @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return BeamTablePythonFunctionRunner.stateful( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), getFunctionUrn(), diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java index 60158397e62..50a2e16111c 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner; @@ -52,6 +53,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { private final GeneratedMessageV3 userDefinedFunctionProto; public BeamTablePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String functionUrn, @@ -65,6 +67,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { super( + environment, taskName, environmentManager, flinkMetricContainer, @@ -117,6 +120,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { } public static BeamTablePythonFunctionRunner stateless( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String functionUrn, @@ -127,6 +131,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { return new BeamTablePythonFunctionRunner( + environment, taskName, environmentManager, functionUrn, @@ -142,6 +147,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { } public static BeamTablePythonFunctionRunner stateful( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String functionUrn, @@ -155,6 +161,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { return new BeamTablePythonFunctionRunner( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java index 5eee5f69581..0f0df0691a3 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java @@ -196,6 +196,7 @@ public class PassThroughPythonStreamGroupWindowAggregateOperator<K> @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), userDefinedFunctionInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java index 8924e4e2a92..f3a8c51f894 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java @@ -254,6 +254,7 @@ class PythonStreamGroupAggregateOperatorTest extends AbstractPythonStreamAggrega @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughStreamAggregatePythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), userDefinedFunctionInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java index b5e688fedb1..198f5b23eff 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java @@ -268,6 +268,7 @@ class PythonStreamGroupTableAggregateOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughStreamTableAggregatePythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), userDefinedFunctionInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java index 8ec9f5f750f..a1da0c17785 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java @@ -236,6 +236,7 @@ class BatchArrowPythonGroupAggregateFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java index 571aee90e11..3f8ad32a98f 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -405,6 +405,7 @@ class BatchArrowPythonGroupWindowAggregateFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java index 1c383016499..fc0b1055982 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java @@ -306,6 +306,7 @@ class BatchArrowPythonOverWindowAggregateFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java index b9da904e4b8..1e85d81e444 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -433,6 +433,7 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java index a3f6ac85f77..079848d3135 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java @@ -162,6 +162,7 @@ class StreamArrowPythonProcTimeBoundedRangeOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java index 0fa82b65ee8..c468e1bd471 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java @@ -163,6 +163,7 @@ class StreamArrowPythonProcTimeBoundedRowsOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java index ccf8bea34a6..52d2f77f77a 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java @@ -281,6 +281,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java index e644596162b..84c421ee4ec 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java @@ -247,6 +247,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java index 5e22a51d31c..2bb218b208b 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java @@ -148,6 +148,7 @@ public class PythonScalarFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() throws IOException { return new PassThroughPythonScalarFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java index 0f926a342f9..5aa7dd76e65 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java @@ -147,6 +147,7 @@ public class ArrowPythonScalarFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() throws IOException { return new PassThroughPythonScalarFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java index a9b30714dd6..6f26bd82a3f 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java @@ -124,6 +124,7 @@ public class PythonTableFunctionOperatorTest @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonTableFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java index 6eb8202a31e..af8c70a64e0 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java @@ -27,6 +27,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; @@ -68,6 +69,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun private transient ByteArrayOutputStreamWithPos baos; public PassThroughPythonAggregateFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -77,6 +79,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun FlinkMetricContainer flinkMetricContainer, boolean isBatchOverWindow) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java index e84ee4d1d65..721fb9f2e4c 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -44,6 +45,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti private final List<byte[]> buffer; public PassThroughPythonScalarFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -52,6 +54,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti FlinkFnApi.UserDefinedFunctions userDefinedFunctions, FlinkMetricContainer flinkMetricContainer) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java index b94e765e413..0c8f41a6423 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -46,6 +47,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio private final List<byte[]> buffer; public PassThroughPythonTableFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -54,6 +56,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio FlinkFnApi.UserDefinedFunctions userDefinedFunctions, FlinkMetricContainer flinkMetricContainer) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java index 7d970e2a529..6455cc52ef4 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -50,6 +51,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt private final Function<byte[], byte[]> processFunction; public PassThroughStreamAggregatePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -61,6 +63,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt TypeSerializer keySerializer, Function<byte[], byte[]> processFunction) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java index 48c56cd434d..b41676141d7 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; @@ -42,6 +43,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner private final PassThroughPythonStreamGroupWindowAggregateOperator operator; public PassThroughStreamGroupWindowAggregatePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -53,6 +55,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner TypeSerializer keySerializer, PassThroughPythonStreamGroupWindowAggregateOperator operator) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java index d962463d680..d6e2986cd3d 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -52,6 +53,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner private final Function<byte[], byte[][]> processFunction; public PassThroughStreamTableAggregatePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -63,6 +65,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner TypeSerializer keySerializer, Function<byte[], byte[][]> processFunction) { super( + environment, taskName, environmentManager, functionUrn,