Repository: incubator-systemml Updated Branches: refs/heads/master 1bd7da3a5 -> 3de43d62f
[SYSTEMML-653] Asynchronous bufferpool cleanup of evicted files Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a0546399 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a0546399 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a0546399 Branch: refs/heads/master Commit: a0546399edcf614b5b5f06f4317d3d75127bf962 Parents: 1bd7da3 Author: Matthias Boehm <[email protected]> Authored: Thu Apr 28 21:27:30 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Apr 29 11:57:16 2016 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/CacheableData.java | 5 +- .../controlprogram/caching/LazyWriteBuffer.java | 58 +++++++++++++++++++- 2 files changed, 58 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a0546399/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index 7e32cdb..716e528 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -72,7 +72,8 @@ public abstract class CacheableData<T extends CacheBlock> extends Data public static final boolean CACHING_BUFFER_PAGECACHE = false; public static final boolean CACHING_WRITE_CACHE_ON_READ = false; public static final String CACHING_COUNTER_GROUP_NAME = "SystemML Caching Counters"; - public static final String CACHEING_EVICTION_FILEEXTENSION = ".dat"; + public static final String CACHING_EVICTION_FILEEXTENSION = ".dat"; + public static final boolean CACHING_ASYNC_FILECLEANUP = true; /** * Defines all possible cache status types for a data blob. @@ -1083,7 +1084,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data sb.append(CacheableData.cacheEvictionLocalFilePath); sb.append(CacheableData.cacheEvictionLocalFilePrefix); sb.append(String.format ("%09d", getUniqueCacheID())); - sb.append(CacheableData.CACHEING_EVICTION_FILEEXTENSION); + sb.append(CacheableData.CACHING_EVICTION_FILEEXTENSION); _cacheFileName = sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a0546399/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java index b412b80..23cc620 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.sysml.api.DMLScript; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -49,6 +51,9 @@ public class LazyWriteBuffer //for (1) queue semantics and (2) constant time get/insert/delete operations) private static EvictionQueue _mQueue; + //file cleaner for synchronous or asynchronous delete of evicted files + private static FileCleaner _fClean; + static { //obtain the logical buffer size in bytes long maxMem = InfrastructureAnalyzer.getLocalMaxMemory(); @@ -144,7 +149,7 @@ public class LazyWriteBuffer //delete from FS if required if( requiresDelete ) - LocalFileUtils.deleteFileIfExists(fname, true); + _fClean.deleteFile(fname); } /** @@ -195,7 +200,8 @@ public class LazyWriteBuffer * */ public static void init() { - _mQueue = new EvictionQueue(); + _mQueue = new EvictionQueue(); + _fClean = new FileCleaner(); _size = 0; if( CacheableData.CACHING_BUFFER_PAGECACHE ) PageCache.init(); @@ -205,8 +211,10 @@ public class LazyWriteBuffer * */ public static void cleanup() { - if( _mQueue!=null ) + if( _mQueue != null ) _mQueue.clear(); + if( _fClean != null ) + _fClean.close(); if( CacheableData.CACHING_BUFFER_PAGECACHE ) PageCache.clear(); } @@ -271,4 +279,48 @@ public class LazyWriteBuffer return entry; } } + + /** + * File delete service for abstraction of synchronous and asynchronous + * file cleanup on rmvar/cpvar. The threadpool for asynchronous cleanup + * may increase the number of threads temporarily to the number of concurrent + * delete tasks (which is bounded to the parfor degree of parallelism). + */ + private static class FileCleaner + { + private ExecutorService _pool = null; + + public FileCleaner() { + //create new threadpool for async cleanup + if( CacheableData.CACHING_ASYNC_FILECLEANUP ) + _pool = Executors.newCachedThreadPool(); + } + + public void deleteFile(String fname) { + //sync or async file delete + if( CacheableData.CACHING_ASYNC_FILECLEANUP ) + _pool.submit(new FileCleanerTask(fname)); + else + LocalFileUtils.deleteFileIfExists(fname, true); + } + + public void close() { + //execute pending tasks and shutdown pool + if( CacheableData.CACHING_ASYNC_FILECLEANUP ) + _pool.shutdown(); + } + + private class FileCleanerTask implements Runnable { + private String _fname = null; + + public FileCleanerTask( String fname ) { + _fname = fname; + } + + @Override + public void run() { + LocalFileUtils.deleteFileIfExists(_fname, true); + } + } + } }
