This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 9213effb32a [FLINK-29479][python] Fix system env cause conflict with 
users python dependency
9213effb32a is described below

commit 9213effb32a4e80d8113ba7bf36782f33a5e197c
Author: liuyongvs <[email protected]>
AuthorDate: Fri Oct 21 18:07:52 2022 +0800

    [FLINK-29479][python] Fix system env cause conflict with users python 
dependency
    
    This closes #21110.
---
 .../shortcodes/generated/python_configuration.html       |  6 ++++++
 .../main/java/org/apache/flink/python/PythonOptions.java |  8 ++++++++
 .../operators/python/AbstractPythonFunctionOperator.java |  4 ++++
 .../embedded/AbstractEmbeddedPythonFunctionOperator.java |  2 +-
 .../process/AbstractExternalPythonFunctionOperator.java  |  2 +-
 .../java/org/apache/flink/python/PythonOptionsTest.java  | 16 ++++++++++++++++
 6 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/python_configuration.html 
b/docs/layouts/shortcodes/generated/python_configuration.html
index ef394bea7f0..a6b5779a977 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -110,5 +110,11 @@
             <td>Integer</td>
             <td>The maximum number of states cached in a Python UDF worker. 
Note that this is an experimental flag and might not be available in future 
releases.</td>
         </tr>
+        <tr>
+            <td><h5>python.systemenv.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Specify whether to load System Environment when starting 
Python worker.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java 
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 44c3aea7c14..ec3ef9c1e0b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -83,6 +83,14 @@ public class PythonOptions {
                                     + "The interval between each profiling is 
determined by the config options "
                                     + "python.fn-execution.bundle.size and 
python.fn-execution.bundle.time.");
 
+    /** The configuration to enable or disable system env for Python 
execution. */
+    public static final ConfigOption<Boolean> PYTHON_SYSTEMENV_ENABLED =
+            ConfigOptions.key("python.systemenv.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Specify whether to load System Environment when 
starting Python worker.");
+
     /** The configuration to enable or disable python operator chaining. */
     public static final ConfigOption<Boolean> PYTHON_OPERATOR_CHAINING_ENABLED 
=
             ConfigOptions.key("python.operator-chaining.enabled")
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 2c8ee0ea4d6..d814d1f202d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledFuture;
 import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE;
 import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_TIME_MILLS;
 import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_SYSTEMENV_ENABLED;
 import static 
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
 import static 
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
 
@@ -51,6 +52,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
 
     protected final Configuration config;
 
+    protected transient boolean systemEnvEnabled;
+
     /** Max number of elements to include in a bundle. */
     protected transient int maxBundleSize;
 
@@ -77,6 +80,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
     @Override
     public void open() throws Exception {
         try {
+            this.systemEnvEnabled = config.get(PYTHON_SYSTEMENV_ENABLED);
             this.maxBundleSize = config.get(MAX_BUNDLE_SIZE);
             if (this.maxBundleSize <= 0) {
                 this.maxBundleSize = MAX_BUNDLE_SIZE.defaultValue();
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
index 2d12e4e492c..f56d41bbd65 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.java
@@ -141,7 +141,7 @@ public abstract class 
AbstractEmbeddedPythonFunctionOperator<OUT>
         return new EmbeddedPythonEnvironmentManager(
                 dependencyInfo,
                 
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
-                new HashMap<>(System.getenv()),
+                systemEnvEnabled ? new HashMap<>(System.getenv()) : new 
HashMap<>(),
                 getRuntimeContext().getJobId());
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
index cdd281d606b..5f3fb68f7d0 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java
@@ -127,7 +127,7 @@ public abstract class 
AbstractExternalPythonFunctionOperator<OUT>
             return new ProcessPythonEnvironmentManager(
                     dependencyInfo,
                     
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
-                    new HashMap<>(System.getenv()),
+                    systemEnvEnabled ? new HashMap<>(System.getenv()) : new 
HashMap<>(),
                     getRuntimeContext().getJobId());
         } else {
             throw new UnsupportedOperationException(
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java 
b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
index eb53e59b9aa..d02b93f852f 100644
--- a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
@@ -172,4 +172,20 @@ class PythonOptionsTest {
                 configuration.get(PythonOptions.PYTHON_CLIENT_EXECUTABLE);
         
assertThat(actualPythonClientExecutable).isEqualTo(expectedPythonClientExecutable);
     }
+
+    @Test
+    void testPythonSystemEnvEnabled() {
+        final Configuration configuration = new Configuration();
+        final boolean isSystemEnvEnabled =
+                
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+        assertThat(isSystemEnvEnabled)
+                
.isEqualTo(PythonOptions.PYTHON_SYSTEMENV_ENABLED.defaultValue());
+
+        final boolean expectedIsSystemEnvEnabled = false;
+        configuration.setBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED, 
false);
+
+        final boolean actualIsSystemEnvEnabled =
+                
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+        
assertThat(actualIsSystemEnvEnabled).isEqualTo(expectedIsSystemEnvEnabled);
+    }
 }

Reply via email to