hequn8128 commented on a change in pull request #10310: [FLINK-14865][python]
fix unstable tests
PyFlinkBlinkStreamUserDefinedFunctionTests#test_udf_in_join_condition_2
URL: https://github.com/apache/flink/pull/10310#discussion_r350519111
##########
File path:
flink-python/src/main/java/org/apache/flink/python/util/ResourceUtil.java
##########
@@ -47,31 +47,76 @@
String tmpdir,
ClassLoader classLoader,
String prefix,
- boolean skipShellScript) throws IOException {
+ boolean skipShellScript) throws IOException,
InterruptedException {
List<File> extractedFiles = new ArrayList<>();
for (String fileName : PYTHON_BASIC_DEPENDENCIES) {
if (skipShellScript && fileName.endsWith(".sh")) {
continue;
}
- File file = new File(tmpdir, prefix + fileName);
-
- try (OutputStream out = new BufferedOutputStream(new
FileOutputStream(file));
- InputStream in =
classLoader.getResourceAsStream(fileName)) {
- // This util will use in the extract program
before launching pyflink shell,
- // so it must do this itself to minimize the
dependencies.
- final byte[] buf = new byte[BUFF_SIZE];
- int bytesRead = in.read(buf);
- while (bytesRead >= 0) {
- out.write(buf, 0, bytesRead);
- bytesRead = in.read(buf);
- }
- }
- if (file.getName().endsWith(".sh")) {
- file.setExecutable(true);
+ File file = new File(tmpdir, prefix + fileName);
+ if (fileName.endsWith(".sh")) {
+ // TODO: This is a hacky solution to prevent
subprocesses to hold the file descriptor of shell scripts,
+ // which will cause the execution of shell
scripts failed with the exception "test file is busy"
+ // randomly. It's a bug of JDK, see
https://bugs.openjdk.java.net/browse/JDK-8068370. After moving flink
+ // python jar to lib directory, we can solve
this problem elegantly by extracting these files only once.
+ String javaExecutable =
String.join(File.separator, System.getProperty("java.home"), "bin", "java");
+ String classPath = new File(
+
ResourceUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getAbsolutePath();
+ new ProcessBuilder(
+ javaExecutable,
+ "-cp",
+ classPath,
+ ResourceUtil.class.getName(),
+ tmpdir,
+ prefix,
+ fileName).inheritIO().start().waitFor();
+ } else {
+ copyBytes(
+
classLoader.getResourceAsStream(fileName),
+ new BufferedOutputStream(new
FileOutputStream(file)));
}
extractedFiles.add(file);
}
return extractedFiles;
}
+
+ /**
+ * This main method is used to create the shell script in a subprocess,
see the "TODO" hints in method
+ * { @link ResourceUtil#extractBasicDependenciesFromResource }.
+ * @param args First argument is the directory where shell script will
be created. Second argument is the prefix of
+ * the shell script. Third argument is the fileName of the
shell script.
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ String tmpdir = args[0];
+ String prefix = args[1];
+ String fileName = args[2];
+ File file = new File(tmpdir, prefix + fileName);
+
+ copyBytes(
+
ResourceUtil.class.getClassLoader().getResourceAsStream(fileName),
+ new BufferedOutputStream(new FileOutputStream(file)));
+
+ file.setExecutable(true);
+ }
+
+ /**
+ * This util class will be executed in a separated java process. So
implementing this method here instead of reusing
+ * flink-core utils to minimize its dependencies, which will make the
code much simple.
+ * simple.
+ * @param input The input stream.
+ * @param output The output stream.
+ * @throws IOException
+ */
+ public static void copyBytes(InputStream input, OutputStream output)
throws IOException {
Review comment:
It seems we can use `Files.copy(InputStream in, Path target)` to achieve this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services