HuangXingBo commented on a change in pull request #10126: [FLINK-14590][python]
Unify the working directory of Java process and Python process when submitting
python jobs via "flink run -py"
URL: https://github.com/apache/flink/pull/10126#discussion_r347702295
##########
File path:
flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
##########
@@ -41,98 +42,103 @@
* Tests for the {@link PythonEnvUtils}.
*/
public class PythonEnvUtilsTest {
- private Path tmpDirPath;
- private FileSystem tmpDirFs;
+ private String tmpDirPath;
@Before
public void prepareTestEnvironment() {
- String tmpDir = System.getProperty("java.io.tmpdir") +
- File.separator + "pyflink" + File.separator +
UUID.randomUUID();
-
- tmpDirPath = new Path(tmpDir);
- try {
- tmpDirFs = tmpDirPath.getFileSystem();
- if (tmpDirFs.exists(tmpDirPath)) {
- tmpDirFs.delete(tmpDirPath, true);
- }
- tmpDirFs.mkdirs(tmpDirPath);
- } catch (IOException e) {
- throw new RuntimeException("initial PythonUtil test
environment failed");
- }
+ tmpDirPath = System.getProperty("java.io.tmpdir") +
+ File.separator + "pyflink_" + UUID.randomUUID();
+ new File(tmpDirPath).mkdirs();
}
@Test
public void testPreparePythonEnvironment() throws IOException {
// xxx/a.zip, xxx/subdir/b.py, xxx/subdir/c.zip
- File a = new File(tmpDirPath.toString() + File.separator +
"a.zip");
+ File a = new File(tmpDirPath + File.separator + "a.zip");
a.createNewFile();
- File subdir = new File(tmpDirPath.toString() + File.separator +
"subdir");
+ File moduleDir = new File(tmpDirPath + File.separator +
"module_dir");
+ moduleDir.mkdir();
+ File subdir = new File(tmpDirPath + File.separator + "subdir");
subdir.mkdir();
- File b = new File(tmpDirPath.toString() + File.separator +
"subdir" + File.separator + "b.py");
+ File b = new File(tmpDirPath + File.separator + "subdir" +
File.separator + "b.py");
b.createNewFile();
- File c = new File(tmpDirPath.toString() + File.separator +
"subdir" + File.separator + "c.zip");
+ File c = new File(tmpDirPath + File.separator + "subdir" +
File.separator + "c.zip");
c.createNewFile();
List<Path> pyFilesList = new ArrayList<>();
- pyFilesList.add(tmpDirPath);
+ pyFilesList.add(new Path(a.getAbsolutePath()));
+ pyFilesList.add(new Path(moduleDir.getAbsolutePath()));
+ // test relative path
+ String relativePath = Paths.get(new File("").getAbsolutePath())
+ .relativize(Paths.get(b.getAbsolutePath())).toString();
+ pyFilesList.add(new Path(relativePath));
+ // test path with schema
+ pyFilesList.add(new Path("file://" + c.getAbsolutePath()));
- PythonEnvUtils.PythonEnvironment env =
PythonEnvUtils.preparePythonEnvironment(pyFilesList);
+ PythonEnvUtils.PythonEnvironment env =
PythonEnvUtils.preparePythonEnvironment(
+ pyFilesList,
+ tmpDirPath);
+ String base = replaceUUID(env.storageDirectory);
Set<String> expectedPythonPaths = new HashSet<>();
- expectedPythonPaths.add(env.workingDirectory);
-
- String targetDir = env.workingDirectory + File.separator +
tmpDirPath.getName();
- expectedPythonPaths.add(targetDir + File.separator +
a.getName());
- expectedPythonPaths.add(targetDir + File.separator + "subdir" +
File.separator + c.getName());
+ expectedPythonPaths.add(String.join(File.separator, base,
"{uuid}", "a.zip"));
+ expectedPythonPaths.add(String.join(File.separator, base,
"{uuid}", "module_dir"));
+ expectedPythonPaths.add(String.join(File.separator, base,
"{uuid}"));
+ expectedPythonPaths.add(String.join(File.separator, base,
"{uuid}", "c.zip"));
- // the parent dir for files suffixed with .py should also be
added to PYTHONPATH
- expectedPythonPaths.add(targetDir + File.separator + "subdir");
- Assert.assertEquals(expectedPythonPaths, new
HashSet<>(Arrays.asList(env.pythonPath.split(File.pathSeparator))));
+ Assert.assertEquals(
+ expectedPythonPaths,
+ new
HashSet<>(Arrays.asList(replaceUUID(env.pythonPath).split(File.pathSeparator))));
}
@Test
public void testStartPythonProcess() {
PythonEnvUtils.PythonEnvironment pythonEnv = new
PythonEnvUtils.PythonEnvironment();
- pythonEnv.workingDirectory = tmpDirPath.toString();
- pythonEnv.pythonPath = tmpDirPath.toString();
+ pythonEnv.storageDirectory = tmpDirPath;
+ pythonEnv.pythonPath = tmpDirPath;
List<String> commands = new ArrayList<>();
- Path pyPath = new Path(tmpDirPath, "word_count.py");
+ String pyPath = String.join(File.separator, tmpDirPath,
"verifier.py");
try {
- tmpDirFs.create(pyPath, FileSystem.WriteMode.OVERWRITE);
- File pyFile = new File(pyPath.toString());
+ File pyFile = new File(pyPath);
+ pyFile.createNewFile();
+ pyFile.setExecutable(true);
String pyProgram = "#!/usr/bin/python\n" +
"# -*- coding: UTF-8 -*-\n" +
+ "import os\n" +
"import sys\n" +
"\n" +
"if __name__=='__main__':\n" +
"\tfilename = sys.argv[1]\n" +
"\tfo = open(filename, \"w\")\n" +
- "\tfo.write( \"hello world\")\n" +
+ "\tfo.write(os.getcwd())\n" +
"\tfo.close()";
Files.write(pyFile.toPath(), pyProgram.getBytes(),
StandardOpenOption.WRITE);
- Path result = new Path(tmpDirPath,
"word_count_result.txt");
- commands.add(pyFile.getName());
- commands.add(result.getName());
+ String result = String.join(File.separator, tmpDirPath,
"python_working_directory.txt");
+ commands.add(pyPath);
+ commands.add(result);
Process pythonProcess =
PythonEnvUtils.startPythonProcess(pythonEnv, commands);
int exitCode = pythonProcess.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Python process
exits with code: " + exitCode);
}
String cmdResult = new String(Files.readAllBytes(new
File(result.toString()).toPath()));
Review comment:
`toString()` is redundant since the type of the variable `result` has changed to
`String`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services