dianfu commented on a change in pull request #12092:
URL: https://github.com/apache/flink/pull/12092#discussion_r424833367



##########
File path: flink-python/pyflink/table/tests/test_dependency.py
##########
@@ -130,9 +129,22 @@ def test_set_requirements_with_cached_directory(self):
         os.mkdir(requirements_dir_path)
         package_file_name = "python-package1-0.0.0.tar.gz"
         with open(os.path.join(requirements_dir_path, package_file_name), 
'wb') as f:
-            from pyflink.fn_execution.tests.process_mode_test_data import 
file_data
             import base64
-            
f.write(base64.b64decode(json.loads(file_data[package_file_name])["data"]))
+            # This base64 data is encoded from a python package file which 
includes a
+            # "python_package1" module. The module contains a "plus(a, b)" 
function.

Review comment:
       Could you add a description of how to recompute the base64 data?

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironmentManager.java
##########
@@ -221,7 +235,7 @@ public String createRetrievalToken() throws IOException {
                // disable the launching of gateway server to prevent from this 
dead loop:
                // launch UDF worker -> import udf -> import job code
                //        ^                                    | (If the job 
code is not enclosed in a
-               //                                                              
                   |  if name == 'main' statement)
+               //        |                                                     
               |  if name == 'main' statement)

Review comment:
       Could you revert this change? It only modifies the ASCII diagram in the comment and is unrelated to this PR.

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+       private static final int MAX_RETRY_TIMES = 3;
+
+       private static final String PYFLINK_UDF_RUNNER_SH = 
"pyflink-udf-runner.sh";
+       private static final String PYFLINK_UDF_RUNNER_BAT = 
"pyflink-udf-runner.bat";
+
+       private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+               "import sys;" +
+               "from distutils.dist import Distribution;" +
+               "install_obj = Distribution().get_command_obj('install', 
create=True);" +
+               "install_obj.prefix = sys.argv[1];" +
+               "install_obj.finalize_options();" +
+               "installed_dir = [install_obj.install_purelib];" +
+               "install_obj.install_purelib != install_obj.install_platlib and 
" +
+                       "installed_dir.append(install_obj.install_platlib);" +
+               "print(installed_dir[0]);" +
+               "len(installed_dir) > 1 and " +
+                       "print(installed_dir[1])";
+
+       private static final String CHECK_PIP_VERSION_SCRIPT =
+               "import sys;" +
+               "from pkg_resources import get_distribution, parse_version;" +
+               "pip_version = get_distribution('pip').version;" +
+               "print(parse_version(pip_version) >= 
parse_version(sys.argv[1]))";
+
+       private static final String GET_RUNNER_DIR_SCRIPT =
+               "import pyflink;" +
+               "import os;" +
+               
"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 
'bin'))";
+
+       /**
+        * Installs the 3rd party libraries listed in the user-provided 
requirements file. An optional
+        * requirements cached directory can be provided to support offline 
installation. In order not
+        * to populate the public environment, the libraries will be installed 
to the specified
+        * directory, and added to the PYTHONPATH of the UDF workers.
+        *
+        * @param requirementsFilePath The path of the requirements file.
+        * @param requirementsCacheDir The path of the requirements cached 
directory.
+        * @param requirementsInstallDir The target directory of the 
installation.
+        * @param pythonExecutable The python interpreter used to launch the 
pip program.
+        * @param environmentVariables The environment variables used to launch 
the pip program.
+        */
+       public static void pipInstallRequirements(
+                       String requirementsFilePath,
+                       @Nullable String requirementsCacheDir,
+                       String requirementsInstallDir,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String sitePackagesPath = 
getSitePackagesPath(requirementsInstallDir, pythonExecutable, 
environmentVariables);
+               String path = String.join(File.pathSeparator, 
requirementsInstallDir, "bin");
+               appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, 
environmentVariables);
+               appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+               List<String> commands = new ArrayList<>(Arrays.asList(
+                       pythonExecutable, "-m", "pip", "install", 
"--ignore-installed", "-r", requirementsFilePath));
+               if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, 
environmentVariables)) {
+                       commands.addAll(Arrays.asList("--prefix", 
requirementsInstallDir));
+               } else {
+                       commands.addAll(Arrays.asList("--install-option", 
"--prefix=" + requirementsInstallDir));
+               }
+               if (requirementsCacheDir != null) {
+                       commands.addAll(Arrays.asList("--find-links", 
requirementsCacheDir));
+               }
+
+               int retries = 0;
+               while (true) {
+                       try {
+                               execute(commands.toArray(new String[0]), 
environmentVariables, true);
+                               break;
+                       } catch (Throwable t) {
+                               retries++;
+                               if (retries < MAX_RETRY_TIMES) {
+                                       LOG.warn(String.format("Pip install 
failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+                               } else {
+                                       LOG.error(String.format("Pip install 
failed, already retried %d time...", retries));
+                                       throw t;
+                               }
+                       }
+               }
+       }
+
+       public static String getRunnerScriptPath(
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
GET_RUNNER_DIR_SCRIPT};
+               String out = execute(commands, environmentVariables, false);
+               String runnerScriptPath;
+               if (OperatingSystem.isWindows()) {
+                       runnerScriptPath = String.join(File.separator, 
out.trim(), PYFLINK_UDF_RUNNER_BAT);
+               } else {
+                       runnerScriptPath = String.join(File.separator, 
out.trim(), PYFLINK_UDF_RUNNER_SH);
+               }
+               if (!new File(runnerScriptPath).exists()) {
+                       throw new FileNotFoundException(String.format(
+                               "The runner script '%s' does not exist! " +
+                               "Please reinstall the apache-flink Python 
package.", runnerScriptPath));
+               }
+               return runnerScriptPath;
+       }
+
+       private static String getSitePackagesPath(
+                       String prefix,
+                       String pythonExecutable,

Review comment:
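       This method will not throw `InterruptedException` any more, because `execute` now catches it internally, so it can be removed from the `throws` clause. Please also check the other methods that call `execute`.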

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+       private static final int MAX_RETRY_TIMES = 3;
+
+       private static final String PYFLINK_UDF_RUNNER_SH = 
"pyflink-udf-runner.sh";
+       private static final String PYFLINK_UDF_RUNNER_BAT = 
"pyflink-udf-runner.bat";
+
+       private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+               "import sys;" +
+               "from distutils.dist import Distribution;" +
+               "install_obj = Distribution().get_command_obj('install', 
create=True);" +
+               "install_obj.prefix = sys.argv[1];" +
+               "install_obj.finalize_options();" +
+               "installed_dir = [install_obj.install_purelib];" +
+               "install_obj.install_purelib != install_obj.install_platlib and 
" +
+                       "installed_dir.append(install_obj.install_platlib);" +
+               "print(installed_dir[0]);" +
+               "len(installed_dir) > 1 and " +
+                       "print(installed_dir[1])";
+
+       private static final String CHECK_PIP_VERSION_SCRIPT =
+               "import sys;" +
+               "from pkg_resources import get_distribution, parse_version;" +
+               "pip_version = get_distribution('pip').version;" +
+               "print(parse_version(pip_version) >= 
parse_version(sys.argv[1]))";
+
+       private static final String GET_RUNNER_DIR_SCRIPT =
+               "import pyflink;" +
+               "import os;" +
+               
"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 
'bin'))";
+
+       /**
+        * Installs the 3rd party libraries listed in the user-provided 
requirements file. An optional
+        * requirements cached directory can be provided to support offline 
installation. In order not
+        * to populate the public environment, the libraries will be installed 
to the specified
+        * directory, and added to the PYTHONPATH of the UDF workers.
+        *
+        * @param requirementsFilePath The path of the requirements file.
+        * @param requirementsCacheDir The path of the requirements cached 
directory.
+        * @param requirementsInstallDir The target directory of the 
installation.
+        * @param pythonExecutable The python interpreter used to launch the 
pip program.
+        * @param environmentVariables The environment variables used to launch 
the pip program.
+        */
+       public static void pipInstallRequirements(
+                       String requirementsFilePath,
+                       @Nullable String requirementsCacheDir,
+                       String requirementsInstallDir,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String sitePackagesPath = 
getSitePackagesPath(requirementsInstallDir, pythonExecutable, 
environmentVariables);
+               String path = String.join(File.pathSeparator, 
requirementsInstallDir, "bin");
+               appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, 
environmentVariables);
+               appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+               List<String> commands = new ArrayList<>(Arrays.asList(
+                       pythonExecutable, "-m", "pip", "install", 
"--ignore-installed", "-r", requirementsFilePath));
+               if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, 
environmentVariables)) {
+                       commands.addAll(Arrays.asList("--prefix", 
requirementsInstallDir));
+               } else {
+                       commands.addAll(Arrays.asList("--install-option", 
"--prefix=" + requirementsInstallDir));
+               }
+               if (requirementsCacheDir != null) {
+                       commands.addAll(Arrays.asList("--find-links", 
requirementsCacheDir));
+               }
+
+               int retries = 0;
+               while (true) {
+                       try {
+                               execute(commands.toArray(new String[0]), 
environmentVariables, true);
+                               break;
+                       } catch (Throwable t) {
+                               retries++;
+                               if (retries < MAX_RETRY_TIMES) {
+                                       LOG.warn(String.format("Pip install 
failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+                               } else {
+                                       LOG.error(String.format("Pip install 
failed, already retried %d time...", retries));
+                                       throw t;
+                               }
+                       }
+               }
+       }
+
+       public static String getRunnerScriptPath(
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
GET_RUNNER_DIR_SCRIPT};
+               String out = execute(commands, environmentVariables, false);
+               String runnerScriptPath;
+               if (OperatingSystem.isWindows()) {
+                       runnerScriptPath = String.join(File.separator, 
out.trim(), PYFLINK_UDF_RUNNER_BAT);
+               } else {
+                       runnerScriptPath = String.join(File.separator, 
out.trim(), PYFLINK_UDF_RUNNER_SH);
+               }
+               if (!new File(runnerScriptPath).exists()) {
+                       throw new FileNotFoundException(String.format(
+                               "The runner script '%s' does not exist! " +
+                               "Please reinstall the apache-flink Python 
package.", runnerScriptPath));
+               }
+               return runnerScriptPath;
+       }
+
+       private static String getSitePackagesPath(
+                       String prefix,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
GET_SITE_PACKAGES_PATH_SCRIPT, prefix };
+               String out = execute(commands, environmentVariables, false);
+               return String.join(File.pathSeparator, out.trim().split("\n"));
+       }
+
+       private static boolean isPipVersionGreaterEqual(
+                       String pipVersion,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
CHECK_PIP_VERSION_SCRIPT, pipVersion };
+               String out = execute(commands, environmentVariables, false);
+               return Boolean.parseBoolean(out.trim());
+       }
+
+       private static String execute(
+                       String[] commands,
+                       Map<String, String> environmentVariables,
+                       boolean log) throws IOException {
+               ProcessBuilder pb = new ProcessBuilder(commands);
+               pb.environment().putAll(environmentVariables);
+               pb.redirectErrorStream(true);
+               Process p = pb.start();
+               InputStream in = new BufferedInputStream(p.getInputStream());
+               StringBuilder out = new StringBuilder();
+               String s;
+               if (log) {
+                       LOG.info(String.format("Execute the commnad: %s", 
String.join(" ", commands)));

Review comment:
       Typo: "commnad" should be "command".

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+       private static final int MAX_RETRY_TIMES = 3;
+
+       private static final String PYFLINK_UDF_RUNNER_SH = 
"pyflink-udf-runner.sh";
+       private static final String PYFLINK_UDF_RUNNER_BAT = 
"pyflink-udf-runner.bat";
+
+       private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+               "import sys;" +
+               "from distutils.dist import Distribution;" +
+               "install_obj = Distribution().get_command_obj('install', 
create=True);" +
+               "install_obj.prefix = sys.argv[1];" +
+               "install_obj.finalize_options();" +
+               "installed_dir = [install_obj.install_purelib];" +
+               "install_obj.install_purelib != install_obj.install_platlib and 
" +
+                       "installed_dir.append(install_obj.install_platlib);" +
+               "print(installed_dir[0]);" +
+               "len(installed_dir) > 1 and " +
+                       "print(installed_dir[1])";
+
+       private static final String CHECK_PIP_VERSION_SCRIPT =
+               "import sys;" +
+               "from pkg_resources import get_distribution, parse_version;" +
+               "pip_version = get_distribution('pip').version;" +
+               "print(parse_version(pip_version) >= 
parse_version(sys.argv[1]))";
+
+       private static final String GET_RUNNER_DIR_SCRIPT =
+               "import pyflink;" +
+               "import os;" +
+               
"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 
'bin'))";
+
+       /**
+        * Installs the 3rd party libraries listed in the user-provided 
requirements file. An optional
+        * requirements cached directory can be provided to support offline 
installation. In order not
+        * to populate the public environment, the libraries will be installed 
to the specified
+        * directory, and added to the PYTHONPATH of the UDF workers.
+        *
+        * @param requirementsFilePath The path of the requirements file.
+        * @param requirementsCacheDir The path of the requirements cached 
directory.
+        * @param requirementsInstallDir The target directory of the 
installation.
+        * @param pythonExecutable The python interpreter used to launch the 
pip program.
+        * @param environmentVariables The environment variables used to launch 
the pip program.
+        */
+       public static void pipInstallRequirements(
+                       String requirementsFilePath,
+                       @Nullable String requirementsCacheDir,
+                       String requirementsInstallDir,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String sitePackagesPath = 
getSitePackagesPath(requirementsInstallDir, pythonExecutable, 
environmentVariables);
+               String path = String.join(File.pathSeparator, 
requirementsInstallDir, "bin");
+               appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, 
environmentVariables);
+               appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+               List<String> commands = new ArrayList<>(Arrays.asList(
+                       pythonExecutable, "-m", "pip", "install", 
"--ignore-installed", "-r", requirementsFilePath));
+               if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, 
environmentVariables)) {
+                       commands.addAll(Arrays.asList("--prefix", 
requirementsInstallDir));
+               } else {
+                       commands.addAll(Arrays.asList("--install-option", 
"--prefix=" + requirementsInstallDir));
+               }
+               if (requirementsCacheDir != null) {
+                       commands.addAll(Arrays.asList("--find-links", 
requirementsCacheDir));
+               }
+
+               int retries = 0;
+               while (true) {
+                       try {
+                               execute(commands.toArray(new String[0]), 
environmentVariables, true);
+                               break;
+                       } catch (Throwable t) {
+                               retries++;
+                               if (retries < MAX_RETRY_TIMES) {
+                                       LOG.warn(String.format("Pip install 
failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+                               } else {
+                                       LOG.error(String.format("Pip install 
failed, already retried %d time...", retries));
+                                       throw t;
+                               }
+                       }
+               }
+       }
+
+       public static String getRunnerScriptPath(
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
GET_RUNNER_DIR_SCRIPT};
+               String out = execute(commands, environmentVariables, false);
+               String runnerScriptPath;
+               if (OperatingSystem.isWindows()) {
+                       runnerScriptPath = String.join(File.separator, 
out.trim(), PYFLINK_UDF_RUNNER_BAT);
+               } else {
+                       runnerScriptPath = String.join(File.separator, 
out.trim(), PYFLINK_UDF_RUNNER_SH);
+               }
+               if (!new File(runnerScriptPath).exists()) {
+                       throw new FileNotFoundException(String.format(
+                               "The runner script '%s' does not exist! " +
+                               "Please reinstall the apache-flink Python 
package.", runnerScriptPath));
+               }
+               return runnerScriptPath;
+       }
+
+       private static String getSitePackagesPath(
+                       String prefix,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
GET_SITE_PACKAGES_PATH_SCRIPT, prefix };
+               String out = execute(commands, environmentVariables, false);
+               return String.join(File.pathSeparator, out.trim().split("\n"));
+       }
+
+       private static boolean isPipVersionGreaterEqual(
+                       String pipVersion,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws 
IOException, InterruptedException {
+               String[] commands = new String[] { pythonExecutable, "-c", 
CHECK_PIP_VERSION_SCRIPT, pipVersion };
+               String out = execute(commands, environmentVariables, false);
+               return Boolean.parseBoolean(out.trim());
+       }
+
+       private static String execute(
+                       String[] commands,
+                       Map<String, String> environmentVariables,
+                       boolean log) throws IOException {
+               ProcessBuilder pb = new ProcessBuilder(commands);
+               pb.environment().putAll(environmentVariables);
+               pb.redirectErrorStream(true);
+               Process p = pb.start();
+               InputStream in = new BufferedInputStream(p.getInputStream());
+               StringBuilder out = new StringBuilder();
+               String s;
+               if (log) {
+                       LOG.info(String.format("Execute the commnad: %s", 
String.join(" ", commands)));
+               }
+               try (BufferedReader br = new BufferedReader(new 
InputStreamReader(in))) {
+                       while ((s = br.readLine()) != null) {
+                               out.append(s).append("\n");
+                               if (log) {
+                                       LOG.info(s);
+                               }
+                       }
+               }
+               try {
+                       if (p.waitFor() != 0) {
+                               throw new IOException(String.format(
+                                       "Execute the 
command:\n%s\nfailed:\n%s", String.join(" ", commands), out));
+                       }
+               } catch (InterruptedException e) {
+                       // Ignored. The subprocess is dead after the 
"br.readLine()" returns null, so the "waitFor"
+                       // method would not be blocked.
+               }
+               return out.toString();
+       }
+
+       private static void appendToEnvironmentVariable(String key, String 
path, Map<String, String> env) {
+               if (env.containsKey(key)) {
+                       env.put(key, String.join(File.pathSeparator, path, 
env.get(key)));
+               } else {
+                       env.put(key, path);
+               }
+       }
+
+       public static void main(String[] args) throws IOException, 
InterruptedException {

Review comment:
       Could you remove this `main` method?

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+       /**
+        * Maximum number of attempts (including the first one) made to run "pip install" before the
+        * failure is propagated to the caller.
+        */
+       private static final int MAX_RETRY_TIMES = 3;
+
+       // File names of the UDF runner scripts looked up in the "bin" directory of the pyflink package:
+       // the .bat script is selected on Windows, the .sh script everywhere else.
+       private static final String PYFLINK_UDF_RUNNER_SH = 
"pyflink-udf-runner.sh";
+       private static final String PYFLINK_UDF_RUNNER_BAT = 
"pyflink-udf-runner.bat";
+
+       /**
+        * Python snippet executed as {@code python -c <script> <prefix>}. It asks distutils where an
+        * "install" with the given prefix (sys.argv[1]) would place pure-Python (purelib) and
+        * platform-specific (platlib) packages, and prints the purelib directory followed by the platlib
+        * directory when the two differ, one path per line.
+        *
+        * <p>NOTE(review): distutils is deprecated (PEP 632) and removed in Python 3.12; confirm which
+        * Python versions this needs to support.
+        */
+       private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+               "import sys;" +
+               "from distutils.dist import Distribution;" +
+               "install_obj = Distribution().get_command_obj('install', 
create=True);" +
+               "install_obj.prefix = sys.argv[1];" +
+               "install_obj.finalize_options();" +
+               "installed_dir = [install_obj.install_purelib];" +
+               "install_obj.install_purelib != install_obj.install_platlib and 
" +
+                       "installed_dir.append(install_obj.install_platlib);" +
+               "print(installed_dir[0]);" +
+               "len(installed_dir) > 1 and " +
+                       "print(installed_dir[1])";
+
+       /**
+        * Python snippet executed as {@code python -c <script> <version>}. It prints the Python boolean
+        * ("True"/"False") telling whether the installed pip is at least the version given in sys.argv[1].
+        * It relies on pkg_resources (part of setuptools) being importable.
+        */
+       private static final String CHECK_PIP_VERSION_SCRIPT =
+               "import sys;" +
+               "from pkg_resources import get_distribution, parse_version;" +
+               "pip_version = get_distribution('pip').version;" +
+               "print(parse_version(pip_version) >= 
parse_version(sys.argv[1]))";
+
+       // Python snippet printing the absolute path of the "bin" directory inside the installed pyflink
+       // package, where the UDF runner scripts are expected to live.
+       private static final String GET_RUNNER_DIR_SCRIPT =
+               "import pyflink;" +
+               "import os;" +
+               
"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 
'bin'))";
+
+       /**
+        * Installs the 3rd party libraries listed in the user-provided requirements file. An optional
+        * requirements cached directory can be provided to support offline installation. In order not
+        * to pollute the public environment, the libraries are installed to the specified directory,
+        * whose site-packages directories and "bin" directory are then prepended to the PYTHONPATH and
+        * PATH entries of {@code environmentVariables}.
+        *
+        * <p>The pip invocation is attempted up to {@link #MAX_RETRY_TIMES} times; only failures of the
+        * pip process ({@link IOException}) are retried.
+        *
+        * @param requirementsFilePath The path of the requirements file.
+        * @param requirementsCacheDir The path of the requirements cached directory, or {@code null}.
+        * @param requirementsInstallDir The target directory of the installation.
+        * @param pythonExecutable The python interpreter used to launch the pip program.
+        * @param environmentVariables The environment variables used to launch the pip program. This map
+        *                             is modified in place (PYTHONPATH and PATH are updated).
+        * @throws IOException if a helper command fails, or pip still fails after the last attempt.
+        */
+       public static void pipInstallRequirements(
+                       String requirementsFilePath,
+                       @Nullable String requirementsCacheDir,
+                       String requirementsInstallDir,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws IOException, InterruptedException {
+               String sitePackagesPath = getSitePackagesPath(requirementsInstallDir, pythonExecutable, environmentVariables);
+               // The scripts of the installed packages are expected in "<installDir>/bin", so the parts must be
+               // joined with the file separator. Joining with File.pathSeparator would instead produce two
+               // PATH entries: "<installDir>" and a relative "bin".
+               // NOTE(review): on Windows pip places scripts in "<installDir>\Scripts"; confirm if needed.
+               String path = String.join(File.separator, requirementsInstallDir, "bin");
+               appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, environmentVariables);
+               appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+               List<String> commands = new ArrayList<>(Arrays.asList(
+                       pythonExecutable, "-m", "pip", "install", "--ignore-installed", "-r", requirementsFilePath));
+               // pip gained a native "--prefix" option in 8.0.0; older versions need it forwarded to setup.py.
+               if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
+                       commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
+               } else {
+                       commands.addAll(Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
+               }
+               if (requirementsCacheDir != null) {
+                       commands.addAll(Arrays.asList("--find-links", requirementsCacheDir));
+               }
+
+               int retries = 0;
+               while (true) {
+                       try {
+                               execute(commands.toArray(new String[0]), environmentVariables, true);
+                               return;
+                       } catch (IOException e) {
+                               // Only a failing pip process is retried; programming errors and JVM errors are
+                               // not transient and propagate immediately.
+                               retries++;
+                               if (retries < MAX_RETRY_TIMES) {
+                                       LOG.warn(String.format("Pip install failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), e);
+                               } else {
+                                       LOG.error(String.format("Pip install failed, already retried %d time...", retries));
+                                       throw e;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Locates the PyFlink UDF runner script in the "bin" directory of the pyflink package that is
+        * importable by the given interpreter. The batch variant is chosen on Windows, the shell variant
+        * on every other operating system.
+        *
+        * @param pythonExecutable The python interpreter whose pyflink installation is inspected.
+        * @param environmentVariables The environment variables used to launch the interpreter.
+        * @return The path of the runner script.
+        * @throws FileNotFoundException if no runner script exists at the computed location.
+        */
+       public static String getRunnerScriptPath(
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws IOException, InterruptedException {
+               String binDir = execute(
+                       new String[] { pythonExecutable, "-c", GET_RUNNER_DIR_SCRIPT }, environmentVariables, false).trim();
+               String scriptName = OperatingSystem.isWindows() ? PYFLINK_UDF_RUNNER_BAT : PYFLINK_UDF_RUNNER_SH;
+               String runnerScriptPath = String.join(File.separator, binDir, scriptName);
+               if (new File(runnerScriptPath).exists()) {
+                       return runnerScriptPath;
+               }
+               throw new FileNotFoundException(String.format(
+                       "The runner script '%s' does not exist! " +
+                       "Please reinstall the apache-flink Python package.", runnerScriptPath));
+       }
+
+       /**
+        * Asks the interpreter where packages installed with the given prefix would be placed and returns
+        * those directories (one or two, as printed by the helper script) joined with
+        * {@link File#pathSeparator}, ready to be put on PYTHONPATH.
+        */
+       private static String getSitePackagesPath(
+                       String prefix,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws IOException, InterruptedException {
+               String output = execute(
+                       new String[] { pythonExecutable, "-c", GET_SITE_PACKAGES_PATH_SCRIPT, prefix },
+                       environmentVariables,
+                       false);
+               String[] directories = output.trim().split("\n");
+               return String.join(File.pathSeparator, directories);
+       }
+
+       /**
+        * Returns whether the pip available to {@code pythonExecutable} is at least {@code pipVersion}.
+        * The helper script prints a Python boolean ("True"/"False"), which is parsed case-insensitively.
+        */
+       private static boolean isPipVersionGreaterEqual(
+                       String pipVersion,
+                       String pythonExecutable,
+                       Map<String, String> environmentVariables) throws IOException, InterruptedException {
+               String[] checkCommand = { pythonExecutable, "-c", CHECK_PIP_VERSION_SCRIPT, pipVersion };
+               String printed = execute(checkCommand, environmentVariables, false).trim();
+               return Boolean.parseBoolean(printed);
+       }
+
+       private static String execute(
+                       String[] commands,
+                       Map<String, String> environmentVariables,
+                       boolean log) throws IOException {
+               ProcessBuilder pb = new ProcessBuilder(commands);
+               pb.environment().putAll(environmentVariables);
+               pb.redirectErrorStream(true);
+               Process p = pb.start();
+               InputStream in = new BufferedInputStream(p.getInputStream());
+               StringBuilder out = new StringBuilder();
+               String s;
+               if (log) {
+                       LOG.info(String.format("Execute the commnad: %s", 
String.join(" ", commands)));

Review comment:
       Typo: "commnad". Suggest using `Executing command: %s` instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to