This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 97dd9f672f7 [FLINK-38585][python] Fix python env set in Pyflink's thread mode when using shipped venv.zip archive
97dd9f672f7 is described below

commit 97dd9f672f72b2c0e585ee13dc9dec8f5d174174
Author: Biao Geng <[email protected]>
AuthorDate: Thu Feb 26 17:05:26 2026 +0800

    [FLINK-38585][python] Fix python env set in Pyflink's thread mode when using shipped venv.zip archive
    
    This closes #27685.
---
 .../flink/python/env/PythonDependencyInfo.java     | 29 ++++++++++
 .../embedded/EmbeddedPythonEnvironmentManager.java | 44 +++++++++++++---
 .../flink/python/util/PythonDependencyUtils.java   | 17 ++++++
 .../flink/python/env/PythonDependencyInfoTest.java | 61 ++++++++++++++++++++++
 .../python/util/PythonDependencyUtilsTest.java     |  7 +++
 5 files changed, 152 insertions(+), 6 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
 
b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
index 2f65e5f7be7..1d3665c2ab1 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
@@ -39,6 +39,7 @@ import static 
org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE;
 import static 
org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO;
 import static org.apache.flink.python.PythonOptions.PYTHON_PATH;
 import static 
org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO;
+import static 
org.apache.flink.python.util.PythonDependencyUtils.getArchiveTargetDirName;
 
 /** PythonDependencyInfo contains the information of third-party dependencies. */
 @Internal
@@ -203,4 +204,32 @@ public final class PythonDependencyInfo {
                 config.get(PYTHON_EXECUTION_MODE),
                 config.get(PYTHON_PATH));
     }
+
+    /**
+     * Checks whether the configured python executable path is located within one of the shipped
+     * archives.
+     *
+     * <p>This is determined by comparing the first path component of the python executable (the
+     * base directory) against the target directory names of all registered archives.
+     *
+     * @return {@code true} if the python executable's base directory matches an archive target
+     *     directory
+     */
+    public boolean isPythonExecFromArchives() {
+        if (archives.isEmpty()) {
+            return false;
+        }
+        int index = pythonExec.indexOf(File.separator);
+        if (index == -1) {
+            index = pythonExec.length();
+        }
+        String pythonExecBaseDir = pythonExec.substring(0, index);
+        for (Map.Entry<String, String> entry : archives.entrySet()) {
+            String targetDirName = getArchiveTargetDirName(entry.getValue());
+            if (targetDirName.equals(pythonExecBaseDir)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
 
b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
index 896c3a2c4ff..7d8fe49d5b4 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.python.env.AbstractPythonEnvironmentManager;
 import org.apache.flink.python.env.PythonDependencyInfo;
 import org.apache.flink.python.env.PythonEnvironment;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import pemja.core.PythonInterpreterConfig;
 
 import java.io.File;
@@ -36,6 +38,8 @@ import java.util.Map;
  */
 @Internal
 public class EmbeddedPythonEnvironmentManager extends 
AbstractPythonEnvironmentManager {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(EmbeddedPythonEnvironmentManager.class);
 
     public EmbeddedPythonEnvironmentManager(
             PythonDependencyInfo dependencyInfo,
@@ -70,13 +74,41 @@ public class EmbeddedPythonEnvironmentManager extends 
AbstractPythonEnvironmentM
                     flinkPython + File.pathSeparator + 
env.getOrDefault("PYTHONPATH", ""));
         }
 
-        PythonInterpreterConfig interpreterConfig =
+        PythonInterpreterConfig.PythonInterpreterConfigBuilder 
interpreterConfigBuilder =
                 PythonInterpreterConfig.newBuilder()
-                        .setPythonExec(dependencyInfo.getPythonExec())
                         .setExcType(execType)
-                        .addPythonPaths(env.getOrDefault("PYTHONPATH", ""))
-                        .build();
-
-        return new EmbeddedPythonEnvironment(interpreterConfig, env);
+                        .addPythonPaths(env.getOrDefault("PYTHONPATH", ""));
+        if (dependencyInfo.isPythonExecFromArchives()) {
+            String pythonExecPath = dependencyInfo.getPythonExec();
+            String binSeparator = File.separator + "bin" + File.separator;
+            int binIndex = pythonExecPath.lastIndexOf(binSeparator);
+            if (binIndex == -1) {
+                throw new RuntimeException(
+                        String.format(
+                                "Python executable '%s' is from archives but 
does not contain "
+                                        + "a '%s' path component. Expected a 
path like "
+                                        + "'venv/bin/python'.",
+                                pythonExecPath, binSeparator));
+            }
+            String pythonHome =
+                    String.join(
+                            File.separator,
+                            env.get(PYTHON_WORKING_DIR),
+                            pythonExecPath.substring(0, binIndex));
+            String pythonExec =
+                    String.join(File.separator, env.get(PYTHON_WORKING_DIR), 
pythonExecPath);
+            LOG.info(
+                    "Use python home and python exec from archives. Python 
home: {}, Python exec: {}",
+                    pythonHome,
+                    pythonExec);
+            
interpreterConfigBuilder.setPythonHome(pythonHome).setPythonExec(pythonExec);
+        } else {
+            LOG.info(
+                    "Python interpreter path is not from archives, use python 
exec from "
+                            + "config {}.",
+                    dependencyInfo.getPythonExec());
+            
interpreterConfigBuilder.setPythonExec(dependencyInfo.getPythonExec());
+        }
+        return new EmbeddedPythonEnvironment(interpreterConfigBuilder.build(), 
env);
     }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
index 15d10e42bb9..a1b54188029 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
@@ -69,6 +69,23 @@ public class PythonDependencyUtils {
     public static final String PARAM_DELIMITER = "#";
     private static final String HASH_ALGORITHM = "SHA-256";
 
+    /**
+     * Extracts the target directory name from an archive value string.
+     *
+     * <p>The archive value may contain a {@link #PARAM_DELIMITER} separator. If present, the format
+     * is "originalFileName#targetDirName" and the target directory name is the part after the
+     * delimiter. Otherwise, the entire value is used as the target directory name.
+     *
+     * @param archiveValue the archive value string, e.g. "venv.zip#venv" or "venv"
+     * @return the target directory name
+     */
+    public static String getArchiveTargetDirName(String archiveValue) {
+        if (archiveValue.contains(PARAM_DELIMITER)) {
+            return archiveValue.split(PARAM_DELIMITER, 2)[1];
+        }
+        return archiveValue;
+    }
+
     /**
      * Adds python dependencies to registered cache file list according to 
given configuration and
      * returns a new configuration which contains the metadata of the 
registered python
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
index f3b4c437077..dfc2341d0ec 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.OperatingSystem;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -136,4 +137,64 @@ class PythonDependencyInfoTest {
 
         
assertThat(dependencyInfo.getPythonExec()).isEqualTo("/usr/bin/python3");
     }
+
+    @Test
+    void testIsPythonExecFromArchives() {
+        // exec matches archive target dir
+        Map<String, String> archives = new HashMap<>();
+        archives.put("/tmp/venv.zip", "venv");
+        assertThat(
+                        new PythonDependencyInfo(
+                                        new HashMap<>(),
+                                        null,
+                                        null,
+                                        archives,
+                                        "venv" + File.separator + "bin" + 
File.separator + "python")
+                                .isPythonExecFromArchives())
+                .isTrue();
+
+        // exec does not match any archive
+        assertThat(
+                        new PythonDependencyInfo(
+                                        new HashMap<>(), null, null, archives, 
"/usr/bin/python3")
+                                .isPythonExecFromArchives())
+                .isFalse();
+
+        // archive value with param delimiter 
("originalFileName#targetDirName")
+        Map<String, String> archivesWithDelimiter = new HashMap<>();
+        archivesWithDelimiter.put("/tmp/venv.zip", "venv.zip#myenv");
+        assertThat(
+                        new PythonDependencyInfo(
+                                        new HashMap<>(),
+                                        null,
+                                        null,
+                                        archivesWithDelimiter,
+                                        "myenv"
+                                                + File.separator
+                                                + "bin"
+                                                + File.separator
+                                                + "python")
+                                .isPythonExecFromArchives())
+                .isTrue();
+
+        // empty archives
+        assertThat(
+                        new PythonDependencyInfo(
+                                        new HashMap<>(),
+                                        null,
+                                        null,
+                                        new HashMap<>(),
+                                        "venv" + File.separator + "bin" + 
File.separator + "python")
+                                .isPythonExecFromArchives())
+                .isFalse();
+
+        // exec without separator — entire string is the base dir
+        Map<String, String> archivesForBareExec = new HashMap<>();
+        archivesForBareExec.put("/tmp/python.zip", "python");
+        assertThat(
+                        new PythonDependencyInfo(
+                                        new HashMap<>(), null, null, 
archivesForBareExec, "python")
+                                .isPythonExecFromArchives())
+                .isTrue();
+    }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
index e75c5c6c551..3240e9a4f6b 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
@@ -39,6 +39,7 @@ import static 
org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DIS
 import static org.apache.flink.python.util.PythonDependencyUtils.CACHE;
 import static org.apache.flink.python.util.PythonDependencyUtils.FILE;
 import static 
org.apache.flink.python.util.PythonDependencyUtils.configurePythonDependencies;
+import static 
org.apache.flink.python.util.PythonDependencyUtils.getArchiveTargetDirName;
 import static org.apache.flink.python.util.PythonDependencyUtils.merge;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -237,6 +238,12 @@ class PythonDependencyUtilsTest {
         verifyConfiguration(expectedConfiguration, actual);
     }
 
+    @Test
+    void testGetArchiveTargetDirName() {
+        assertThat(getArchiveTargetDirName("venv.zip#venv")).isEqualTo("venv");
+        assertThat(getArchiveTargetDirName("venv")).isEqualTo("venv");
+    }
+
     private void verifyCachedFiles(Map<String, String> expected, Configuration 
config) {
         Map<String, String> actual =
                 config.getOptional(PipelineOptions.CACHED_FILES).orElse(new 
ArrayList<>()).stream()

Reply via email to