This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0f01a0e9a68 [FLINK-35208][python] Respect pipeline.cached-files during handling Python dependencies 0f01a0e9a68 is described below commit 0f01a0e9a6856781d5a0e33b26172bb913ec1928 Author: Dian Fu <dia...@apache.org> AuthorDate: Thu Apr 25 09:57:04 2024 +0800 [FLINK-35208][python] Respect pipeline.cached-files during handling Python dependencies This closes #24716. --- .../tests/test_stream_execution_environment.py | 1 - .../apache/flink/python/util/PythonConfigUtil.java | 20 +++-- .../flink/python/util/PythonDependencyUtils.java | 87 +++++++++++++++------- .../python/util/PythonDependencyUtilsTest.java | 39 ++++------ .../exec/batch/BatchExecPythonGroupAggregate.java | 2 +- .../batch/BatchExecPythonGroupWindowAggregate.java | 2 +- .../exec/batch/BatchExecPythonOverAggregate.java | 2 +- .../nodes/exec/common/CommonExecPythonCalc.java | 2 +- .../exec/common/CommonExecPythonCorrelate.java | 2 +- .../stream/StreamExecPythonGroupAggregate.java | 2 +- .../StreamExecPythonGroupTableAggregate.java | 2 +- .../StreamExecPythonGroupWindowAggregate.java | 2 +- .../exec/stream/StreamExecPythonOverAggregate.java | 2 +- .../plan/nodes/exec/utils/CommonPythonUtil.java | 27 +------ 14 files changed, 102 insertions(+), 90 deletions(-) diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index 307611b74a9..1ac1eb95ed2 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -662,7 +662,6 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): python_dependency_config = dict( get_gateway().jvm.org.apache.flink.python.util.PythonDependencyUtils. configurePythonDependencies( - env._j_stream_execution_environment.getCachedFiles(), env._j_stream_execution_environment.getConfiguration()).toMap()) # Make sure that user specified files and archives are correctly added. diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index b49b60c5486..e56ba341baf 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.python.PythonConfig; @@ -79,8 +79,16 @@ public class PythonConfigUtil { } public static void configPythonOperator(StreamExecutionEnvironment env) throws Exception { - final Configuration config = - extractPythonConfiguration(env.getCachedFiles(), env.getConfiguration()); + final Configuration config = extractPythonConfiguration(env.getConfiguration()); + + env.getConfiguration() + .getOptional(PipelineOptions.CACHED_FILES) + .ifPresent( + f -> { + env.getCachedFiles().clear(); + env.getCachedFiles() + .addAll(DistributedCache.parseCachedFilesFromString(f)); + }); for (Transformation<?> transformation : env.getTransformations()) { alignTransformation(transformation); @@ -102,11 +110,9 @@ public class PythonConfigUtil { } /** Extract the configurations which is used in the Python operators. */ - public static Configuration extractPythonConfiguration( - List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles, - ReadableConfig config) { + public static Configuration extractPythonConfiguration(ReadableConfig config) { final Configuration pythonDependencyConfig = - PythonDependencyUtils.configurePythonDependencies(cachedFiles, config); + PythonDependencyUtils.configurePythonDependencies(config); final PythonConfig pythonConfig = new PythonConfig(config, pythonDependencyConfig); return pythonConfig.toConfiguration(); } diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java index f5c56d13d8e..17f36ee4987 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java @@ -18,10 +18,12 @@ package org.apache.flink.python.util; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.WritableConfig; import org.apache.flink.python.PythonOptions; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -34,9 +36,12 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION; @@ -69,16 +74,12 @@ public class PythonDependencyUtils { * returns a new configuration which contains the metadata of the registered python * dependencies. * - * @param cachedFiles The list used to store registered cached files. * @param config The configuration which contains python dependency configuration. * @return A new configuration which contains the metadata of the registered python * dependencies. */ - public static Configuration configurePythonDependencies( - List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles, - ReadableConfig config) { - final PythonDependencyManager pythonDependencyManager = - new PythonDependencyManager(cachedFiles, config); + public static Configuration configurePythonDependencies(ReadableConfig config) { + final PythonDependencyManager pythonDependencyManager = new PythonDependencyManager(config); final Configuration pythonDependencyConfig = new Configuration(); pythonDependencyManager.applyToConfiguration(pythonDependencyConfig); return pythonDependencyConfig; @@ -157,14 +158,10 @@ public class PythonDependencyUtils { private static final String PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file"; private static final String PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache"; private static final String PYTHON_ARCHIVE_PREFIX = "python_archive"; - - private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles; private final ReadableConfig config; - private PythonDependencyManager( - List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles, - ReadableConfig config) { - this.cachedFiles = cachedFiles; + private PythonDependencyManager(ReadableConfig config) { + Preconditions.checkArgument(config instanceof WritableConfig); this.config = config; } @@ -178,7 +175,7 @@ public class PythonDependencyUtils { private void addPythonFile(Configuration pythonDependencyConfig, String filePath) { Preconditions.checkNotNull(filePath); String fileKey = generateUniqueFileKey(PYTHON_FILE_PREFIX, filePath); - registerCachedFileIfNotExist(filePath, fileKey); + registerCachedFileIfNotExist(fileKey, filePath); if (!pythonDependencyConfig.contains(PYTHON_FILES_DISTRIBUTED_CACHE_INFO)) { pythonDependencyConfig.set( PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new LinkedHashMap<>()); @@ -224,7 +221,7 @@ public class PythonDependencyUtils { String fileKey = generateUniqueFileKey(PYTHON_REQUIREMENTS_FILE_PREFIX, requirementsFilePath); - registerCachedFileIfNotExist(requirementsFilePath, fileKey); + registerCachedFileIfNotExist(fileKey, requirementsFilePath); pythonDependencyConfig .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put(FILE, fileKey); @@ -233,7 +230,7 @@ public class PythonDependencyUtils { String cacheDirKey = generateUniqueFileKey( PYTHON_REQUIREMENTS_CACHE_PREFIX, requirementsCachedDir); - registerCachedFileIfNotExist(requirementsCachedDir, cacheDirKey); + registerCachedFileIfNotExist(cacheDirKey, requirementsCachedDir); pythonDependencyConfig .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put(CACHE, cacheDirKey); @@ -258,7 +255,7 @@ public class PythonDependencyUtils { String fileKey = generateUniqueFileKey( PYTHON_ARCHIVE_PREFIX, archivePath + PARAM_DELIMITER + targetDir); - registerCachedFileIfNotExist(archivePath, fileKey); + registerCachedFileIfNotExist(fileKey, archivePath); pythonDependencyConfig .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) .put(fileKey, targetDir); @@ -336,20 +333,56 @@ public class PythonDependencyUtils { "%s_%s", prefix, StringUtils.byteToHexString(messageDigest.digest())); } - private void registerCachedFileIfNotExist(String filePath, String fileKey) { - if (cachedFiles.stream().noneMatch(t -> t.f0.equals(fileKey))) { - cachedFiles.add( - new Tuple2<>( - fileKey, - new DistributedCache.DistributedCacheEntry(filePath, false))); + private void registerCachedFileIfNotExist(String name, String path) { + final List<Tuple2<String, String>> cachedFilePairs = + config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>()) + .stream() + .map( + m -> + Tuple2.of( + ConfigurationUtils.parseStringToMap(m) + .get("name"), + m)) + .collect(Collectors.toList()); + + final Set<String> cachedFileNames = + cachedFilePairs.stream() + .map(f -> (String) f.getField(0)) + .collect(Collectors.toSet()); + if (cachedFileNames.contains(name)) { + return; } + + final List<String> cachedFiles = + cachedFilePairs.stream() + .map(f -> (String) f.getField(1)) + .collect(Collectors.toList()); + Map<String, String> map = new HashMap<>(); + map.put("name", name); + map.put("path", path); + cachedFiles.add(ConfigurationUtils.convertValue(map, String.class)); + + ((WritableConfig) config).set(PipelineOptions.CACHED_FILES, cachedFiles); } private void removeCachedFilesByPrefix(String prefix) { - cachedFiles.removeAll( - cachedFiles.stream() - .filter(t -> t.f0.matches("^" + prefix + "_[a-z0-9]{64}$")) - .collect(Collectors.toSet())); + final List<String> cachedFiles = + config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>()) + .stream() + .map(m -> Tuple2.of(ConfigurationUtils.parseStringToMap(m), m)) + .filter( + t -> + t.f0.get("name") != null + && !(t.f0.get("name") + .matches( + "^" + + prefix + + "_[a-z0-9]{64}$"))) + .map(t -> t.f1) + .collect(Collectors.toList()); + + ((WritableConfig) config) + .set(PipelineOptions.CACHED_FILES, new ArrayList<>(cachedFiles)); } } } diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java index 1b70729973b..e75c5c6c551 100644 --- a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java +++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java @@ -17,17 +17,15 @@ package org.apache.flink.python.util; -import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.python.PythonOptions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -47,20 +45,13 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for PythonDependencyUtils. */ class PythonDependencyUtilsTest { - private List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cachedFiles; - - @BeforeEach - void setUp() { - cachedFiles = new ArrayList<>(); - } - @Test void testPythonFiles() { Configuration config = new Configuration(); config.set( PythonOptions.PYTHON_FILES, "hdfs:///tmp_dir/test_file1.py,tmp_dir/test_file2.py,tmp_dir/test_dir,hdfs:///tmp_dir/test_file1.py"); - Configuration actual = configurePythonDependencies(cachedFiles, config); + Configuration actual = configurePythonDependencies(config); Map<String, String> expectedCachedFiles = new HashMap<>(); expectedCachedFiles.put( @@ -72,7 +63,7 @@ class PythonDependencyUtilsTest { expectedCachedFiles.put( "python_file_e56bc55ff643576457b3d012b2bba888727c71cf05a958930f2263398c4e9798", "tmp_dir/test_dir"); - verifyCachedFiles(expectedCachedFiles); + verifyCachedFiles(expectedCachedFiles, config); Configuration expectedConfiguration = new Configuration(); expectedConfiguration.set(PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new HashMap<>()); @@ -98,13 +89,13 @@ class PythonDependencyUtilsTest { void testPythonRequirements() { Configuration config = new Configuration(); config.set(PYTHON_REQUIREMENTS, "tmp_dir/requirements.txt"); - Configuration actual = configurePythonDependencies(cachedFiles, config); + Configuration actual = configurePythonDependencies(config); Map<String, String> expectedCachedFiles = new HashMap<>(); expectedCachedFiles.put( "python_requirements_file_69390ca43c69ada3819226fcfbb5b6d27e111132a9427e7f201edd82e9d65ff6", "tmp_dir/requirements.txt"); - verifyCachedFiles(expectedCachedFiles); + verifyCachedFiles(expectedCachedFiles, config); Configuration expectedConfiguration = new Configuration(); expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new HashMap<>()); @@ -116,7 +107,7 @@ class PythonDependencyUtilsTest { verifyConfiguration(expectedConfiguration, actual); config.set(PYTHON_REQUIREMENTS, "tmp_dir/requirements2.txt#tmp_dir/cache"); - actual = configurePythonDependencies(cachedFiles, config); + actual = configurePythonDependencies(config); expectedCachedFiles = new HashMap<>(); expectedCachedFiles.put( @@ -125,7 +116,7 @@ class PythonDependencyUtilsTest { expectedCachedFiles.put( "python_requirements_cache_2f563dd6731c2c7c5e1ef1ef8279f61e907dc3bfc698adb71b109e43ed93e143", "tmp_dir/cache"); - verifyCachedFiles(expectedCachedFiles); + verifyCachedFiles(expectedCachedFiles, config); expectedConfiguration = new Configuration(); expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new HashMap<>()); @@ -152,7 +143,7 @@ class PythonDependencyUtilsTest { + "tmp_dir/py37.zip," + "tmp_dir/py37.zip#venv," + "tmp_dir/py37.zip#venv2,tmp_dir/py37.zip#venv"); - Configuration actual = configurePythonDependencies(cachedFiles, config); + Configuration actual = configurePythonDependencies(config); Map<String, String> expectedCachedFiles = new HashMap<>(); expectedCachedFiles.put( @@ -167,7 +158,7 @@ class PythonDependencyUtilsTest { expectedCachedFiles.put( "python_archive_c7d970ce1c5794367974ce8ef536c2343bed8fcfe7c2422c51548e58007eee6a", "tmp_dir/py37.zip"); - verifyCachedFiles(expectedCachedFiles); + verifyCachedFiles(expectedCachedFiles, config); Configuration expectedConfiguration = new Configuration(); expectedConfiguration.set(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO, new HashMap<>()); @@ -199,7 +190,7 @@ class PythonDependencyUtilsTest { Configuration config = new Configuration(); config.set(PYTHON_EXECUTABLE, "venv/bin/python3"); config.set(PYTHON_CLIENT_EXECUTABLE, "python37"); - Configuration actual = configurePythonDependencies(cachedFiles, config); + Configuration actual = configurePythonDependencies(config); Configuration expectedConfiguration = new Configuration(); expectedConfiguration.set(PYTHON_EXECUTABLE, "venv/bin/python3"); @@ -240,15 +231,17 @@ class PythonDependencyUtilsTest { "venv/bin/python3/lib64/python3.7/site-packages/:venv/bin/python3/lib/python3.7/site-packages/"; Configuration config = new Configuration(); config.set(PythonOptions.PYTHON_PATH, pyPath); - Configuration actual = configurePythonDependencies(cachedFiles, config); + Configuration actual = configurePythonDependencies(config); Configuration expectedConfiguration = new Configuration(); expectedConfiguration.set(PythonOptions.PYTHON_PATH, pyPath); verifyConfiguration(expectedConfiguration, actual); } - private void verifyCachedFiles(Map<String, String> expected) { + private void verifyCachedFiles(Map<String, String> expected, Configuration config) { Map<String, String> actual = - cachedFiles.stream().collect(Collectors.toMap(t -> t.f0, t -> t.f1.filePath)); + config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>()).stream() + .map(ConfigurationUtils::parseStringToMap) + .collect(Collectors.toMap(m -> m.get("name"), m -> m.get("path"))); assertThat(actual).isEqualTo(expected); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java index 9be1061d7e4..938b75c582e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java @@ -95,7 +95,7 @@ public class BatchExecPythonGroupAggregate extends ExecNodeBase<RowData> final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType(); Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); OneInputTransformation<RowData, RowData> transform = createPythonOneInputTransformation( inputTransform, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java index 545a989509c..f01f64a1f6d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java @@ -115,7 +115,7 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase<RowData> final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window); final Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); int groupBufferLimitSize = pythonConfig.get(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java index aea09e785be..d6acd4d5e81 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java @@ -154,7 +154,7 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase { } Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); OneInputTransformation<RowData, RowData> transform = createPythonOneInputTransformation( inputTransform, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java index 4cd28906f55..9c6013055ca 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java @@ -113,7 +113,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData> (Transformation<RowData>) inputEdge.translateToPlan(planner); final Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); OneInputTransformation<RowData, RowData> ret = createPythonOneInputTransformation( inputTransform, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java index a1b9ef5aad3..52572063ab8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java @@ -103,7 +103,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData> (Transformation<RowData>) inputEdge.translateToPlan(planner); final Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); final ExecNodeConfig pythonNodeConfig = ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled()); final OneInputTransformation<RowData, RowData> transform = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java index 50b70d595ef..d3b66c4d2ff 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java @@ -176,7 +176,7 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase { DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1; Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); final OneInputStreamOperator<RowData, RowData> operator = getPythonAggregateFunctionOperator( pythonConfig, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java index e10076b789d..1f425f1d1c3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java @@ -132,7 +132,7 @@ public class StreamExecPythonGroupTableAggregate extends ExecNodeBase<RowData> DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1; Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); OneInputStreamOperator<RowData, RowData> pythonOperator = getPythonTableAggregateFunctionOperator( pythonConfig, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java index 59633e21a6c..cab831209b6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java @@ -254,7 +254,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas Trigger<?> trigger = windowAssignerAndTrigger.f1; final Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); final ExecNodeConfig pythonNodeConfig = ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled()); boolean isGeneralPythonUDAF = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java index a70f7877c5d..d9e9e55dad8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java @@ -198,7 +198,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData> long precedingOffset = -1 * (long) boundValue; Configuration pythonConfig = CommonPythonUtil.extractPythonConfiguration( - planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader()); + planner.getTableConfig(), planner.getFlinkContext().getClassLoader()); OneInputTransformation<RowData, RowData> transform = createPythonOneInputTransformation( inputTransform, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java index ae6605381e3..43fc9a9a92e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java @@ -22,7 +22,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.dataview.DataView; import org.apache.flink.table.api.dataview.ListView; @@ -50,7 +49,6 @@ import org.apache.flink.table.planner.functions.utils.TableSqlFunction; import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; import org.apache.flink.table.planner.plan.utils.AggregateInfo; import org.apache.flink.table.planner.plan.utils.AggregateInfoList; -import org.apache.flink.table.planner.utils.DummyStreamExecutionEnvironment; import org.apache.flink.table.runtime.dataview.DataViewSpec; import org.apache.flink.table.runtime.dataview.ListViewSpec; import org.apache.flink.table.runtime.dataview.MapViewSpec; @@ -79,7 +77,6 @@ import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlCastFunction; import org.apache.calcite.sql.type.SqlTypeName; -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.math.BigDecimal; @@ -111,18 +108,13 @@ public class CommonPythonUtil { } public static Configuration extractPythonConfiguration( - StreamExecutionEnvironment env, ReadableConfig tableConfig, ClassLoader classLoader) { + ReadableConfig tableConfig, ClassLoader classLoader) { Class<?> clazz = loadClass(PYTHON_CONFIG_UTILS_CLASS, classLoader); try { - StreamExecutionEnvironment realEnv = getRealEnvironment(env); Method method = - clazz.getDeclaredMethod( - "extractPythonConfiguration", List.class, ReadableConfig.class); - return (Configuration) method.invoke(null, realEnv.getCachedFiles(), tableConfig); - } catch (NoSuchFieldException - | IllegalAccessException - | NoSuchMethodException - | InvocationTargetException e) { + clazz.getDeclaredMethod("extractPythonConfiguration", ReadableConfig.class); + return (Configuration) method.invoke(null, tableConfig); + } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { throw new TableException("Method extractPythonConfiguration accessed failed.", e); } } @@ -472,17 +464,6 @@ public class CommonPythonUtil { return new PythonFunctionInfo((PythonFunction) functionDefinition, inputs.toArray()); } - private static StreamExecutionEnvironment getRealEnvironment(StreamExecutionEnvironment env) - throws NoSuchFieldException, IllegalAccessException { - Field realExecEnvField = - DummyStreamExecutionEnvironment.class.getDeclaredField("realExecEnv"); - realExecEnvField.setAccessible(true); - while (env instanceof DummyStreamExecutionEnvironment) { - env = (StreamExecutionEnvironment) realExecEnvField.get(env); - } - return env; - } - private static BuiltInPythonAggregateFunction getBuiltInPythonAggregateFunction( UserDefinedFunction javaBuiltInAggregateFunction) { if (javaBuiltInAggregateFunction instanceof AvgAggFunction) {