hequn8128 commented on a change in pull request #10310: [FLINK-14865][python] 
fix unstable tests 
PyFlinkBlinkStreamUserDefinedFunctionTests#test_udf_in_join_condition_2
URL: https://github.com/apache/flink/pull/10310#discussion_r350519468
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/python/util/ResourceUtil.java
 ##########
 @@ -47,31 +47,76 @@
                        String tmpdir,
                        ClassLoader classLoader,
                        String prefix,
-                       boolean skipShellScript) throws IOException {
+                       boolean skipShellScript) throws IOException, 
InterruptedException {
                List<File> extractedFiles = new ArrayList<>();
                for (String fileName : PYTHON_BASIC_DEPENDENCIES) {
                        if (skipShellScript && fileName.endsWith(".sh")) {
                                continue;
                        }
-                       File file = new File(tmpdir, prefix + fileName);
-
-                       try (OutputStream out = new BufferedOutputStream(new 
FileOutputStream(file));
-                               InputStream in = 
classLoader.getResourceAsStream(fileName)) {
-                               // This util will use in the extract program 
before launching pyflink shell,
-                               // so it must do this itself to minimize the 
dependencies.
-                               final byte[] buf = new byte[BUFF_SIZE];
-                               int bytesRead = in.read(buf);
-                               while (bytesRead >= 0) {
-                                       out.write(buf, 0, bytesRead);
-                                       bytesRead = in.read(buf);
-                               }
-                       }
 
-                       if (file.getName().endsWith(".sh")) {
-                               file.setExecutable(true);
+                       File file = new File(tmpdir, prefix + fileName);
+                       if (fileName.endsWith(".sh")) {
+                               // TODO: This is a hacky solution to prevent 
subprocesses to hold the file descriptor of shell scripts,
+                               // which will cause the execution of shell 
scripts failed with the exception "test file is busy"
+                               // randomly. It's a bug of JDK, see 
https://bugs.openjdk.java.net/browse/JDK-8068370. After moving flink
+                               // python jar to lib directory, we can solve 
this problem elegantly by extracting these files only once.
+                               String javaExecutable = 
String.join(File.separator, System.getProperty("java.home"), "bin", "java");
+                               String classPath = new File(
+                                       
ResourceUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getAbsolutePath();
+                               new ProcessBuilder(
+                                       javaExecutable,
+                                       "-cp",
+                                       classPath,
+                                       ResourceUtil.class.getName(),
+                                       tmpdir,
+                                       prefix,
+                                       fileName).inheritIO().start().waitFor();
+                       } else {
+                               copyBytes(
+                                       
classLoader.getResourceAsStream(fileName),
+                                       new BufferedOutputStream(new 
FileOutputStream(file)));
                        }
                        extractedFiles.add(file);
                }
                return extractedFiles;
        }
+
+       /**
+        * This main method is used to create the shell script in a subprocess, 
see the "TODO" hints in method
+        * { @link ResourceUtil#extractBasicDependenciesFromResource }.
 
 Review comment:
   Remove the space after `{` (write `{@link ...}`); otherwise the Javadoc `{@link}` inline tag will not be recognized.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to