This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 91ccde95c7e [FLINK-29479][python] Fix system env cause conflict with
users python depdendency
91ccde95c7e is described below
commit 91ccde95c7eae7f020d68592a7fa76201674724a
Author: liuyongvs <[email protected]>
AuthorDate: Fri Oct 21 18:07:52 2022 +0800
[FLINK-29479][python] Fix system env cause conflict with users python
depdendency
This closes #21110.
---
.../shortcodes/generated/python_configuration.html | 6 ++++++
.../main/java/org/apache/flink/python/PythonOptions.java | 8 ++++++++
.../python/AbstractEmbeddedPythonFunctionOperator.java | 2 +-
.../python/AbstractExternalPythonFunctionOperator.java | 2 +-
.../operators/python/AbstractPythonFunctionOperator.java | 4 ++++
.../java/org/apache/flink/python/PythonOptionsTest.java | 16 ++++++++++++++++
6 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html
b/docs/layouts/shortcodes/generated/python_configuration.html
index cdaabcf4621..370f931b76c 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -110,5 +110,11 @@
<td>Integer</td>
<td>The maximum number of states cached in a Python UDF worker.
Note that this is an experimental flag and might not be available in future
releases.</td>
</tr>
+ <tr>
+ <td><h5>python.systemenv.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Specify whether to load System Environment when starting
Python worker.</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 47bfbd3d567..ab519c660db 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -83,6 +83,14 @@ public class PythonOptions {
+ "The interval between each profiling is
determined by the config options "
+ "python.fn-execution.bundle.size and
python.fn-execution.bundle.time.");
+ /** The configuration to enable or disable system env for Python
execution. */
+ public static final ConfigOption<Boolean> PYTHON_SYSTEMENV_ENABLED =
+ ConfigOptions.key("python.systemenv.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Specify whether to load System Environment when
starting Python worker.");
+
/** The configuration to enable or disable python operator chaining. */
public static final ConfigOption<Boolean> PYTHON_OPERATOR_CHAINING_ENABLED
=
ConfigOptions.key("python.operator-chaining.enabled")
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
index 6dc961832ee..70343275a23 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -142,7 +142,7 @@ public abstract class
AbstractEmbeddedPythonFunctionOperator<OUT>
return new EmbeddedPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- new HashMap<>(System.getenv()),
+ systemEnvEnabled ? new HashMap<>(System.getenv()) : new
HashMap<>(),
getRuntimeContext().getJobId());
}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
index d95ffafb5af..12b35783066 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
@@ -126,7 +126,7 @@ public abstract class
AbstractExternalPythonFunctionOperator<OUT>
return new ProcessPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- new HashMap<>(System.getenv()),
+ systemEnvEnabled ? new HashMap<>(System.getenv()) : new
HashMap<>(),
getRuntimeContext().getJobId());
} else {
throw new UnsupportedOperationException(
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 1b53b80998c..a967f85a3c1 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ScheduledFuture;
import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE;
import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_TIME_MILLS;
import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_SYSTEMENV_ENABLED;
import static
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
import static
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
@@ -52,6 +53,8 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
protected final Configuration config;
+ protected transient boolean systemEnvEnabled;
+
/** Max number of elements to include in a bundle. */
protected transient int maxBundleSize;
@@ -78,6 +81,7 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
@Override
public void open() throws Exception {
try {
+ this.systemEnvEnabled = config.get(PYTHON_SYSTEMENV_ENABLED);
this.maxBundleSize = config.get(MAX_BUNDLE_SIZE);
if (this.maxBundleSize <= 0) {
this.maxBundleSize = MAX_BUNDLE_SIZE.defaultValue();
diff --git
a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
index 9dcb4a900a7..2b4bb881202 100644
--- a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
@@ -178,4 +178,20 @@ public class PythonOptionsTest {
configuration.get(PythonOptions.PYTHON_CLIENT_EXECUTABLE);
assertThat(actualPythonClientExecutable,
is(equalTo(expectedPythonClientExecutable)));
}
+
+ @Test
+ void testPythonSystemEnvEnabled() {
+ final Configuration configuration = new Configuration();
+ final boolean isSystemEnvEnabled =
+
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+ assertThat(isSystemEnvEnabled)
+
.isEqualTo(PythonOptions.PYTHON_SYSTEMENV_ENABLED.defaultValue());
+
+ final boolean expectedIsSystemEnvEnabled = false;
+ configuration.setBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED,
false);
+
+ final boolean actualIsSystemEnvEnabled =
+
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
+
assertThat(actualIsSystemEnvEnabled).isEqualTo(expectedIsSystemEnvEnabled);
+ }
}