This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 5e6d1c32d1b [FLINK-38585][python] Fix python env set in Pyflink's
thread mode when using shipped venv.zip archive
5e6d1c32d1b is described below
commit 5e6d1c32d1bc38efc755eee9635f0efffd693dcb
Author: Biao Geng <[email protected]>
AuthorDate: Thu Feb 26 17:05:26 2026 +0800
[FLINK-38585][python] Fix python env set in Pyflink's thread mode when
using shipped venv.zip archive
This closes #27685.
---
.../flink/python/env/PythonDependencyInfo.java | 29 ++++++++++
.../embedded/EmbeddedPythonEnvironmentManager.java | 44 +++++++++++++---
.../flink/python/util/PythonDependencyUtils.java | 17 ++++++
.../flink/python/env/PythonDependencyInfoTest.java | 61 ++++++++++++++++++++++
.../python/util/PythonDependencyUtilsTest.java | 7 +++
5 files changed, 152 insertions(+), 6 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
index 2f65e5f7be7..1d3665c2ab1 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
@@ -39,6 +39,7 @@ import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE;
import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO;
import static org.apache.flink.python.PythonOptions.PYTHON_PATH;
import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO;
+import static org.apache.flink.python.util.PythonDependencyUtils.getArchiveTargetDirName;
/** PythonDependencyInfo contains the information of third-party dependencies. */
@Internal
@@ -203,4 +204,32 @@ public final class PythonDependencyInfo {
config.get(PYTHON_EXECUTION_MODE),
config.get(PYTHON_PATH));
}
+
+ /**
+ * Checks whether the configured python executable path is located within one of the shipped
+ * archives.
+ *
+ * <p>This is determined by comparing the first path component of the python executable (the
+ * base directory) against the target directory names of all registered archives.
+ *
+ * @return {@code true} if the python executable's base directory matches an archive target
+ * directory
+ */
+ public boolean isPythonExecFromArchives() {
+ if (archives.isEmpty()) {
+ return false;
+ }
+ int index = pythonExec.indexOf(File.separator);
+ if (index == -1) {
+ index = pythonExec.length();
+ }
+ String pythonExecBaseDir = pythonExec.substring(0, index);
+ for (Map.Entry<String, String> entry : archives.entrySet()) {
+ String targetDirName = getArchiveTargetDirName(entry.getValue());
+ if (targetDirName.equals(pythonExecBaseDir)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
index 896c3a2c4ff..7d8fe49d5b4 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -24,6 +24,8 @@ import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import pemja.core.PythonInterpreterConfig;
import java.io.File;
@@ -36,6 +38,8 @@ import java.util.Map;
*/
@Internal
public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(EmbeddedPythonEnvironmentManager.class);
public EmbeddedPythonEnvironmentManager(
PythonDependencyInfo dependencyInfo,
@@ -70,13 +74,41 @@ public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentM
flinkPython + File.pathSeparator + env.getOrDefault("PYTHONPATH", ""));
}
- PythonInterpreterConfig interpreterConfig =
+ PythonInterpreterConfig.PythonInterpreterConfigBuilder interpreterConfigBuilder =
PythonInterpreterConfig.newBuilder()
- .setPythonExec(dependencyInfo.getPythonExec())
.setExcType(execType)
- .addPythonPaths(env.getOrDefault("PYTHONPATH", ""))
- .build();
-
- return new EmbeddedPythonEnvironment(interpreterConfig, env);
+ .addPythonPaths(env.getOrDefault("PYTHONPATH", ""));
+ if (dependencyInfo.isPythonExecFromArchives()) {
+ String pythonExecPath = dependencyInfo.getPythonExec();
+ String binSeparator = File.separator + "bin" + File.separator;
+ int binIndex = pythonExecPath.lastIndexOf(binSeparator);
+ if (binIndex == -1) {
+ throw new RuntimeException(
+ String.format(
+ "Python executable '%s' is from archives but does not contain "
+ + "a '%s' path component. Expected a path like "
+ + "'venv/bin/python'.",
+ pythonExecPath, binSeparator));
+ }
+ String pythonHome =
+ String.join(
+ File.separator,
+ env.get(PYTHON_WORKING_DIR),
+ pythonExecPath.substring(0, binIndex));
+ String pythonExec =
+ String.join(File.separator, env.get(PYTHON_WORKING_DIR), pythonExecPath);
+ LOG.info(
+ "Use python home and python exec from archives. Python home: {}, Python exec: {}",
+ pythonHome,
+ pythonExec);
+ interpreterConfigBuilder.setPythonHome(pythonHome).setPythonExec(pythonExec);
+ } else {
+ LOG.info(
+ "Python interpreter path is not from archives, use python exec from "
+ + "config {}.",
+ dependencyInfo.getPythonExec());
+ interpreterConfigBuilder.setPythonExec(dependencyInfo.getPythonExec());
+ }
+ return new EmbeddedPythonEnvironment(interpreterConfigBuilder.build(), env);
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
index 17f36ee4987..55b3eb147c4 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
@@ -69,6 +69,23 @@ public class PythonDependencyUtils {
public static final String PARAM_DELIMITER = "#";
private static final String HASH_ALGORITHM = "SHA-256";
+ /**
+ * Extracts the target directory name from an archive value string.
+ *
+ * <p>The archive value may contain a {@link #PARAM_DELIMITER} separator. If present, the format
+ * is "originalFileName#targetDirName" and the target directory name is the part after the
+ * delimiter. Otherwise, the entire value is used as the target directory name.
+ *
+ * @param archiveValue the archive value string, e.g. "venv.zip#venv" or "venv"
+ * @return the target directory name
+ */
+ public static String getArchiveTargetDirName(String archiveValue) {
+ if (archiveValue.contains(PARAM_DELIMITER)) {
+ return archiveValue.split(PARAM_DELIMITER, 2)[1];
+ }
+ return archiveValue;
+ }
+
/**
* Adds python dependencies to registered cache file list according to given configuration and
* returns a new configuration which contains the metadata of the registered python
diff --git a/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java b/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
index f3b4c437077..dfc2341d0ec 100644
--- a/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.OperatingSystem;
import org.junit.jupiter.api.Test;
+import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -136,4 +137,64 @@ class PythonDependencyInfoTest {
assertThat(dependencyInfo.getPythonExec()).isEqualTo("/usr/bin/python3");
}
+
+ @Test
+ void testIsPythonExecFromArchives() {
+ // exec matches archive target dir
+ Map<String, String> archives = new HashMap<>();
+ archives.put("/tmp/venv.zip", "venv");
+ assertThat(
+ new PythonDependencyInfo(
+ new HashMap<>(),
+ null,
+ null,
+ archives,
+ "venv" + File.separator + "bin" + File.separator + "python")
+ .isPythonExecFromArchives())
+ .isTrue();
+
+ // exec does not match any archive
+ assertThat(
+ new PythonDependencyInfo(
+ new HashMap<>(), null, null, archives, "/usr/bin/python3")
+ .isPythonExecFromArchives())
+ .isFalse();
+
+ // archive value with param delimiter ("originalFileName#targetDirName")
+ Map<String, String> archivesWithDelimiter = new HashMap<>();
+ archivesWithDelimiter.put("/tmp/venv.zip", "venv.zip#myenv");
+ assertThat(
+ new PythonDependencyInfo(
+ new HashMap<>(),
+ null,
+ null,
+ archivesWithDelimiter,
+ "myenv"
+ + File.separator
+ + "bin"
+ + File.separator
+ + "python")
+ .isPythonExecFromArchives())
+ .isTrue();
+
+ // empty archives
+ assertThat(
+ new PythonDependencyInfo(
+ new HashMap<>(),
+ null,
+ null,
+ new HashMap<>(),
+ "venv" + File.separator + "bin" + File.separator + "python")
+ .isPythonExecFromArchives())
+ .isFalse();
+
+ // exec without separator — entire string is the base dir
+ Map<String, String> archivesForBareExec = new HashMap<>();
+ archivesForBareExec.put("/tmp/python.zip", "python");
+ assertThat(
+ new PythonDependencyInfo(
+ new HashMap<>(), null, null, archivesForBareExec, "python")
+ .isPythonExecFromArchives())
+ .isTrue();
+ }
}
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
index e75c5c6c551..3240e9a4f6b 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
@@ -39,6 +39,7 @@ import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DIS
import static org.apache.flink.python.util.PythonDependencyUtils.CACHE;
import static org.apache.flink.python.util.PythonDependencyUtils.FILE;
import static org.apache.flink.python.util.PythonDependencyUtils.configurePythonDependencies;
+import static org.apache.flink.python.util.PythonDependencyUtils.getArchiveTargetDirName;
import static org.apache.flink.python.util.PythonDependencyUtils.merge;
import static org.assertj.core.api.Assertions.assertThat;
@@ -237,6 +238,12 @@ class PythonDependencyUtilsTest {
verifyConfiguration(expectedConfiguration, actual);
}
+ @Test
+ void testGetArchiveTargetDirName() {
+ assertThat(getArchiveTargetDirName("venv.zip#venv")).isEqualTo("venv");
+ assertThat(getArchiveTargetDirName("venv")).isEqualTo("venv");
+ }
+
private void verifyCachedFiles(Map<String, String> expected, Configuration config) {
Map<String, String> actual =
config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>()).stream()