This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new de258f3ce01 [FLINK-31503][python] Makes sure it loads Beam classes using the same class loader as the PyFlink classes de258f3ce01 is described below commit de258f3ce01e720d23bec67c20892133f89293d3 Author: Dian Fu <dia...@apache.org> AuthorDate: Fri Mar 17 23:03:11 2023 +0800 [FLINK-31503][python] Makes sure it loads Beam classes using the same class loader as the PyFlink classes This closes #22206. --- .../python/beam/BeamPythonFunctionRunner.java | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 85268014f90..0029db937d4 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegist import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.flink.util.function.LongFunctionWithException; import org.apache.beam.model.fnexecution.v1.BeamFnApi; @@ -237,8 +238,14 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { // The creation of stageBundleFactory depends on the initialized environment manager. environmentManager.open(); - PortablePipelineOptions portableOptions = - PipelineOptionsFactory.as(PortablePipelineOptions.class); + PortablePipelineOptions portableOptions; + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(getClass().getClassLoader())) { + // It loads classes using service loader under context classloader in Beam, + // make sure the classloader used to load SPI classes is the same as the class loader of + // the current class. + portableOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class); + } int stateCacheSize = config.get(PythonOptions.STATE_CACHE_SIZE); if (stateCacheSize > 0) { @@ -622,18 +629,28 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { @VisibleForTesting public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Exception { - return DefaultJobBundleFactory.create( - JobInfo.create( - taskName, - taskName, - environmentManager.createRetrievalToken(), - pipelineOptions)); + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(getClass().getClassLoader())) { + // It loads classes using service loader under context classloader in Beam, + // make sure the classloader used to load SPI classes is the same as the class + // loader of the current class. + return DefaultJobBundleFactory.create( + JobInfo.create( + taskName, + taskName, + environmentManager.createRetrievalToken(), + pipelineOptions)); + } } /** To make the error messages more user friendly, throws an exception with the boot logs. */ private StageBundleFactory createStageBundleFactory( JobBundleFactory jobBundleFactory, RunnerApi.Environment environment) throws Exception { - try { + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(getClass().getClassLoader())) { + // It loads classes using service loader under context classloader in Beam, + // make sure the classloader used to load SPI classes is the same as the class + // loader of the current class. return jobBundleFactory.forStage(createExecutableStage(environment)); } catch (Throwable e) { throw new RuntimeException(environmentManager.getBootLog(), e);