This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 6a352bd6042 [hotfix][python] Cleanup the code to pass cachedFiles into extractPythonConfiguration
6a352bd6042 is described below
commit 6a352bd60420aad40c4b5e116a2a085bfda8b628
Author: Dian Fu <[email protected]>
AuthorDate: Fri Apr 8 22:45:10 2022 +0800
[hotfix][python] Cleanup the code to pass cachedFiles into extractPythonConfiguration
---
.../java/org/apache/flink/python/util/PythonConfigUtil.java | 10 +++++++---
.../table/planner/plan/nodes/exec/utils/CommonPythonUtil.java | 6 ++----
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 1da956fb047..3c6085b7254 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -17,7 +17,9 @@
package org.apache.flink.python.util;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -79,7 +81,8 @@ public class PythonConfigUtil {
public static void configPythonOperator(StreamExecutionEnvironment env)
throws IllegalAccessException, NoSuchFieldException {
- final Configuration config = extractPythonConfiguration(env, env.getConfiguration());
+ final Configuration config =
+ extractPythonConfiguration(env.getCachedFiles(), env.getConfiguration());
for (Transformation<?> transformation : env.getTransformations()) {
alignTransformation(transformation);
@@ -99,9 +102,10 @@ public class PythonConfigUtil {
/** Extract the configurations which is used in the Python operators. */
public static Configuration extractPythonConfiguration(
- StreamExecutionEnvironment env, ReadableConfig config) {
+ List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles,
+ ReadableConfig config) {
final Configuration pythonDependencyConfig =
- PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config);
+ PythonDependencyUtils.configurePythonDependencies(cachedFiles, config);
final PythonConfig pythonConfig = new PythonConfig(config, pythonDependencyConfig);
return pythonConfig.toConfiguration();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index d9ac38b3725..a949ad2afa6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -114,10 +114,8 @@ public class CommonPythonUtil {
StreamExecutionEnvironment realEnv = getRealEnvironment(env);
Method method =
clazz.getDeclaredMethod(
- "extractPythonConfiguration",
- StreamExecutionEnvironment.class,
- ReadableConfig.class);
- return (Configuration) method.invoke(null, realEnv, tableConfig);
+ "extractPythonConfiguration", List.class, ReadableConfig.class);
+ return (Configuration) method.invoke(null, realEnv.getCachedFiles(), tableConfig);
} catch (NoSuchFieldException
| IllegalAccessException
| NoSuchMethodException