This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de258f3ce01 [FLINK-31503][python] Makes sure it loads Beam classes using the same class loader as the PyFlink classes
de258f3ce01 is described below

commit de258f3ce01e720d23bec67c20892133f89293d3
Author: Dian Fu <dia...@apache.org>
AuthorDate: Fri Mar 17 23:03:11 2023 +0800

    [FLINK-31503][python] Makes sure it loads Beam classes using the same class loader as the PyFlink classes
    
    This closes #22206.
---
 .../python/beam/BeamPythonFunctionRunner.java      | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 85268014f90..0029db937d4 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegist
 import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 import org.apache.flink.util.function.LongFunctionWithException;
 
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -237,8 +238,14 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
         // The creation of stageBundleFactory depends on the initialized environment manager.
         environmentManager.open();
 
-        PortablePipelineOptions portableOptions =
-                PipelineOptionsFactory.as(PortablePipelineOptions.class);
+        PortablePipelineOptions portableOptions;
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
+            // It loads classes using service loader under context classloader in Beam,
+            // make sure the classloader used to load SPI classes is the same as the class loader of
+            // the current class.
+            portableOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class);
+        }
 
         int stateCacheSize = config.get(PythonOptions.STATE_CACHE_SIZE);
         if (stateCacheSize > 0) {
@@ -622,18 +629,28 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     @VisibleForTesting
     public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Exception {
-        return DefaultJobBundleFactory.create(
-                JobInfo.create(
-                        taskName,
-                        taskName,
-                        environmentManager.createRetrievalToken(),
-                        pipelineOptions));
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
+            // It loads classes using service loader under context classloader in Beam,
+            // make sure the classloader used to load SPI classes is the same as the class
+            // loader of the current class.
+            return DefaultJobBundleFactory.create(
+                    JobInfo.create(
+                            taskName,
+                            taskName,
+                            environmentManager.createRetrievalToken(),
+                            pipelineOptions));
+        }
     }
 
     /** To make the error messages more user friendly, throws an exception with the boot logs. */
     private StageBundleFactory createStageBundleFactory(
             JobBundleFactory jobBundleFactory, RunnerApi.Environment environment) throws Exception {
-        try {
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
+            // It loads classes using service loader under context classloader in Beam,
+            // make sure the classloader used to load SPI classes is the same as the class
+            // loader of the current class.
             return jobBundleFactory.forStage(createExecutableStage(environment));
         } catch (Throwable e) {
             throw new RuntimeException(environmentManager.getBootLog(), e);

Reply via email to