[SYSTEMML-1309] Fix parfor spark working dir delete on shutdown

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b78c1259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b78c1259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b78c1259

Branch: refs/heads/master
Commit: b78c125934fa7a947a5118f0c08473afa926fa5d
Parents: b028e6c
Author: Matthias Boehm <[email protected]>
Authored: Fri Feb 24 22:00:33 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Feb 25 11:51:06 2017 -0800

----------------------------------------------------------------------
 .../parfor/RemoteDPParForSparkWorker.java       | 28 ++++++++-----
 .../parfor/RemoteParForSparkWorker.java         | 43 +++++++++++---------
 .../parfor/RemoteParForUtils.java               | 26 ++++++++++--
 .../sysml/runtime/util/LocalFileUtils.java      |  2 +-
 4 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index ad0fbf8..458f149 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -32,6 +32,7 @@ import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
@@ -145,21 +146,28 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                _numTasks    = 0;
                _numIters    = 0;
 
-               //init local cache manager 
-               if( !CacheableData.isCachingActive() ) {
-                       String uuid = IDHandler.createDistributedUniqueID();
-                       LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-                       CacheableData.initCaching( uuid ); //incl activation, 
cache dir creation (each map task gets its own dir for simplified cleanup)
-               }               
-               if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") 
){ //account for local mode
-                       CacheableData.cacheEvictionLocalFilePrefix = 
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+               //init and register-cleanup of buffer pool (in parfor spark, 
multiple tasks might 
+               //share the process-local, i.e., per executor, buffer pool; 
hence we synchronize 
+               //the initialization and immediately register the created 
directory for cleanup
+               //on process exit, i.e., executor exit, including any files 
created in the future).
+               synchronized( CacheableData.class ) {
+                       if( !CacheableData.isCachingActive() && 
!InfrastructureAnalyzer.isLocalMode() ) { 
+                               //create id, executor working dir, and cache dir
+                               String uuid = 
IDHandler.createDistributedUniqueID();
+                               LocalFileUtils.createWorkingDirectoryWithUUID( 
uuid );
+                               CacheableData.initCaching( uuid ); //incl 
activation and cache dir creation
+                               CacheableData.cacheEvictionLocalFilePrefix = 
+                                               
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+                               //register entire working dir for delete on 
shutdown
+                               
RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+                       }       
                }
                
                //ensure that resultvar files are not removed
                super.pinResultVariables();
                
-               //enable/disable caching (if required)
-               if( !_caching )
+               //enable/disable caching (if required and not in CP process)
+               if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
                        CacheableData.disableCaching();
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e12376a..2ea802d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 
@@ -35,23 +36,20 @@ import scala.Tuple2;
 
 public class RemoteParForSparkWorker extends ParWorker implements 
PairFlatMapFunction<Task, Long, String> 
 {
-       
        private static final long serialVersionUID = -3254950138084272296L;
 
+       private final String  _prog;
        private boolean _initialized = false;
-       private String  _prog = null;
        private boolean _caching = true;
        
-       private LongAccumulator _aTasks = null;
-       private LongAccumulator _aIters = null;
+       private final LongAccumulator _aTasks;
+       private final LongAccumulator _aIters;
        
        public RemoteParForSparkWorker(String program, boolean cpCaching, 
LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
-               //keep inputs (unfortunately, spark does not expose task ids 
and it would be implementation-dependent
-               //when this constructor is actually called; hence, we do lazy 
initialization on task execution)
-               _initialized = false;
                _prog = program;
+               _initialized = false;
                _caching = cpCaching;
                
                //setup spark accumulators
@@ -65,7 +63,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        {
                //lazy parworker initialization
                if( !_initialized )
-                       configureWorker( TaskContext.get().taskAttemptId() ); 
//requires Spark 1.3
+                       configureWorker( TaskContext.get().taskAttemptId() );
                
                //execute a single task
                long numIter = getExecutedIterations();
@@ -98,24 +96,31 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                _numTasks    = 0;
                _numIters    = 0;
 
-               //init local cache manager 
-               if( !CacheableData.isCachingActive() ) {
-                       String uuid = IDHandler.createDistributedUniqueID();
-                       LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-                       CacheableData.initCaching( uuid ); //incl activation, 
cache dir creation (each map task gets its own dir for simplified cleanup)
-               }               
-               if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") 
){ //account for local mode
-                       CacheableData.cacheEvictionLocalFilePrefix = 
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+               //init and register-cleanup of buffer pool (in parfor spark, 
multiple tasks might 
+               //share the process-local, i.e., per executor, buffer pool; 
hence we synchronize 
+               //the initialization and immediately register the created 
directory for cleanup
+               //on process exit, i.e., executor exit, including any files 
created in the future).
+               synchronized( CacheableData.class ) {
+                       if( !CacheableData.isCachingActive() && 
!InfrastructureAnalyzer.isLocalMode() ) { 
+                               //create id, executor working dir, and cache dir
+                               String uuid = 
IDHandler.createDistributedUniqueID();
+                               LocalFileUtils.createWorkingDirectoryWithUUID( 
uuid );
+                               CacheableData.initCaching( uuid ); //incl 
activation and cache dir creation
+                               CacheableData.cacheEvictionLocalFilePrefix = 
+                                               
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+                               //register entire working dir for delete on 
shutdown
+                               
RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+                       }       
                }
                
                //ensure that resultvar files are not removed
                super.pinResultVariables();
                
-               //enable/disable caching (if required)
-               if( !_caching )
+               //enable/disable caching (if required and not in CP process)
+               if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
                        CacheableData.disableCaching();
                
-               //make as lazily intialized
+               //mark as initialized
                _initialized = true;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 7b3ecb1..fd99429 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -190,12 +190,9 @@ public class RemoteParForUtils
                
                return ret;
        }
-               
        
        /**
-        * Cleanup all temporary files created by this SystemML process
-        * instance.
-        * 
+        * Cleanup all temporary files created by this SystemML process.
         */
        public static void cleanupWorkingDirectories()
        {
@@ -216,6 +213,15 @@ public class RemoteParForUtils
                }
        }
 
+       /**
+        * Cleanup all temporary files created by this SystemML process,
+        * on shutdown via exit or interrupt.
+        */
+       public static void cleanupWorkingDirectoriesOnShutdown() {
+               Runtime.getRuntime().addShutdownHook(
+                               new DeleteWorkingDirectoriesTask());
+       }
+       
        public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> 
out, Log LOG ) 
                throws DMLRuntimeException
        {
@@ -241,4 +247,16 @@ public class RemoteParForUtils
                //create return array
                return tmp.values().toArray(new LocalVariableMap[0]);   
        }
+       
+       /**
+        * Task to be registered as a shutdown hook in order to delete 
+        * all working directories, including any remaining files, which 
+        * might not have been created at the time of registration.
+        */
+       private static class DeleteWorkingDirectoriesTask extends Thread {
+               @Override
+               public void run() {
+                       cleanupWorkingDirectories();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java 
b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
index c868f60..0086f8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
@@ -305,7 +305,7 @@ public class LocalFileUtils
                return createWorkingDirectoryWithUUID( DMLScript.getUUID() );
        }
 
-       public static synchronized String createWorkingDirectoryWithUUID( 
String uuid )
+       public static String createWorkingDirectoryWithUUID( String uuid )
                throws DMLRuntimeException 
        {
                //create local tmp dir if not existing

Reply via email to