This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 91ccde95c7e [FLINK-29479][python] Fix system env cause conflict with 
users python dependency
91ccde95c7e is described below

commit 91ccde95c7eae7f020d68592a7fa76201674724a
Author: liuyongvs <[email protected]>
AuthorDate: Fri Oct 21 18:07:52 2022 +0800

    [FLINK-29479][python] Fix system env cause conflict with users python 
dependency
    
    This closes #21110.
---
 .../shortcodes/generated/python_configuration.html       |  6 ++++++
 .../main/java/org/apache/flink/python/PythonOptions.java |  8 ++++++++
 .../python/AbstractEmbeddedPythonFunctionOperator.java   |  2 +-
 .../python/AbstractExternalPythonFunctionOperator.java   |  2 +-
 .../operators/python/AbstractPythonFunctionOperator.java |  4 ++++
 .../java/org/apache/flink/python/PythonOptionsTest.java  | 16 ++++++++++++++++
 6 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/python_configuration.html 
b/docs/layouts/shortcodes/generated/python_configuration.html
index cdaabcf4621..370f931b76c 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -110,5 +110,11 @@
             <td>Integer</td>
             <td>The maximum number of states cached in a Python UDF worker. 
Note that this is an experimental flag and might not be available in future 
releases.</td>
         </tr>
+        <tr>
+            <td><h5>python.systemenv.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Specify whether to load System Environment when starting 
Python worker.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java 
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 47bfbd3d567..ab519c660db 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -83,6 +83,14 @@ public class PythonOptions {
                                     + "The interval between each profiling is 
determined by the config options "
                                     + "python.fn-execution.bundle.size and 
python.fn-execution.bundle.time.");
 
+    /** The configuration to enable or disable system env for Python 
execution. */
+    public static final ConfigOption<Boolean> PYTHON_SYSTEMENV_ENABLED =
+            ConfigOptions.key("python.systemenv.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Specify whether to load System Environment when 
starting Python worker.");
+
     /** The configuration to enable or disable python operator chaining. */
     public static final ConfigOption<Boolean> PYTHON_OPERATOR_CHAINING_ENABLED 
=
             ConfigOptions.key("python.operator-chaining.enabled")
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
index 6dc961832ee..70343275a23 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -142,7 +142,7 @@ public abstract class 
AbstractEmbeddedPythonFunctionOperator<OUT>
         return new EmbeddedPythonEnvironmentManager(
                 dependencyInfo,
                 
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
-                new HashMap<>(System.getenv()),
+                systemEnvEnabled ? new HashMap<>(System.getenv()) : new 
HashMap<>(),
                 getRuntimeContext().getJobId());
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
index d95ffafb5af..12b35783066 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
@@ -126,7 +126,7 @@ public abstract class 
AbstractExternalPythonFunctionOperator<OUT>
             return new ProcessPythonEnvironmentManager(
                     dependencyInfo,
                     
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
-                    new HashMap<>(System.getenv()),
+                    systemEnvEnabled ? new HashMap<>(System.getenv()) : new 
HashMap<>(),
                     getRuntimeContext().getJobId());
         } else {
             throw new UnsupportedOperationException(
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 1b53b80998c..a967f85a3c1 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ScheduledFuture;
 import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE;
 import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_TIME_MILLS;
 import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_SYSTEMENV_ENABLED;
 import static 
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
 import static 
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
 
@@ -52,6 +53,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
 
     protected final Configuration config;
 
+    protected transient boolean systemEnvEnabled;
+
     /** Max number of elements to include in a bundle. */
     protected transient int maxBundleSize;
 
@@ -78,6 +81,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
     @Override
     public void open() throws Exception {
         try {
+            this.systemEnvEnabled = config.get(PYTHON_SYSTEMENV_ENABLED);
             this.maxBundleSize = config.get(MAX_BUNDLE_SIZE);
             if (this.maxBundleSize <= 0) {
                 this.maxBundleSize = MAX_BUNDLE_SIZE.defaultValue();
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java 
b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
index 9dcb4a900a7..2b4bb881202 100644
--- a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
@@ -178,4 +178,20 @@ public class PythonOptionsTest {
                 configuration.get(PythonOptions.PYTHON_CLIENT_EXECUTABLE);
         assertThat(actualPythonClientExecutable, 
is(equalTo(expectedPythonClientExecutable)));
     }
+
+    @Test
+    void testPythonSystemEnvEnabled() {
+        final Configuration configuration = new Configuration();
+        final boolean isSystemEnvEnabled =
+                
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+        assertThat(isSystemEnvEnabled)
+                
.isEqualTo(PythonOptions.PYTHON_SYSTEMENV_ENABLED.defaultValue());
+
+        final boolean expectedIsSystemEnvEnabled = false;
+        configuration.setBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED, 
false);
+
+        final boolean actualIsSystemEnvEnabled =
+                
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+        
assertThat(actualIsSystemEnvEnabled).isEqualTo(expectedIsSystemEnvEnabled);
+    }
 }

Reply via email to