This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90a5fef8b9975c39c3f52046f8cadf91b51d88e1 Author: Dian Fu <[email protected]> AuthorDate: Fri Apr 8 22:45:10 2022 +0800 [hotfix][python] Cleanup the code to pass cachedFiles into extractPythonConfiguration --- .../java/org/apache/flink/python/util/PythonConfigUtil.java | 10 +++++++--- .../table/planner/plan/nodes/exec/utils/CommonPythonUtil.java | 6 ++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 1da956fb047..3c6085b7254 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -17,7 +17,9 @@ package org.apache.flink.python.util; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.memory.ManagedMemoryUseCase; @@ -79,7 +81,8 @@ public class PythonConfigUtil { public static void configPythonOperator(StreamExecutionEnvironment env) throws IllegalAccessException, NoSuchFieldException { - final Configuration config = extractPythonConfiguration(env, env.getConfiguration()); + final Configuration config = + extractPythonConfiguration(env.getCachedFiles(), env.getConfiguration()); for (Transformation<?> transformation : env.getTransformations()) { alignTransformation(transformation); @@ -99,9 +102,10 @@ public class PythonConfigUtil { /** Extract the configurations which is used in the Python operators. */ public static Configuration extractPythonConfiguration( - StreamExecutionEnvironment env, ReadableConfig config) { + List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles, + ReadableConfig config) { final Configuration pythonDependencyConfig = - PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config); + PythonDependencyUtils.configurePythonDependencies(cachedFiles, config); final PythonConfig pythonConfig = new PythonConfig(config, pythonDependencyConfig); return pythonConfig.toConfiguration(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java index d9ac38b3725..a949ad2afa6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java @@ -114,10 +114,8 @@ public class CommonPythonUtil { StreamExecutionEnvironment realEnv = getRealEnvironment(env); Method method = clazz.getDeclaredMethod( - "extractPythonConfiguration", - StreamExecutionEnvironment.class, - ReadableConfig.class); - return (Configuration) method.invoke(null, realEnv, tableConfig); + "extractPythonConfiguration", List.class, ReadableConfig.class); + return (Configuration) method.invoke(null, realEnv.getCachedFiles(), tableConfig); } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException
