This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 31f13614c5e [FLINK-34616][python] Fix python dist dir doesn't clean
when open method construct resource has exception.
31f13614c5e is described below
commit 31f13614c5e1bccbcfc14f31561aac3892b86e85
Author: Jacky Lau <[email protected]>
AuthorDate: Fri Mar 8 09:37:52 2024 +0800
[FLINK-34616][python] Fix python dist dir doesn't clean when open method
construct resource has exception.
This closes #24462.
---
.../env/AbstractPythonEnvironmentManager.java | 63 +++++++++++++---------
1 file changed, 37 insertions(+), 26 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
index cfe39175d8b..2f0acfb53dc 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
@@ -123,9 +123,16 @@ public abstract class AbstractPythonEnvironmentManager
implements PythonEnvironm
"Could not create the base directory:
" + baseDirectory);
}
- Map<String, String> env =
constructEnvironmentVariables(baseDirectory);
- installRequirements(baseDirectory, env);
- return Tuple2.of(baseDirectory, env);
+ try {
+ Map<String, String> env =
+
constructEnvironmentVariables(baseDirectory);
+ installRequirements(baseDirectory, env);
+ return Tuple2.of(baseDirectory, env);
+ } catch (Throwable e) {
+ deleteBaseDirectory(baseDirectory);
+ LOG.warn("Failed to create resource.", e);
+ throw e;
+ }
});
shutdownHook =
ShutdownHookUtil.addShutdownHook(
@@ -213,6 +220,32 @@ public abstract class AbstractPythonEnvironmentManager
implements PythonEnvironm
+ "' for storing the generated files of python
dependency.");
}
+ private static void deleteBaseDirectory(String baseDirectory) {
+ int retries = 0;
+ while (true) {
+ try {
+ FileUtils.deleteDirectory(new File(baseDirectory));
+ break;
+ } catch (Throwable t) {
+ retries++;
+ if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
+ LOG.warn(
+ String.format(
+ "Failed to delete the working directory %s
of the Python UDF worker. Retrying...",
+ baseDirectory),
+ t);
+ } else {
+ LOG.warn(
+ String.format(
+ "Failed to delete the working directory %s
of the Python UDF worker.",
+ baseDirectory),
+ t);
+ break;
+ }
+ }
+ }
+ }
+
private void installRequirements(String baseDirectory, Map<String, String>
env)
throws IOException {
// Directory for storing the installation result of the requirements
file.
@@ -475,29 +508,7 @@ public abstract class AbstractPythonEnvironmentManager
implements PythonEnvironm
@Override
public void close() throws Exception {
- int retries = 0;
- while (true) {
- try {
- FileUtils.deleteDirectory(new File(baseDirectory));
- break;
- } catch (Throwable t) {
- retries++;
- if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
- LOG.warn(
- String.format(
- "Failed to delete the working
directory %s of the Python UDF worker. Retrying...",
- baseDirectory),
- t);
- } else {
- LOG.warn(
- String.format(
- "Failed to delete the working
directory %s of the Python UDF worker.",
- baseDirectory),
- t);
- break;
- }
- }
- }
+ deleteBaseDirectory(baseDirectory);
}
}
}