[
https://issues.apache.org/jira/browse/FLINK-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326411#comment-14326411
]
ASF GitHub Bot commented on FLINK-1483:
---------------------------------------
Github user hsaputra commented on a diff in the pull request:
https://github.com/apache/flink/pull/417#discussion_r24931138
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
---
@@ -52,26 +57,88 @@
/**
* Constructs a new IOManager.
*
- * @param paths
- * the basic directory paths for files underlying anonymous
channels.
+ * @param tempDirs The basic directories for files underlying anonymous
channels.
*/
- protected IOManager(String[] paths) {
- this.paths = paths;
+ protected IOManager(String[] tempDirs) {
+ if (tempDirs == null || tempDirs.length == 0) {
+ throw new IllegalArgumentException("The temporary
directories must not be null or empty.");
+ }
+
this.random = new Random();
this.nextPath = 0;
+
+ this.paths = new File[tempDirs.length];
+ for (int i = 0; i < tempDirs.length; i++) {
+ File baseDir = new File(tempDirs[i]);
+ String subfolder = String.format("flink-io-%s",
UUID.randomUUID().toString());
+ File storageDir = new File(baseDir, subfolder);
+
+ if (!storageDir.exists() && !storageDir.mkdirs()) {
+ throw new RuntimeException(
+ "Could not create storage
directory for IOManager: " + storageDir.getAbsolutePath());
+ }
+ paths[i] = storageDir;
+ LOG.info("I/O manager uses directory {} for spill
files.", storageDir.getAbsolutePath());
+ }
+
+ this.shutdownHook = new Thread("I/O manager shutdown hook") {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
/**
- * Close method, marks the I/O manager as closed.
+ * Close method, marks the I/O manager as closed
+ * and removed all temporary files.
*/
- public abstract void shutdown();
+ public void shutdown() {
+ // remove all of our temp directories
+ for (File path : paths) {
+ try {
+ if (path != null) {
+ if (path.exists()) {
+ FileUtils.deleteDirectory(path);
+ LOG.info("I/O manager removed
spill file directory {}", path.getAbsolutePath());
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("IOManager failed to properly clean
up temp file directory: " + path, t);
+ }
+ }
+
+ // Remove shutdown hook to prevent resource leaks, unless this
is invoked by the shutdown hook itself
+ if (shutdownHook != Thread.currentThread()) {
+ try {
+
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException e) {
+ // race, JVM is in shutdown already, we can
safely ignore this
+ }
+ catch (Throwable t) {
+ LOG.warn("Exception while unregistering
IOManager's shutdown hook.", t);
+ }
+ }
+ }
/**
* Utility method to check whether the IO manager has been properly
shut down.
+ * For this base implementation, this means that all files have been
removed.
*
* @return True, if the IO manager has properly shut down, false
otherwise.
*/
- public abstract boolean isProperlyShutDown();
+ public boolean isProperlyShutDown() {
+ for (File path : paths) {
+ if (path != null) {
--- End diff --
Would this easier to read with check {{if(path != null && path.exists())}}
> Temporary channel files are not properly deleted when Flink is terminated
> -------------------------------------------------------------------------
>
> Key: FLINK-1483
> URL: https://issues.apache.org/jira/browse/FLINK-1483
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 0.8, 0.9
> Reporter: Till Rohrmann
> Assignee: Ufuk Celebi
>
> The temporary channel files are not properly deleted if the IOManager does
> not shut down properly. This can be the case when the TaskManagers are
> terminated by Flink's shell scripts.
> A solution could be to store all channel files of one TaskManager in a
> uniquely identifiable directory and to register a shutdown hook which deletes
> this file upon termination.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)