This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d1b15ba [hotfix][python] Use StreamExecutionEnvironment.getConfiguration to access configuration
d1b15ba is described below
commit d1b15ba113b39f5d8271210c3e38dc584b6ac34c
Author: Dian Fu <[email protected]>
AuthorDate: Fri Aug 13 18:05:19 2021 +0800
[hotfix][python] Use StreamExecutionEnvironment.getConfiguration to access configuration
---
.../python/chain/PythonOperatorChainingOptimizer.java | 4 +---
.../org/apache/flink/python/util/PythonConfigUtil.java | 18 +++++++-----------
2 files changed, 8 insertions(+), 14 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
index ca1e1b3..ebfdc85 100644
--- a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
@@ -20,7 +20,6 @@ package org.apache.flink.python.chain;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.PythonConfigUtil;
@@ -286,11 +285,10 @@ public class PythonOperatorChainingOptimizer {
transformationsField.setAccessible(true);
List<Transformation<?>> transformations =
(List<Transformation<?>>) transformationsField.get(env);
- Configuration config = PythonConfigUtil.getEnvironmentConfig(env);
PythonOperatorChainingOptimizer chainingOptimizer =
new PythonOperatorChainingOptimizer(
transformations,
- config.getBoolean(PythonOptions.PYTHON_OPERATOR_CHAINING_ENABLED));
+ env.getConfiguration().get(PythonOptions.PYTHON_OPERATOR_CHAINING_ENABLED));
transformationsField.set(env, chainingOptimizer.optimize());
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 2ef5600..0834469 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -62,7 +62,7 @@ public class PythonConfigUtil {
public static Configuration getEnvConfigWithDependencies(StreamExecutionEnvironment env)
throws InvocationTargetException, IllegalAccessException, NoSuchFieldException {
return PythonDependencyUtils.configurePythonDependencies(
- env.getCachedFiles(), getEnvironmentConfig(env));
+ env.getCachedFiles(), (Configuration) env.getConfiguration());
}
/**
@@ -138,16 +138,12 @@ public class PythonConfigUtil {
public static Configuration getMergedConfig(
StreamExecutionEnvironment env, TableConfig tableConfig) {
- try {
- Configuration config = new Configuration(getEnvironmentConfig(env));
- PythonDependencyUtils.merge(config, tableConfig.getConfiguration());
- Configuration mergedConfig =
- PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config);
- mergedConfig.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId());
- return mergedConfig;
- } catch (IllegalAccessException | NoSuchFieldException | InvocationTargetException e) {
- throw new TableException("Method getMergedConfig failed.", e);
- }
+ Configuration config = new Configuration((Configuration) env.getConfiguration());
+ PythonDependencyUtils.merge(config, tableConfig.getConfiguration());
+ Configuration mergedConfig =
+ PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config);
+ mergedConfig.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId());
+ return mergedConfig;
}
@SuppressWarnings("unchecked")