This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f01a0e9a68 [FLINK-35208][python] Respect pipeline.cached-files during handling Python dependencies
0f01a0e9a68 is described below

commit 0f01a0e9a6856781d5a0e33b26172bb913ec1928
Author: Dian Fu <dia...@apache.org>
AuthorDate: Thu Apr 25 09:57:04 2024 +0800

    [FLINK-35208][python] Respect pipeline.cached-files during handling Python dependencies
    
    This closes #24716.
---
 .../tests/test_stream_execution_environment.py     |  1 -
 .../apache/flink/python/util/PythonConfigUtil.java | 20 +++--
 .../flink/python/util/PythonDependencyUtils.java   | 87 +++++++++++++++-------
 .../python/util/PythonDependencyUtilsTest.java     | 39 ++++------
 .../exec/batch/BatchExecPythonGroupAggregate.java  |  2 +-
 .../batch/BatchExecPythonGroupWindowAggregate.java |  2 +-
 .../exec/batch/BatchExecPythonOverAggregate.java   |  2 +-
 .../nodes/exec/common/CommonExecPythonCalc.java    |  2 +-
 .../exec/common/CommonExecPythonCorrelate.java     |  2 +-
 .../stream/StreamExecPythonGroupAggregate.java     |  2 +-
 .../StreamExecPythonGroupTableAggregate.java       |  2 +-
 .../StreamExecPythonGroupWindowAggregate.java      |  2 +-
 .../exec/stream/StreamExecPythonOverAggregate.java |  2 +-
 .../plan/nodes/exec/utils/CommonPythonUtil.java    | 27 +------
 14 files changed, 102 insertions(+), 90 deletions(-)

diff --git 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 307611b74a9..1ac1eb95ed2 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -662,7 +662,6 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         python_dependency_config = dict(
             
get_gateway().jvm.org.apache.flink.python.util.PythonDependencyUtils.
             configurePythonDependencies(
-                env._j_stream_execution_environment.getCachedFiles(),
                 
env._j_stream_execution_environment.getConfiguration()).toMap())
 
         # Make sure that user specified files and archives are correctly added.
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index b49b60c5486..e56ba341baf 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.python.PythonConfig;
@@ -79,8 +79,16 @@ public class PythonConfigUtil {
     }
 
     public static void configPythonOperator(StreamExecutionEnvironment env) 
throws Exception {
-        final Configuration config =
-                extractPythonConfiguration(env.getCachedFiles(), 
env.getConfiguration());
+        final Configuration config = 
extractPythonConfiguration(env.getConfiguration());
+
+        env.getConfiguration()
+                .getOptional(PipelineOptions.CACHED_FILES)
+                .ifPresent(
+                        f -> {
+                            env.getCachedFiles().clear();
+                            env.getCachedFiles()
+                                    
.addAll(DistributedCache.parseCachedFilesFromString(f));
+                        });
 
         for (Transformation<?> transformation : env.getTransformations()) {
             alignTransformation(transformation);
@@ -102,11 +110,9 @@ public class PythonConfigUtil {
     }
 
     /** Extract the configurations which is used in the Python operators. */
-    public static Configuration extractPythonConfiguration(
-            List<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
cachedFiles,
-            ReadableConfig config) {
+    public static Configuration extractPythonConfiguration(ReadableConfig 
config) {
         final Configuration pythonDependencyConfig =
-                PythonDependencyUtils.configurePythonDependencies(cachedFiles, 
config);
+                PythonDependencyUtils.configurePythonDependencies(config);
         final PythonConfig pythonConfig = new PythonConfig(config, 
pythonDependencyConfig);
         return pythonConfig.toConfiguration();
     }
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
index f5c56d13d8e..17f36ee4987 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
@@ -18,10 +18,12 @@
 package org.apache.flink.python.util;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.WritableConfig;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -34,9 +36,12 @@ import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
@@ -69,16 +74,12 @@ public class PythonDependencyUtils {
      * returns a new configuration which contains the metadata of the 
registered python
      * dependencies.
      *
-     * @param cachedFiles The list used to store registered cached files.
      * @param config The configuration which contains python dependency 
configuration.
      * @return A new configuration which contains the metadata of the 
registered python
      *     dependencies.
      */
-    public static Configuration configurePythonDependencies(
-            List<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
cachedFiles,
-            ReadableConfig config) {
-        final PythonDependencyManager pythonDependencyManager =
-                new PythonDependencyManager(cachedFiles, config);
+    public static Configuration configurePythonDependencies(ReadableConfig 
config) {
+        final PythonDependencyManager pythonDependencyManager = new 
PythonDependencyManager(config);
         final Configuration pythonDependencyConfig = new Configuration();
         pythonDependencyManager.applyToConfiguration(pythonDependencyConfig);
         return pythonDependencyConfig;
@@ -157,14 +158,10 @@ public class PythonDependencyUtils {
         private static final String PYTHON_REQUIREMENTS_FILE_PREFIX = 
"python_requirements_file";
         private static final String PYTHON_REQUIREMENTS_CACHE_PREFIX = 
"python_requirements_cache";
         private static final String PYTHON_ARCHIVE_PREFIX = "python_archive";
-
-        private final List<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> cachedFiles;
         private final ReadableConfig config;
 
-        private PythonDependencyManager(
-                List<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
cachedFiles,
-                ReadableConfig config) {
-            this.cachedFiles = cachedFiles;
+        private PythonDependencyManager(ReadableConfig config) {
+            Preconditions.checkArgument(config instanceof WritableConfig);
             this.config = config;
         }
 
@@ -178,7 +175,7 @@ public class PythonDependencyUtils {
         private void addPythonFile(Configuration pythonDependencyConfig, 
String filePath) {
             Preconditions.checkNotNull(filePath);
             String fileKey = generateUniqueFileKey(PYTHON_FILE_PREFIX, 
filePath);
-            registerCachedFileIfNotExist(filePath, fileKey);
+            registerCachedFileIfNotExist(fileKey, filePath);
             if 
(!pythonDependencyConfig.contains(PYTHON_FILES_DISTRIBUTED_CACHE_INFO)) {
                 pythonDependencyConfig.set(
                         PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new 
LinkedHashMap<>());
@@ -224,7 +221,7 @@ public class PythonDependencyUtils {
 
             String fileKey =
                     generateUniqueFileKey(PYTHON_REQUIREMENTS_FILE_PREFIX, 
requirementsFilePath);
-            registerCachedFileIfNotExist(requirementsFilePath, fileKey);
+            registerCachedFileIfNotExist(fileKey, requirementsFilePath);
             pythonDependencyConfig
                     .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO)
                     .put(FILE, fileKey);
@@ -233,7 +230,7 @@ public class PythonDependencyUtils {
                 String cacheDirKey =
                         generateUniqueFileKey(
                                 PYTHON_REQUIREMENTS_CACHE_PREFIX, 
requirementsCachedDir);
-                registerCachedFileIfNotExist(requirementsCachedDir, 
cacheDirKey);
+                registerCachedFileIfNotExist(cacheDirKey, 
requirementsCachedDir);
                 pythonDependencyConfig
                         .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO)
                         .put(CACHE, cacheDirKey);
@@ -258,7 +255,7 @@ public class PythonDependencyUtils {
             String fileKey =
                     generateUniqueFileKey(
                             PYTHON_ARCHIVE_PREFIX, archivePath + 
PARAM_DELIMITER + targetDir);
-            registerCachedFileIfNotExist(archivePath, fileKey);
+            registerCachedFileIfNotExist(fileKey, archivePath);
             pythonDependencyConfig
                     .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO)
                     .put(fileKey, targetDir);
@@ -336,20 +333,56 @@ public class PythonDependencyUtils {
                     "%s_%s", prefix, 
StringUtils.byteToHexString(messageDigest.digest()));
         }
 
-        private void registerCachedFileIfNotExist(String filePath, String 
fileKey) {
-            if (cachedFiles.stream().noneMatch(t -> t.f0.equals(fileKey))) {
-                cachedFiles.add(
-                        new Tuple2<>(
-                                fileKey,
-                                new 
DistributedCache.DistributedCacheEntry(filePath, false)));
+        private void registerCachedFileIfNotExist(String name, String path) {
+            final List<Tuple2<String, String>> cachedFilePairs =
+                    
config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>())
+                            .stream()
+                            .map(
+                                    m ->
+                                            Tuple2.of(
+                                                    
ConfigurationUtils.parseStringToMap(m)
+                                                            .get("name"),
+                                                    m))
+                            .collect(Collectors.toList());
+
+            final Set<String> cachedFileNames =
+                    cachedFilePairs.stream()
+                            .map(f -> (String) f.getField(0))
+                            .collect(Collectors.toSet());
+            if (cachedFileNames.contains(name)) {
+                return;
             }
+
+            final List<String> cachedFiles =
+                    cachedFilePairs.stream()
+                            .map(f -> (String) f.getField(1))
+                            .collect(Collectors.toList());
+            Map<String, String> map = new HashMap<>();
+            map.put("name", name);
+            map.put("path", path);
+            cachedFiles.add(ConfigurationUtils.convertValue(map, 
String.class));
+
+            ((WritableConfig) config).set(PipelineOptions.CACHED_FILES, 
cachedFiles);
         }
 
         private void removeCachedFilesByPrefix(String prefix) {
-            cachedFiles.removeAll(
-                    cachedFiles.stream()
-                            .filter(t -> t.f0.matches("^" + prefix + 
"_[a-z0-9]{64}$"))
-                            .collect(Collectors.toSet()));
+            final List<String> cachedFiles =
+                    
config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>())
+                            .stream()
+                            .map(m -> 
Tuple2.of(ConfigurationUtils.parseStringToMap(m), m))
+                            .filter(
+                                    t ->
+                                            t.f0.get("name") != null
+                                                    && !(t.f0.get("name")
+                                                            .matches(
+                                                                    "^"
+                                                                            + 
prefix
+                                                                            + 
"_[a-z0-9]{64}$")))
+                            .map(t -> t.f1)
+                            .collect(Collectors.toList());
+
+            ((WritableConfig) config)
+                    .set(PipelineOptions.CACHED_FILES, new 
ArrayList<>(cachedFiles));
         }
     }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
index 1b70729973b..e75c5c6c551 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
@@ -17,17 +17,15 @@
 
 package org.apache.flink.python.util;
 
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.python.PythonOptions;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -47,20 +45,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for PythonDependencyUtils. */
 class PythonDependencyUtilsTest {
 
-    private List<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
cachedFiles;
-
-    @BeforeEach
-    void setUp() {
-        cachedFiles = new ArrayList<>();
-    }
-
     @Test
     void testPythonFiles() {
         Configuration config = new Configuration();
         config.set(
                 PythonOptions.PYTHON_FILES,
                 
"hdfs:///tmp_dir/test_file1.py,tmp_dir/test_file2.py,tmp_dir/test_dir,hdfs:///tmp_dir/test_file1.py");
-        Configuration actual = configurePythonDependencies(cachedFiles, 
config);
+        Configuration actual = configurePythonDependencies(config);
 
         Map<String, String> expectedCachedFiles = new HashMap<>();
         expectedCachedFiles.put(
@@ -72,7 +63,7 @@ class PythonDependencyUtilsTest {
         expectedCachedFiles.put(
                 
"python_file_e56bc55ff643576457b3d012b2bba888727c71cf05a958930f2263398c4e9798",
                 "tmp_dir/test_dir");
-        verifyCachedFiles(expectedCachedFiles);
+        verifyCachedFiles(expectedCachedFiles, config);
 
         Configuration expectedConfiguration = new Configuration();
         expectedConfiguration.set(PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new 
HashMap<>());
@@ -98,13 +89,13 @@ class PythonDependencyUtilsTest {
     void testPythonRequirements() {
         Configuration config = new Configuration();
         config.set(PYTHON_REQUIREMENTS, "tmp_dir/requirements.txt");
-        Configuration actual = configurePythonDependencies(cachedFiles, 
config);
+        Configuration actual = configurePythonDependencies(config);
 
         Map<String, String> expectedCachedFiles = new HashMap<>();
         expectedCachedFiles.put(
                 
"python_requirements_file_69390ca43c69ada3819226fcfbb5b6d27e111132a9427e7f201edd82e9d65ff6",
                 "tmp_dir/requirements.txt");
-        verifyCachedFiles(expectedCachedFiles);
+        verifyCachedFiles(expectedCachedFiles, config);
 
         Configuration expectedConfiguration = new Configuration();
         
expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new 
HashMap<>());
@@ -116,7 +107,7 @@ class PythonDependencyUtilsTest {
         verifyConfiguration(expectedConfiguration, actual);
 
         config.set(PYTHON_REQUIREMENTS, 
"tmp_dir/requirements2.txt#tmp_dir/cache");
-        actual = configurePythonDependencies(cachedFiles, config);
+        actual = configurePythonDependencies(config);
 
         expectedCachedFiles = new HashMap<>();
         expectedCachedFiles.put(
@@ -125,7 +116,7 @@ class PythonDependencyUtilsTest {
         expectedCachedFiles.put(
                 
"python_requirements_cache_2f563dd6731c2c7c5e1ef1ef8279f61e907dc3bfc698adb71b109e43ed93e143",
                 "tmp_dir/cache");
-        verifyCachedFiles(expectedCachedFiles);
+        verifyCachedFiles(expectedCachedFiles, config);
 
         expectedConfiguration = new Configuration();
         
expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new 
HashMap<>());
@@ -152,7 +143,7 @@ class PythonDependencyUtilsTest {
                         + "tmp_dir/py37.zip,"
                         + "tmp_dir/py37.zip#venv,"
                         + "tmp_dir/py37.zip#venv2,tmp_dir/py37.zip#venv");
-        Configuration actual = configurePythonDependencies(cachedFiles, 
config);
+        Configuration actual = configurePythonDependencies(config);
 
         Map<String, String> expectedCachedFiles = new HashMap<>();
         expectedCachedFiles.put(
@@ -167,7 +158,7 @@ class PythonDependencyUtilsTest {
         expectedCachedFiles.put(
                 
"python_archive_c7d970ce1c5794367974ce8ef536c2343bed8fcfe7c2422c51548e58007eee6a",
                 "tmp_dir/py37.zip");
-        verifyCachedFiles(expectedCachedFiles);
+        verifyCachedFiles(expectedCachedFiles, config);
 
         Configuration expectedConfiguration = new Configuration();
         expectedConfiguration.set(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO, new 
HashMap<>());
@@ -199,7 +190,7 @@ class PythonDependencyUtilsTest {
         Configuration config = new Configuration();
         config.set(PYTHON_EXECUTABLE, "venv/bin/python3");
         config.set(PYTHON_CLIENT_EXECUTABLE, "python37");
-        Configuration actual = configurePythonDependencies(cachedFiles, 
config);
+        Configuration actual = configurePythonDependencies(config);
 
         Configuration expectedConfiguration = new Configuration();
         expectedConfiguration.set(PYTHON_EXECUTABLE, "venv/bin/python3");
@@ -240,15 +231,17 @@ class PythonDependencyUtilsTest {
                 
"venv/bin/python3/lib64/python3.7/site-packages/:venv/bin/python3/lib/python3.7/site-packages/";
         Configuration config = new Configuration();
         config.set(PythonOptions.PYTHON_PATH, pyPath);
-        Configuration actual = configurePythonDependencies(cachedFiles, 
config);
+        Configuration actual = configurePythonDependencies(config);
         Configuration expectedConfiguration = new Configuration();
         expectedConfiguration.set(PythonOptions.PYTHON_PATH, pyPath);
         verifyConfiguration(expectedConfiguration, actual);
     }
 
-    private void verifyCachedFiles(Map<String, String> expected) {
+    private void verifyCachedFiles(Map<String, String> expected, Configuration 
config) {
         Map<String, String> actual =
-                cachedFiles.stream().collect(Collectors.toMap(t -> t.f0, t -> 
t.f1.filePath));
+                config.getOptional(PipelineOptions.CACHED_FILES).orElse(new 
ArrayList<>()).stream()
+                        .map(ConfigurationUtils::parseStringToMap)
+                        .collect(Collectors.toMap(m -> m.get("name"), m -> 
m.get("path")));
 
         assertThat(actual).isEqualTo(expected);
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
index 9be1061d7e4..938b75c582e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
@@ -95,7 +95,7 @@ public class BatchExecPythonGroupAggregate extends 
ExecNodeBase<RowData>
         final RowType outputRowType = 
InternalTypeInfo.of(getOutputType()).toRowType();
         Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
index 545a989509c..f01f64a1f6d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
@@ -115,7 +115,7 @@ public class BatchExecPythonGroupWindowAggregate extends 
ExecNodeBase<RowData>
         final Tuple2<Long, Long> windowSizeAndSlideSize = 
WindowCodeGenerator.getWindowDef(window);
         final Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         int groupBufferLimitSize =
                 
pythonConfig.get(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
index aea09e785be..d6acd4d5e81 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
@@ -154,7 +154,7 @@ public class BatchExecPythonOverAggregate extends 
BatchExecOverAggregateBase {
         }
         Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index 4cd28906f55..9c6013055ca 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -113,7 +113,7 @@ public abstract class CommonExecPythonCalc extends 
ExecNodeBase<RowData>
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> ret =
                 createPythonOneInputTransformation(
                         inputTransform,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index a1b9ef5aad3..52572063ab8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -103,7 +103,7 @@ public abstract class CommonExecPythonCorrelate extends 
ExecNodeBase<RowData>
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         final ExecNodeConfig pythonNodeConfig =
                 ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled());
         final OneInputTransformation<RowData, RowData> transform =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
index 50b70d595ef..d3b66c4d2ff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
@@ -176,7 +176,7 @@ public class StreamExecPythonGroupAggregate extends 
StreamExecAggregateBase {
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         final OneInputStreamOperator<RowData, RowData> operator =
                 getPythonAggregateFunctionOperator(
                         pythonConfig,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
index e10076b789d..1f425f1d1c3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
@@ -132,7 +132,7 @@ public class StreamExecPythonGroupTableAggregate extends 
ExecNodeBase<RowData>
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         OneInputStreamOperator<RowData, RowData> pythonOperator =
                 getPythonTableAggregateFunctionOperator(
                         pythonConfig,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index 59633e21a6c..cab831209b6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -254,7 +254,7 @@ public class StreamExecPythonGroupWindowAggregate extends 
StreamExecAggregateBas
         Trigger<?> trigger = windowAssignerAndTrigger.f1;
         final Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, 
planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), 
planner.getFlinkContext().getClassLoader());
         final ExecNodeConfig pythonNodeConfig =
                 ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled());
         boolean isGeneralPythonUDAF =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index a70f7877c5d..d9e9e55dad8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -198,7 +198,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
         long precedingOffset = -1 * (long) boundValue;
         Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(
-                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
+                        planner.getTableConfig(), planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index ae6605381e3..43fc9a9a92e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.dataview.DataView;
 import org.apache.flink.table.api.dataview.ListView;
@@ -50,7 +49,6 @@ import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
 import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
 import org.apache.flink.table.planner.plan.utils.AggregateInfo;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.utils.DummyStreamExecutionEnvironment;
 import org.apache.flink.table.runtime.dataview.DataViewSpec;
 import org.apache.flink.table.runtime.dataview.ListViewSpec;
 import org.apache.flink.table.runtime.dataview.MapViewSpec;
@@ -79,7 +77,6 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlCastFunction;
 import org.apache.calcite.sql.type.SqlTypeName;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
@@ -111,18 +108,13 @@ public class CommonPythonUtil {
     }
 
     public static Configuration extractPythonConfiguration(
-            StreamExecutionEnvironment env, ReadableConfig tableConfig, ClassLoader classLoader) {
+            ReadableConfig tableConfig, ClassLoader classLoader) {
         Class<?> clazz = loadClass(PYTHON_CONFIG_UTILS_CLASS, classLoader);
         try {
-            StreamExecutionEnvironment realEnv = getRealEnvironment(env);
             Method method =
-                    clazz.getDeclaredMethod(
-                            "extractPythonConfiguration", List.class, ReadableConfig.class);
-            return (Configuration) method.invoke(null, realEnv.getCachedFiles(), tableConfig);
-        } catch (NoSuchFieldException
-                | IllegalAccessException
-                | NoSuchMethodException
-                | InvocationTargetException e) {
+                    clazz.getDeclaredMethod("extractPythonConfiguration", ReadableConfig.class);
+            return (Configuration) method.invoke(null, tableConfig);
+        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
             throw new TableException("Method extractPythonConfiguration accessed failed.", e);
         }
     }
@@ -472,17 +464,6 @@ public class CommonPythonUtil {
         return new PythonFunctionInfo((PythonFunction) functionDefinition, inputs.toArray());
     }
 
-    private static StreamExecutionEnvironment getRealEnvironment(StreamExecutionEnvironment env)
-            throws NoSuchFieldException, IllegalAccessException {
-        Field realExecEnvField =
-                DummyStreamExecutionEnvironment.class.getDeclaredField("realExecEnv");
-        realExecEnvField.setAccessible(true);
-        while (env instanceof DummyStreamExecutionEnvironment) {
-            env = (StreamExecutionEnvironment) realExecEnvField.get(env);
-        }
-        return env;
-    }
-
     private static BuiltInPythonAggregateFunction getBuiltInPythonAggregateFunction(
             UserDefinedFunction javaBuiltInAggregateFunction) {
         if (javaBuiltInAggregateFunction instanceof AvgAggFunction) {


Reply via email to