HuangXingBo commented on a change in pull request #10126: [FLINK-14590][python] 
Unify the working directory of Java process and Python process when submitting 
python jobs via "flink run -py"
URL: https://github.com/apache/flink/pull/10126#discussion_r347816271
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -93,65 +91,60 @@ public void run() {
         * Prepares PythonEnvironment to start python process.
         *
         * @param pythonLibFiles The dependent Python files.
+        * @param tmpDir The temporary directory files copied to.
         * @return PythonEnvironment the Python environment which will be 
executed in Python process.
         */
-       public static PythonEnvironment preparePythonEnvironment(List<Path> 
pythonLibFiles) throws IOException {
+       public static PythonEnvironment preparePythonEnvironment(
+                               List<Path> pythonLibFiles,
+                               String tmpDir) throws IOException {
                PythonEnvironment env = new PythonEnvironment();
 
-               // 1. setup temporary local directory for the user files
-               String tmpDir = System.getProperty("java.io.tmpdir") +
-                       File.separator + "pyflink" + File.separator + 
UUID.randomUUID();
+               tmpDir = new File(tmpDir).getAbsolutePath();
 
+               // 1. setup temporary local directory for the user files
                Path tmpDirPath = new Path(tmpDir);
                FileSystem fs = tmpDirPath.getFileSystem();
-               if (fs.exists(tmpDirPath)) {
-                       fs.delete(tmpDirPath, true);
-               }
                fs.mkdirs(tmpDirPath);
 
-               env.workingDirectory = tmpDirPath.toString();
-
-               StringBuilder pythonPathEnv = new StringBuilder();
+               env.storageDirectory = tmpDir;
 
-               pythonPathEnv.append(env.workingDirectory);
+               List<String> pythonPathList = new ArrayList<>();
 
                // 2. create symbolLink in the working directory for the 
pyflink dependency libs.
                List<java.nio.file.Path> pythonLibs = 
getLibFiles(FLINK_OPT_DIR_PYTHON);
                for (java.nio.file.Path libPath : pythonLibs) {
-                       java.nio.file.Path symbolicLinkFilePath = 
FileSystems.getDefault().getPath(env.workingDirectory,
+                       java.nio.file.Path symbolicLinkFilePath = 
FileSystems.getDefault().getPath(
+                               env.storageDirectory,
                                libPath.getFileName().toString());
                        createSymbolicLinkForPyflinkLib(libPath, 
symbolicLinkFilePath);
-                       pythonPathEnv.append(File.pathSeparator);
-                       pythonPathEnv.append(symbolicLinkFilePath.toString());
+                       pythonPathList.add(symbolicLinkFilePath.toString());
                }
 
                // 3. copy relevant python files to tmp dir and set them in 
PYTHONPATH.
                for (Path pythonFile : pythonLibFiles) {
                        String sourceFileName = pythonFile.getName();
-                       Path targetPath = new Path(tmpDirPath, sourceFileName);
-                       FileUtils.copy(pythonFile, targetPath, true);
-                       String targetFileNames = 
Files.walk(Paths.get(targetPath.toString()))
-                               .filter(Files::isRegularFile)
-                               .filter(f -> !f.toString().endsWith(".py"))
-                               .map(java.nio.file.Path::toString)
-                               
.collect(Collectors.joining(File.pathSeparator));
-                       pythonPathEnv.append(File.pathSeparator);
-                       pythonPathEnv.append(targetFileNames);
-               }
-
-               // 4. add the parent directory to PYTHONPATH for files suffixed 
with .py
-               String pyFileParents = 
Files.walk(Paths.get(tmpDirPath.toString()))
-                       .filter(file -> file.toString().endsWith(".py"))
-                       .map(java.nio.file.Path::getParent)
-                       .distinct()
-                       .map(java.nio.file.Path::toString)
-                       .collect(Collectors.joining(File.pathSeparator));
-               if (!StringUtils.isNullOrWhitespaceOnly(pyFileParents)) {
-                       pythonPathEnv.append(File.pathSeparator);
-                       pythonPathEnv.append(pyFileParents);
+                       // add random UUID parent directory to avoid name 
conflict.
+                       Path targetPath = new Path(
+                               tmpDirPath,
+                               String.join(File.separator, 
UUID.randomUUID().toString(), sourceFileName));
+                       if (!pythonFile.getFileSystem().isDistributedFS()) {
+                               // if the path is local file, try to create 
symbolic link.
+                               createSymbolicLinkForPyflinkLib(
+                                       Paths.get(new 
File(pythonFile.getPath()).getAbsolutePath()),
+                                       Paths.get(targetPath.toString()));
+                       } else {
+                               FileUtils.copy(pythonFile, targetPath, true);
+                       }
+                       if 
(Paths.get(targetPath.toString()).toRealPath().toFile().isFile() &&
 
 Review comment:
   What about using
`Files.isRegularFile(Paths.get(targetPath.toString()).toRealPath())`
to decide whether the target is a regular file?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to