[SYSTEMML-1309] Fix parfor spark working dir delete on shutdown Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b78c1259 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b78c1259 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b78c1259
Branch: refs/heads/master Commit: b78c125934fa7a947a5118f0c08473afa926fa5d Parents: b028e6c Author: Matthias Boehm <[email protected]> Authored: Fri Feb 24 22:00:33 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Feb 25 11:51:06 2017 -0800 ---------------------------------------------------------------------- .../parfor/RemoteDPParForSparkWorker.java | 28 ++++++++----- .../parfor/RemoteParForSparkWorker.java | 43 +++++++++++--------- .../parfor/RemoteParForUtils.java | 26 ++++++++++-- .../sysml/runtime/util/LocalFileUtils.java | 2 +- 4 files changed, 65 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index ad0fbf8..458f149 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -32,6 +32,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; @@ -145,21 +146,28 @@ public class 
RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF _numTasks = 0; _numIters = 0; - //init local cache manager - if( !CacheableData.isCachingActive() ) { - String uuid = IDHandler.createDistributedUniqueID(); - LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); - CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup) - } - if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode - CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; + //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might + //share the process-local, i.e., per executor, buffer pool; hence we synchronize + //the initialization and immediately register the created directory for cleanup + //on process exit, i.e., executor exit, including any files created in the future. + synchronized( CacheableData.class ) { + if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { + //create id, executor working dir, and cache dir + String uuid = IDHandler.createDistributedUniqueID(); + LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); + CacheableData.initCaching( uuid ); //incl activation and cache dir creation + CacheableData.cacheEvictionLocalFilePrefix = + CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; + //register entire working dir for delete on shutdown + RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown(); + } } //ensure that resultvar files are not removed super.pinResultVariables(); - //enable/disable caching (if required) - if( !_caching ) + //enable/disable caching (if required and not in CP process) + if( !_caching && !InfrastructureAnalyzer.isLocalMode() ) CacheableData.disableCaching(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index e12376a..2ea802d 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.util.LongAccumulator; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; import org.apache.sysml.runtime.util.LocalFileUtils; @@ -35,23 +36,20 @@ import scala.Tuple2; public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String> { - private static final long serialVersionUID = -3254950138084272296L; + private final String _prog; private boolean _initialized = false; - private String _prog = null; private boolean _caching = true; - private LongAccumulator _aTasks = null; - private LongAccumulator _aIters = null; + private final LongAccumulator _aTasks; + private final LongAccumulator _aIters; public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) throws DMLRuntimeException { - //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent - //when this constructor is actually called; hence, we do lazy initialization on task execution) - _initialized = false; _prog = program; + _initialized = false; _caching = cpCaching; //setup spark accumulators @@ -65,7 +63,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun { //lazy parworker initialization if( !_initialized ) - configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3 + configureWorker( TaskContext.get().taskAttemptId() ); //execute a single task long numIter = getExecutedIterations(); @@ -98,24 +96,31 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun _numTasks = 0; _numIters = 0; - //init local cache manager - if( !CacheableData.isCachingActive() ) { - String uuid = IDHandler.createDistributedUniqueID(); - LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); - CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup) - } - if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode - CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; + //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might + //share the process-local, i.e., per executor, buffer pool; hence we synchronize + //the initialization and immediately register the created directory for cleanup + //on process exit, i.e., executor exit, including any files created in the future. 
+ synchronized( CacheableData.class ) { + if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { + //create id, executor working dir, and cache dir + String uuid = IDHandler.createDistributedUniqueID(); + LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); + CacheableData.initCaching( uuid ); //incl activation and cache dir creation + CacheableData.cacheEvictionLocalFilePrefix = + CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; + //register entire working dir for delete on shutdown + RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown(); + } } //ensure that resultvar files are not removed super.pinResultVariables(); - //enable/disable caching (if required) - if( !_caching ) + //enable/disable caching (if required and not in CP process) + if( !_caching && !InfrastructureAnalyzer.isLocalMode() ) CacheableData.disableCaching(); - //make as lazily intialized + //mark as initialized _initialized = true; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java index 7b3ecb1..fd99429 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java @@ -190,12 +190,9 @@ public class RemoteParForUtils return ret; } - /** - * Cleanup all temporary files created by this SystemML process - * instance. - * + * Cleanup all temporary files created by this SystemML process. 
*/ public static void cleanupWorkingDirectories() { @@ -216,6 +213,15 @@ public class RemoteParForUtils } } + /** + * Cleanup all temporary files created by this SystemML process, + * on shutdown via exit or interrupt. + */ + public static void cleanupWorkingDirectoriesOnShutdown() { + Runtime.getRuntime().addShutdownHook( + new DeleteWorkingDirectoriesTask()); + } + public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) throws DMLRuntimeException { @@ -241,4 +247,16 @@ public class RemoteParForUtils //create return array return tmp.values().toArray(new LocalVariableMap[0]); } + + /** + * Task to be registered as shutdown hook in order to delete + * all working directories, including any remaining files, which + * might not have been created at time of registration. + */ + private static class DeleteWorkingDirectoriesTask extends Thread { + @Override + public void run() { + cleanupWorkingDirectories(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java index c868f60..0086f8f 100644 --- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java +++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java @@ -305,7 +305,7 @@ public class LocalFileUtils return createWorkingDirectoryWithUUID( DMLScript.getUUID() ); } - public static synchronized String createWorkingDirectoryWithUUID( String uuid ) + public static String createWorkingDirectoryWithUUID( String uuid ) throws DMLRuntimeException { //create local tmp dir if not existing
