[SYSTEMML-2258] Fix thread contention on acquire shared parfor input

This patch shortens the critical sections of the acquireRead and release
primitives (for buffer pool objects). We now maintain the pinned status
and statistics outside the synchronized block, which reduces
unnecessary contention on acquiring shared reads in local parfor loops
with a large degree of parallelism.

As a positive side effect of maintaining the statistics outside the
synchronized block, the -stats output now clearly shows such contention
issues because the waiting time for the lock is counted into the acquire
read time.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/09b1533d
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/09b1533d
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/09b1533d

Branch: refs/heads/master
Commit: 09b1533de71da735ecb86a4d331a0fd1eba0f30f
Parents: f6e3a91
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 19 01:17:39 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Apr 19 01:17:39 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheableData.java   | 119 ++++++++++---------
 1 file changed, 62 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/09b1533d/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 1ba6c4c..6e63284 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -370,37 +370,52 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * 
         * @return cacheable data
         */
-       public synchronized T acquireRead()
-       {
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire read "+hashCode());
+       public T acquireRead() {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
+               //core internal acquire (synchronized per object)
+               T ret = acquireReadIntern();
+               
+               //update thread-local status (after pin but outside the
+               //critical section of accessing a shared object)
+               if( !isBelowCachingThreshold() )
+                       updateStatusPinned(true);
+               
+               if( DMLScript.STATISTICS ){
+                       long t1 = System.nanoTime();
+                       CacheStatistics.incrementAcquireRTime(t1-t0);
+               }
+               
+               return ret;
+       }
+       
+       private synchronized T acquireReadIntern() {
                if ( !isAvailableToRead() )
                        throw new DMLRuntimeException("MatrixObject not 
available to read.");
                
                //get object from cache
                if( _data == null )
                        getCache();
-                       
+               
                //call acquireHostRead if gpuHandle is set as well as is 
allocated
-        boolean copiedFromGPU = false;
-        for (Map.Entry<GPUContext, GPUObject> kv : _gpuObjects.entrySet()) {
-            GPUObject gObj = kv.getValue();
-            if (gObj != null && copiedFromGPU && gObj.isDirty()) {
-                LOG.error("Inconsistent internal state - A copy of this 
CacheableData was dirty on more than 1 GPU");
-                throw new DMLRuntimeException("Internal Error : Inconsistent 
internal state, A copy of this CacheableData was dirty on more than 1 GPU");
-            } else if (gObj != null){
-                copiedFromGPU = gObj.acquireHostRead(null);
-                if( _data == null )
-                    getCache();
-            }
-        }
+               if( DMLScript.USE_ACCELERATOR ) {
+                       boolean copiedFromGPU = false;
+                       for (Map.Entry<GPUContext, GPUObject> kv : 
_gpuObjects.entrySet()) {
+                               GPUObject gObj = kv.getValue();
+                               if (gObj != null && copiedFromGPU && 
gObj.isDirty())
+                                       throw new DMLRuntimeException("Internal 
Error : Inconsistent internal state, A copy of this CacheableData was dirty on 
more than 1 GPU");
+                               else if (gObj != null) {
+                                       copiedFromGPU = 
gObj.acquireHostRead(null);
+                                       if( _data == null )
+                                               getCache();
+                               }
+                       }
+               }
 
                //read data from HDFS/RDD if required
-               //(probe data for cache_nowrite / jvm_reuse)  
-               if( isEmpty(true) && _data==null ) 
-               {                       
+               //(probe data for cache_nowrite / jvm_reuse)
+               if( isEmpty(true) && _data==null )
+               {
                        try
                        {
                                if( DMLScript.STATISTICS )
@@ -437,21 +452,13 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        
                        _isAcquireFromEmpty = true;
                }
-               else if( DMLScript.STATISTICS )
-               {
+               else if( DMLScript.STATISTICS ) {
                        if( _data!=null )
                                CacheStatistics.incrementMemHits();
                }
                
                //cache status maintenance
                acquire( false, _data==null );
-               updateStatusPinned(true);
-               
-               if( DMLScript.STATISTICS ){
-                       long t1 = System.nanoTime();
-                       CacheStatistics.incrementAcquireRTime(t1-t0);
-               }
-               
                return _data;
        }
 
@@ -467,8 +474,8 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         */
        public synchronized T acquireModify() 
        {
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire modify "+hashCode());
+               //TODO remove after debugger (as only consumer) has been 
removed, because
+               //recent features such as gpu data transfers are not yet 
integrated
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                if ( !isAvailableToModify() )
@@ -526,8 +533,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         */
        public synchronized T acquireModify(T newData, String opcode)
        {
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire modify newdata "+hashCode());
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                if (! isAvailableToModify ())
@@ -546,7 +551,8 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                if (newData == null)
                        throw new DMLRuntimeException("acquireModify with empty 
cache block.");
                _data = newData;
-               updateStatusPinned(true);
+               if( !isBelowCachingThreshold() )
+                       updateStatusPinned(true);
                
                if( DMLScript.STATISTICS ){
                        long t1 = System.nanoTime();
@@ -580,15 +586,27 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * Out-Status: READ(-1), EVICTABLE, EMPTY.
         * 
         */
-       public synchronized void release(String opcode) 
-       {
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("Release "+hashCode());
+       public void release(String opcode) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
+               //update thread-local status (before unpin but outside
+               //the critical section of accessing a shared object)
+               if( !isBelowCachingThreshold() )
+                       updateStatusPinned(false);
+               
+               //core internal release (synchronized per object)
+               releaseIntern(opcode);
+               
+               if( DMLScript.STATISTICS ){
+                       long t1 = System.nanoTime();
+                       CacheStatistics.incrementReleaseTime(t1-t0);
+               }
+       }
+       
+       private synchronized void releaseIntern(String opcode)
+       {
                boolean write = false;
-               if ( isModify() )
-               {
+               if ( isModify() ) {
                        //set flags for write
                        write = true;
                        setDirty(true);
@@ -602,14 +620,12 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                
                //cache status maintenance (pass cacheNoWrite flag)
                release(_isAcquireFromEmpty && !_requiresLocalWrite);
-               updateStatusPinned(false);
                
-               if(    isCachingActive() //only if caching is enabled 
(otherwise keep everything in mem)
-                       && isCached(true)    //not empty and not read/modify
+               if( isCachingActive() //only if caching is enabled (otherwise 
keep everything in mem)
+                       && isCached(true) //not empty and not read/modify
                        && !isBelowCachingThreshold() ) //min size for caching
                {
-                       if( write || _requiresLocalWrite ) 
-                       {
+                       if( write || _requiresLocalWrite ) {
                                //evict blob
                                String filePath = getCacheFilePathAndName();
                                try {
@@ -632,14 +648,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        createCache();
                        _data = null;
                }
-               else if( LOG.isTraceEnabled() ){
-                       LOG.trace("Var "+hashCode()+" not subject to caching, 
state="+getStatusAsString());
-               }
-
-               if( DMLScript.STATISTICS ){
-                       long t1 = System.nanoTime();
-                       CacheStatistics.incrementReleaseTime(t1-t0);
-               }
        }
        
        /**
@@ -653,9 +661,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         */
        public synchronized void clearData() 
        {
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("Clear data "+hashCode());
-               
                // check if cleanup enabled and possible 
                if( !isCleanupEnabled() ) 
                        return; // do nothing
@@ -1201,7 +1206,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * referenced cache block.  
         */
        protected void createCache( ) {
-               _cache = new SoftReference<>( _data );  
+               _cache = new SoftReference<>( _data );
        }
 
        /**

Reply via email to