This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90a5fef8b9975c39c3f52046f8cadf91b51d88e1
Author: Dian Fu <[email protected]>
AuthorDate: Fri Apr 8 22:45:10 2022 +0800

    [hotfix][python] Cleanup the code to pass cachedFiles into extractPythonConfiguration
---
 .../java/org/apache/flink/python/util/PythonConfigUtil.java    | 10 +++++++---
 .../table/planner/plan/nodes/exec/utils/CommonPythonUtil.java  |  6 ++----
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 1da956fb047..3c6085b7254 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.python.util;
 
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -79,7 +81,8 @@ public class PythonConfigUtil {
 
     public static void configPythonOperator(StreamExecutionEnvironment env)
             throws IllegalAccessException, NoSuchFieldException {
-        final Configuration config = extractPythonConfiguration(env, env.getConfiguration());
+        final Configuration config =
+                extractPythonConfiguration(env.getCachedFiles(), env.getConfiguration());
 
         for (Transformation<?> transformation : env.getTransformations()) {
             alignTransformation(transformation);
@@ -99,9 +102,10 @@ public class PythonConfigUtil {
 
     /** Extract the configurations which is used in the Python operators. */
     public static Configuration extractPythonConfiguration(
-            StreamExecutionEnvironment env, ReadableConfig config) {
+            List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles,
+            ReadableConfig config) {
         final Configuration pythonDependencyConfig =
-                PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config);
+                PythonDependencyUtils.configurePythonDependencies(cachedFiles, config);
         final PythonConfig pythonConfig = new PythonConfig(config, pythonDependencyConfig);
         return pythonConfig.toConfiguration();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index d9ac38b3725..a949ad2afa6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -114,10 +114,8 @@ public class CommonPythonUtil {
             StreamExecutionEnvironment realEnv = getRealEnvironment(env);
             Method method =
                     clazz.getDeclaredMethod(
-                            "extractPythonConfiguration",
-                            StreamExecutionEnvironment.class,
-                            ReadableConfig.class);
-            return (Configuration) method.invoke(null, realEnv, tableConfig);
+                            "extractPythonConfiguration", List.class, ReadableConfig.class);
+            return (Configuration) method.invoke(null, realEnv.getCachedFiles(), tableConfig);
         } catch (NoSuchFieldException
                 | IllegalAccessException
                 | NoSuchMethodException

Reply via email to