Repository: incubator-systemml
Updated Branches:
  refs/heads/master 1bd7da3a5 -> 3de43d62f


[SYSTEMML-653] Asynchronous bufferpool cleanup of evicted files 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a0546399
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a0546399
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a0546399

Branch: refs/heads/master
Commit: a0546399edcf614b5b5f06f4317d3d75127bf962
Parents: 1bd7da3
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 28 21:27:30 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Apr 29 11:57:16 2016 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheableData.java   |  5 +-
 .../controlprogram/caching/LazyWriteBuffer.java | 58 +++++++++++++++++++-
 2 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a0546399/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 7e32cdb..716e528 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -72,7 +72,8 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
        public static final boolean CACHING_BUFFER_PAGECACHE = false; 
        public static final boolean CACHING_WRITE_CACHE_ON_READ = false;        
        public static final String  CACHING_COUNTER_GROUP_NAME    = "SystemML Caching Counters";
-       public static final String  CACHEING_EVICTION_FILEEXTENSION = ".dat";
+       public static final String  CACHING_EVICTION_FILEEXTENSION = ".dat";
+       public static final boolean CACHING_ASYNC_FILECLEANUP = true;
     
        /**
         * Defines all possible cache status types for a data blob.
@@ -1083,7 +1084,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
                        sb.append(CacheableData.cacheEvictionLocalFilePath); 
                        sb.append(CacheableData.cacheEvictionLocalFilePrefix);
                        sb.append(String.format ("%09d", getUniqueCacheID()));
-                       sb.append(CacheableData.CACHEING_EVICTION_FILEEXTENSION);
+                       sb.append(CacheableData.CACHING_EVICTION_FILEEXTENSION);
                        _cacheFileName = sb.toString();
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a0546399/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
index b412b80..23cc620 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -49,6 +51,9 @@ public class LazyWriteBuffer
        //for (1) queue semantics and (2) constant time get/insert/delete operations)
        private static EvictionQueue _mQueue;
        
+       //file cleaner for synchronous or asynchronous delete of evicted files
+       private static FileCleaner _fClean;
+       
        static {
                //obtain the logical buffer size in bytes
                long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
@@ -144,7 +149,7 @@ public class LazyWriteBuffer
                
                //delete from FS if required
                if( requiresDelete )
-                       LocalFileUtils.deleteFileIfExists(fname, true);
+                       _fClean.deleteFile(fname);
        }
        
        /**
@@ -195,7 +200,8 @@ public class LazyWriteBuffer
         * 
         */
        public static void init() {
-               _mQueue = new EvictionQueue();          
+               _mQueue = new EvictionQueue();
+               _fClean = new FileCleaner();
                _size = 0;
                if( CacheableData.CACHING_BUFFER_PAGECACHE )
                        PageCache.init();
@@ -205,8 +211,10 @@ public class LazyWriteBuffer
         * 
         */
        public static void cleanup() {
-               if( _mQueue!=null )
+               if( _mQueue != null )
                        _mQueue.clear();
+               if( _fClean != null )
+                       _fClean.close();
                if( CacheableData.CACHING_BUFFER_PAGECACHE )
                        PageCache.clear();
        }
@@ -271,4 +279,48 @@ public class LazyWriteBuffer
                        return entry;
                }
        }
+       
+       /**
+        * File delete service for abstraction of synchronous and asynchronous 
+        * file cleanup on rmvar/cpvar. The threadpool for asynchronous cleanup
+        * may increase the number of threads temporarily to the number of concurrent 
+        * delete tasks (which is bounded to the parfor degree of parallelism).
+        */
+       private static class FileCleaner
+       {
+               private ExecutorService _pool = null;
+               
+               public FileCleaner() {
+                       //create new threadpool for async cleanup
+                       if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+                               _pool = Executors.newCachedThreadPool();
+               }
+               
+               public void deleteFile(String fname) {
+                       //sync or async file delete
+                       if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+                               _pool.submit(new FileCleanerTask(fname));
+                       else
+                               LocalFileUtils.deleteFileIfExists(fname, true);
+               }
+               
+               public void close() {
+                       //execute pending tasks and shutdown pool
+                       if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+                               _pool.shutdown();
+               }
+               
+               private class FileCleanerTask implements Runnable {
+                       private String _fname = null;
+                       
+                       public FileCleanerTask( String fname ) {
+                               _fname = fname;
+                       }
+                       
+                       @Override
+                       public void run() {
+                               LocalFileUtils.deleteFileIfExists(_fname, true);
+                       }                       
+               }
+       }
 }

Reply via email to