[SYSTEMML-2258] Fix thread contention on acquire shared parfor input

This patch shortens the critical sections of the acquireRead and release primitives (for buffer pool objects). We now maintain the pinned status and statistics outside the synchronized block, which reduces unnecessary contention on acquiring shared reads in local parfor loops with a large degree of parallelism.
As a positive side effect of maintaining the statistics outside the synchronized block, the -stats output now clearly shows such contention issues because the waiting time for the lock is counted into the acquire read time. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/09b1533d Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/09b1533d Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/09b1533d Branch: refs/heads/master Commit: 09b1533de71da735ecb86a4d331a0fd1eba0f30f Parents: f6e3a91 Author: Matthias Boehm <[email protected]> Authored: Thu Apr 19 01:17:39 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Apr 19 01:17:39 2018 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/CacheableData.java | 119 ++++++++++--------- 1 file changed, 62 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/09b1533d/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index 1ba6c4c..6e63284 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -370,37 +370,52 @@ public abstract class CacheableData<T extends CacheBlock> extends Data * * @return cacheable data */ - public synchronized T acquireRead() - { - if( LOG.isTraceEnabled() ) - LOG.trace("Acquire read "+hashCode()); + public T acquireRead() { long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + //core internal acquire (synchronized per object) + T ret = acquireReadIntern(); + + //update thread-local status (after pin but outside the + //critical section of accessing a shared object) + if( !isBelowCachingThreshold() ) + updateStatusPinned(true); + + if( DMLScript.STATISTICS ){ + long t1 = System.nanoTime(); + CacheStatistics.incrementAcquireRTime(t1-t0); + } + + return ret; + } + + private synchronized T acquireReadIntern() { if ( !isAvailableToRead() ) throw new DMLRuntimeException("MatrixObject not available to read."); //get object from cache if( _data == null ) getCache(); - + //call acquireHostRead if gpuHandle is set as well as is allocated - boolean copiedFromGPU = false; - for (Map.Entry<GPUContext, GPUObject> kv : _gpuObjects.entrySet()) { - GPUObject gObj = kv.getValue(); - if (gObj != null && copiedFromGPU && gObj.isDirty()) { - LOG.error("Inconsistent internal state - A copy of this CacheableData was dirty on more than 1 GPU"); - throw new DMLRuntimeException("Internal Error : Inconsistent internal state, A copy of this CacheableData was dirty on more than 1 GPU"); - } else if (gObj != null){ - copiedFromGPU = gObj.acquireHostRead(null); - if( _data == null ) - getCache(); - } - } + if( DMLScript.USE_ACCELERATOR ) { + boolean copiedFromGPU = false; + for (Map.Entry<GPUContext, GPUObject> kv : _gpuObjects.entrySet()) { + GPUObject gObj = kv.getValue(); + if (gObj != null && copiedFromGPU && gObj.isDirty()) + throw new DMLRuntimeException("Internal Error : Inconsistent internal state, A copy of this CacheableData was dirty on more than 1 GPU"); + else if (gObj != null) { + copiedFromGPU = gObj.acquireHostRead(null); + if( _data == null ) + getCache(); + } + } + } //read data from HDFS/RDD if required - //(probe data for cache_nowrite / jvm_reuse) - if( isEmpty(true) && _data==null ) - { + //(probe data for cache_nowrite / jvm_reuse) + if( isEmpty(true) && _data==null ) + { try { if( DMLScript.STATISTICS ) @@ -437,21 
+452,13 @@ public abstract class CacheableData<T extends CacheBlock> extends Data _isAcquireFromEmpty = true; } - else if( DMLScript.STATISTICS ) - { + else if( DMLScript.STATISTICS ) { if( _data!=null ) CacheStatistics.incrementMemHits(); } //cache status maintenance acquire( false, _data==null ); - updateStatusPinned(true); - - if( DMLScript.STATISTICS ){ - long t1 = System.nanoTime(); - CacheStatistics.incrementAcquireRTime(t1-t0); - } - return _data; } @@ -467,8 +474,8 @@ public abstract class CacheableData<T extends CacheBlock> extends Data */ public synchronized T acquireModify() { - if( LOG.isTraceEnabled() ) - LOG.trace("Acquire modify "+hashCode()); + //TODO remove after debugger (as only consumer) has been removed, because + //recent features such as gpu data transfers are not yet integrated long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; if ( !isAvailableToModify() ) @@ -526,8 +533,6 @@ public abstract class CacheableData<T extends CacheBlock> extends Data */ public synchronized T acquireModify(T newData, String opcode) { - if( LOG.isTraceEnabled() ) - LOG.trace("Acquire modify newdata "+hashCode()); long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; if (! isAvailableToModify ()) @@ -546,7 +551,8 @@ public abstract class CacheableData<T extends CacheBlock> extends Data if (newData == null) throw new DMLRuntimeException("acquireModify with empty cache block."); _data = newData; - updateStatusPinned(true); + if( !isBelowCachingThreshold() ) + updateStatusPinned(true); if( DMLScript.STATISTICS ){ long t1 = System.nanoTime(); @@ -580,15 +586,27 @@ public abstract class CacheableData<T extends CacheBlock> extends Data * Out-Status: READ(-1), EVICTABLE, EMPTY. * */ - public synchronized void release(String opcode) - { - if( LOG.isTraceEnabled() ) - LOG.trace("Release "+hashCode()); + public void release(String opcode) { long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + //update thread-local status (before unpin but outside + //the critical section of accessing a shared object) + if( !isBelowCachingThreshold() ) + updateStatusPinned(false); + + //core internal release (synchronized per object) + releaseIntern(opcode); + + if( DMLScript.STATISTICS ){ + long t1 = System.nanoTime(); + CacheStatistics.incrementReleaseTime(t1-t0); + } + } + + private synchronized void releaseIntern(String opcode) + { boolean write = false; - if ( isModify() ) - { + if ( isModify() ) { //set flags for write write = true; setDirty(true); @@ -602,14 +620,12 @@ public abstract class CacheableData<T extends CacheBlock> extends Data //cache status maintenance (pass cacheNoWrite flag) release(_isAcquireFromEmpty && !_requiresLocalWrite); - updateStatusPinned(false); - if( isCachingActive() //only if caching is enabled (otherwise keep everything in mem) - && isCached(true) //not empty and not read/modify + if( isCachingActive() //only if caching is enabled (otherwise keep everything in mem) + && isCached(true) //not empty and not read/modify && !isBelowCachingThreshold() ) //min size for caching { - if( write || _requiresLocalWrite ) - { + if( write || _requiresLocalWrite ) { //evict blob String filePath = getCacheFilePathAndName(); try { @@ -632,14 +648,6 @@ public abstract class CacheableData<T extends CacheBlock> extends Data createCache(); _data = null; } - else if( LOG.isTraceEnabled() ){ - LOG.trace("Var "+hashCode()+" not subject to caching, state="+getStatusAsString()); - } - - if( DMLScript.STATISTICS ){ - long t1 = System.nanoTime(); - CacheStatistics.incrementReleaseTime(t1-t0); - } } /** @@ -653,9 +661,6 @@ public abstract class CacheableData<T extends CacheBlock> extends Data */ public synchronized void clearData() { - if( LOG.isTraceEnabled() ) - LOG.trace("Clear data "+hashCode()); - // check if cleanup enabled and possible if( !isCleanupEnabled() ) return; // do nothing @@ -1201,7 +1206,7 @@ public abstract class 
CacheableData<T extends CacheBlock> extends Data * referenced cache block. */ protected void createCache( ) { - _cache = new SoftReference<>( _data ); + _cache = new SoftReference<>( _data ); } /**
