This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 9213effb32a [FLINK-29479][python] Fix system env cause conflict with
users python dependency
9213effb32a is described below
commit 9213effb32a4e80d8113ba7bf36782f33a5e197c
Author: liuyongvs <[email protected]>
AuthorDate: Fri Oct 21 18:07:52 2022 +0800
[FLINK-29479][python] Fix system env cause conflict with users python
dependency
This closes #21110.
---
.../shortcodes/generated/python_configuration.html | 6 ++++++
.../main/java/org/apache/flink/python/PythonOptions.java | 8 ++++++++
.../operators/python/AbstractPythonFunctionOperator.java | 4 ++++
.../embedded/AbstractEmbeddedPythonFunctionOperator.java | 2 +-
.../process/AbstractExternalPythonFunctionOperator.java | 2 +-
.../java/org/apache/flink/python/PythonOptionsTest.java | 16 ++++++++++++++++
6 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html
b/docs/layouts/shortcodes/generated/python_configuration.html
index ef394bea7f0..a6b5779a977 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -110,5 +110,11 @@
<td>Integer</td>
<td>The maximum number of states cached in a Python UDF worker.
Note that this is an experimental flag and might not be available in future
releases.</td>
</tr>
+ <tr>
+ <td><h5>python.systemenv.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Specify whether to load System Environment when starting
Python worker.</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 44c3aea7c14..ec3ef9c1e0b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -83,6 +83,14 @@ public class PythonOptions {
+ "The interval between each profiling is
determined by the config options "
+ "python.fn-execution.bundle.size and
python.fn-execution.bundle.time.");
+ /** The configuration to enable or disable system env for Python
execution. */
+ public static final ConfigOption<Boolean> PYTHON_SYSTEMENV_ENABLED =
+ ConfigOptions.key("python.systemenv.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Specify whether to load System Environment when
starting Python worker.");
+
/** The configuration to enable or disable python operator chaining. */
public static final ConfigOption<Boolean> PYTHON_OPERATOR_CHAINING_ENABLED
=
ConfigOptions.key("python.operator-chaining.enabled")
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 2c8ee0ea4d6..d814d1f202d 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledFuture;
import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE;
import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_TIME_MILLS;
import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_SYSTEMENV_ENABLED;
import static
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
import static
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
@@ -51,6 +52,8 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
protected final Configuration config;
+ protected transient boolean systemEnvEnabled;
+
/** Max number of elements to include in a bundle. */
protected transient int maxBundleSize;
@@ -77,6 +80,7 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
@Override
public void open() throws Exception {
try {
+ this.systemEnvEnabled = config.get(PYTHON_SYSTEMENV_ENABLED);
this.maxBundleSize = config.get(MAX_BUNDLE_SIZE);
if (this.maxBundleSize <= 0) {
this.maxBundleSize = MAX_BUNDLE_SIZE.defaultValue();
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
index 2d12e4e492c..f56d41bbd65 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
@@ -141,7 +141,7 @@ public abstract class
AbstractEmbeddedPythonFunctionOperator<OUT>
return new EmbeddedPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- new HashMap<>(System.getenv()),
+ systemEnvEnabled ? new HashMap<>(System.getenv()) : new
HashMap<>(),
getRuntimeContext().getJobId());
}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
index cdd281d606b..5f3fb68f7d0 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
@@ -127,7 +127,7 @@ public abstract class
AbstractExternalPythonFunctionOperator<OUT>
return new ProcessPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- new HashMap<>(System.getenv()),
+ systemEnvEnabled ? new HashMap<>(System.getenv()) : new
HashMap<>(),
getRuntimeContext().getJobId());
} else {
throw new UnsupportedOperationException(
diff --git
a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
index eb53e59b9aa..d02b93f852f 100644
--- a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
@@ -172,4 +172,20 @@ class PythonOptionsTest {
configuration.get(PythonOptions.PYTHON_CLIENT_EXECUTABLE);
assertThat(actualPythonClientExecutable).isEqualTo(expectedPythonClientExecutable);
}
+
+ @Test
+ void testPythonSystemEnvEnabled() {
+ final Configuration configuration = new Configuration();
+ final boolean isSystemEnvEnabled =
+
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+ assertThat(isSystemEnvEnabled)
+
.isEqualTo(PythonOptions.PYTHON_SYSTEMENV_ENABLED.defaultValue());
+
+ final boolean expectedIsSystemEnvEnabled = false;
+ configuration.setBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED,
false);
+
+ final boolean actualIsSystemEnvEnabled =
+
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+
assertThat(actualIsSystemEnvEnabled).isEqualTo(expectedIsSystemEnvEnabled);
+ }
}