dianfu commented on a change in pull request #11448: [FLINK-16666][python][table] Support new Python dependency configuration options in flink-table.
URL: https://github.com/apache/flink/pull/11448#discussion_r404682353
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonDriverEnvUtils.java
 ##########
 @@ -105,46 +126,113 @@ public void run() {
        /**
         * Prepares PythonEnvironment to start python process.
         *
-        * @param pythonDriverOptions The Python driver options.
+        * @param config The configuration of python client dependencies.
         * @param tmpDir The temporary directory which files will be copied to.
         * @return PythonEnvironment the Python environment which will be 
executed in Python process.
         */
        public static PythonEnvironment preparePythonEnvironment(
-                       PythonDriverOptions pythonDriverOptions,
-                       String tmpDir) throws IOException, InterruptedException 
{
+                       ReadableConfig config,
+                       String tmpDir) throws IOException {
+               Configuration contextConfig = 
ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
+
                PythonEnvironment env = new PythonEnvironment();
 
+               // 1. setup the path of python interpreter.
+               if (config.getOptional(PYTHON_CLIENT_EXECUTABLE).isPresent()) {
+                       // configuration from table config.
+                       env.pythonExec = config.get(PYTHON_CLIENT_EXECUTABLE);
+               } else if 
(contextConfig.getOptional(PYTHON_CLIENT_EXECUTABLE).isPresent()) {
+                       // configuration from commandline.
+                       env.pythonExec = 
contextConfig.get(PYTHON_CLIENT_EXECUTABLE);
+               } else {
+                       // configuration from environment variable and 
flink-conf.yaml.
+                       env.pythonExec = 
loadContextConfiguration(PYTHON_CLIENT_EXECUTABLE, PYFLINK_CLIENT_EXECUTABLE);
+               }
+
                tmpDir = new File(tmpDir).getAbsolutePath();
 
-               // 1. setup temporary local directory for the user files
+               // 2. setup temporary local directory for the user files
                Path tmpDirPath = new Path(tmpDir);
                FileSystem fs = tmpDirPath.getFileSystem();
                fs.mkdirs(tmpDirPath);
 
                env.tempDirectory = tmpDir;
+
+               // 3. append the internal lib files to PYTHONPATH.
+               appendInternalLibFiles(env);
+
+               // 4. copy relevant python files to tmp dir and set them in 
PYTHONPATH.
+               if 
(contextConfig.getOptional(PythonOptions.PYTHON_FILES).isPresent()) {
+                       List<Path> userFiles = 
Arrays.stream(config.get(PythonOptions.PYTHON_FILES).split(","))
+                               .map(Path::new).collect(Collectors.toList());
+                       PythonDriverEnvUtils.appendUserFiles(env, userFiles);
+               }
+               if (config.getOptional(PythonOptions.PYTHON_FILES).isPresent()) 
{
+                       List<Path> userFiles = 
Arrays.stream(config.get(PythonOptions.PYTHON_FILES).split(","))
+                               .map(Path::new).collect(Collectors.toList());
+                       PythonDriverEnvUtils.appendUserFiles(env, userFiles);
+               }
+               return env;
+       }
+
+       public static void appendInternalLibFiles(PythonEnvironment env) {
 
 Review comment:
   Should `appendInternalLibFiles` be `private` instead of `public`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to