This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 0888631  [SYSTEMDS-2708] Performance buffer pool handling of frames w/ 
strings
0888631 is described below

commit 0888631043ba57d29e682367712619815ececc9d
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Fri Oct 30 22:08:13 2020 +0100

    [SYSTEMDS-2708] Performance buffer pool handling of frames w/ strings
    
    This patch improves the performance of buffer pool handling for frames
    with string columns. There are many calls to getInMemorySize() on the
    path through the buffer pool. In case of string columns, each of these
    calls can scan all values to determine the lengths of individual strings.
    We now reuse previously computed size estimates within the frame block.
    Furthermore, this patch generalizes the asynchronous file cleaning in the
    buffer pool so that it also accepts serialization tasks (such as for
    frames with string columns). However, because asynchronous serialization
    provides weaker guarantees, this second modification is not enabled by default.
    
    In an example mini-batch scenario (with preprocessing in the individual
    iterations), this patch reduced the runtime from 91s (86s right indexing
    incl. buffer pool release) to 9.2s (5.1s right indexing). With the
    additional async serialization, the runtime further decreased to 7.9s
    (3.2s right indexing).
---
 .../controlprogram/caching/CacheableData.java      | 13 ++++-
 .../controlprogram/caching/LazyWriteBuffer.java    | 66 ++++++++++++++++++----
 .../apache/sysds/runtime/io/IOUtilFunctions.java   |  8 +--
 .../sysds/runtime/matrix/data/FrameBlock.java      | 32 ++++++++---
 4 files changed, 95 insertions(+), 24 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index cb7e096..97f211a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -85,9 +85,20 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
        public static final RPolicy CACHING_BUFFER_POLICY = RPolicy.FIFO;
        public static final boolean CACHING_BUFFER_PAGECACHE = false;
        public static final boolean CACHING_WRITE_CACHE_ON_READ = false;
-       public static final String  CACHING_COUNTER_GROUP_NAME    = "SystemDS 
Caching Counters";
+       public static final String  CACHING_COUNTER_GROUP_NAME = "SystemDS 
Caching Counters";
        public static final String  CACHING_EVICTION_FILEEXTENSION = ".dat";
        public static final boolean CACHING_ASYNC_FILECLEANUP = true;
+       public static final boolean CACHING_ASYNC_SERIALIZE = false;
+       
+       //NOTE CACHING_ASYNC_SERIALIZE:
+       // The serialization of matrices and frames (ultra-sparse matrices or 
+       // frames with strings) into buffer pool byte arrays happens outside 
the 
+       // critical region of the global lock in LazyWriteBuffer. However, it 
still
+       // requires thread-local serialization (before returning from release) 
in 
+       // order to guarantee that not too many objects are pinned at the same 
time 
+       // which would violate the memory budget. Therefore, the new 
asynchronous 
+       // serialization (see CACHING_ASYNC_SERIALIZE) should be understood as
+       // optimistic with weaker guarantees.
        
        /**
         * Defines all possible cache status types for a data blob.
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
index cf6bf0e..9ca079f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
@@ -47,8 +48,8 @@ public class LazyWriteBuffer
        //for (1) queue semantics and (2) constant time get/insert/delete 
operations)
        private static EvictionQueue _mQueue;
        
-       //file cleaner for synchronous or asynchronous delete of evicted files
-       private static FileCleaner _fClean;
+       //maintenance service for synchronous or asynchronous delete of evicted 
files
+       private static MaintenanceService _fClean;
        
        static {
                //obtain the logical buffer size in bytes
@@ -100,7 +101,7 @@ public class LazyWriteBuffer
                        }
                        
                        //serialize matrix (outside synchronized critical path)
-                       bbuff.serializeBlock(cb);
+                       _fClean.serializeData(bbuff, cb);
                        
                        if( DMLScript.STATISTICS ) {
                                CacheStatistics.incrementFSBuffWrites();
@@ -180,7 +181,7 @@ public class LazyWriteBuffer
 
        public static void init() {
                _mQueue = new EvictionQueue();
-               _fClean = new FileCleaner();
+               _fClean = new MaintenanceService();
                _size = 0;
                if( CacheableData.CACHING_BUFFER_PAGECACHE )
                        PageCache.init();
@@ -309,18 +310,19 @@ public class LazyWriteBuffer
        }
        
        /**
-        * File delete service for abstraction of synchronous and asynchronous
-        * file cleanup on rmvar/cpvar. The threadpool for asynchronous cleanup
-        * may increase the number of threads temporarily to the number of 
concurrent
-        * delete tasks (which is bounded to the parfor degree of parallelism).
+        * Maintenance service for abstraction of synchronous and asynchronous
+        * file cleanup on rmvar/cpvar as well as serialization of matrices and
+        * frames. The thread pool for asynchronous cleanup may increase the 
+        * number of threads temporarily to the number of concurrent delete 
tasks
+        * (which is bounded to the parfor degree of parallelism).
         */
-       private static class FileCleaner
+       private static class MaintenanceService
        {
                private ExecutorService _pool = null;
                
-               public FileCleaner() {
+               public MaintenanceService() {
                        //create new threadpool for async cleanup
-                       if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+                       if( isAsync() )
                                _pool = Executors.newCachedThreadPool();
                }
                
@@ -332,12 +334,32 @@ public class LazyWriteBuffer
                                LocalFileUtils.deleteFileIfExists(fname, true);
                }
                
+               public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
+                       //sync or async file delete
+                       if( CacheableData.CACHING_ASYNC_SERIALIZE )
+                               _pool.submit(new DataSerializerTask(bbuff, cb));
+                       else {
+                               try {
+                                       bbuff.serializeBlock(cb);
+                               }
+                               catch(IOException ex) {
+                                       throw new DMLRuntimeException(ex);
+                               }
+                       }
+               }
+               
                public void close() {
                        //execute pending tasks and shutdown pool
-                       if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+                       if( isAsync() )
                                _pool.shutdown();
                }
                
+               @SuppressWarnings("unused")
+               public boolean isAsync() {
+                       return CacheableData.CACHING_ASYNC_FILECLEANUP 
+                               || CacheableData.CACHING_ASYNC_SERIALIZE;
+               }
+               
                private static class FileCleanerTask implements Runnable {
                        private String _fname = null;
                        
@@ -350,5 +372,25 @@ public class LazyWriteBuffer
                                LocalFileUtils.deleteFileIfExists(_fname, true);
                        }
                }
+               
+               private static class DataSerializerTask implements Runnable {
+                       private ByteBuffer _bbuff = null;
+                       private CacheBlock _cb = null;
+                       
+                       public DataSerializerTask(ByteBuffer bbuff, CacheBlock 
cb) {
+                               _bbuff = bbuff;
+                               _cb = cb;
+                       }
+                       
+                       @Override
+                       public void run() {
+                               try {
+                                       _bbuff.serializeBlock(_cb);
+                               }
+                               catch(IOException ex) {
+                                       throw new DMLRuntimeException(ex);
+                               }
+                       }
+               }
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index 75095b2..7b5e4d0 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -442,10 +442,10 @@ public class IOUtilFunctions
                //size in modified UTF-8 as used by DataInput/DataOutput
                int size = 2; //length in bytes
                for (int i = 0; i < value.length(); i++) {
-            char c = value.charAt(i);
-            size += ( c>=0x0001 && c<=0x007F) ? 1 :
-               (c >= 0x0800) ? 3 : 2;
-        }
+                       char c = value.charAt(i);
+                       size += ( c>=0x0001 && c<=0x007F) ? 1 :
+                               (c >= 0x0800) ? 3 : 2;
+               }
                return size;
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 175d400..9c4284c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -84,6 +84,9 @@ public class FrameBlock implements CacheBlock, Externalizable 
 {
        /** The data frame data as an ordered list of columns */
        private Array[] _coldata = null;
        
+       /** Cached size in memory to avoid repeated scans of string columns */
+       long _msize = -1;
+       
        public FrameBlock() {
                _numRows = 0;
        }
@@ -273,6 +276,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
         * @param numRows number of rows
         */
        public void ensureAllocatedColumns(int numRows) {
+               _msize = -1;
                //early abort if already allocated
                if( _coldata != null && _schema.length == _coldata.length ) {
                        //handle special case that to few rows allocated
@@ -376,6 +380,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
         */
        public void set(int r, int c, Object val) {
                _coldata[c].set(r, UtilFunctions.objectToObject(_schema[c], 
val));
+               _msize = -1;
        }
 
        public void reset(int nrow, boolean clearMeta) {
@@ -392,6 +397,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                        for( int i=0; i < _coldata.length; i++ )
                                _coldata[i].reset(nrow);
                }
+               _msize = -1;
        }
 
        public void reset() {
@@ -440,7 +446,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _coldata = (_coldata==null) ? new Array[]{new StringArray(col)} 
:
                        (Array[]) ArrayUtils.add(_coldata, new 
StringArray(col));
                _numRows = col.length;
-
+               _msize = -1;
        }
        
        /**
@@ -458,6 +464,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _coldata = (_coldata==null) ? new Array[]{new 
BooleanArray(col)} :
                        (Array[]) ArrayUtils.add(_coldata, new 
BooleanArray(col));      
                _numRows = col.length;
+               _msize = -1;
        }
        
        /**
@@ -475,6 +482,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _coldata = (_coldata==null) ? new Array[]{new 
IntegerArray(col)} :
                        (Array[]) ArrayUtils.add(_coldata, new 
IntegerArray(col));
                _numRows = col.length;
+               _msize = -1;
        }
        /**
         * Append a column of value type LONG as the last column of
@@ -491,6 +499,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _coldata = (_coldata==null) ? new Array[]{new LongArray(col)} :
                        (Array[]) ArrayUtils.add(_coldata, new LongArray(col));
                _numRows = col.length;
+               _msize = -1;
        }
        
        /**
@@ -508,6 +517,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _coldata = (_coldata==null) ? new Array[]{new FloatArray(col)} :
                                (Array[]) ArrayUtils.add(_coldata, new 
FloatArray(col));
                _numRows = col.length;
+               _msize = -1;
        }
        /**
         * Append a column of value type DOUBLE as the last column of
@@ -524,6 +534,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _coldata = (_coldata==null) ? new Array[]{new DoubleArray(col)} 
:
                        (Array[]) ArrayUtils.add(_coldata, new 
DoubleArray(col));
                _numRows = col.length;
+               _msize = -1;
        }
        
        /**
@@ -545,6 +556,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                _schema = empty ? tmpSchema : (ValueType[]) 
ArrayUtils.addAll(_schema, tmpSchema); 
                _coldata = empty ? tmpData : (Array[]) 
ArrayUtils.addAll(_coldata, tmpData);
                _numRows = cols[0].length;
+               _msize = -1;
        }
 
        public Object getColumnData(int c) {
@@ -564,12 +576,13 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
        public void setColumn(int c, Array column) {
                if( _coldata == null )
                        _coldata = new Array[getNumColumns()];
-               _coldata[c] = column; 
+               _coldata[c] = column;
+               _msize = -1;
        }
        
        /**
         * Get a row iterator over the frame where all fields are encoded
-        * as strings independent of their value types.  
+        * as strings independent of their value types.
         * 
         * @return string array iterator
         */
@@ -738,6 +751,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                                        (mvvalue==null || mvvalue.isEmpty()) ? 
null : mvvalue);
                        _coldata[j] = arr;
                }
+               _msize = -1;
        }
 
        @Override
@@ -757,6 +771,10 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
        
        @Override
        public long getInMemorySize() {
+               //reuse previously computed size
+               if( _msize > 0 )
+                       return _msize;
+               
                //frame block header
                long size = 16 + 4; //object, num rows
                
@@ -788,11 +806,11 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                                        for( int i=0; i<_numRows; i++ )
                                                size += 
getInMemoryStringSize(arr.get(i));
                                        break;
-                               default: //not applicable       
+                               default: //not applicable
                        }
                }
                
-               return size;
+               return _msize = size;
        }
        
        @Override
@@ -819,7 +837,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                                        for( int i=0; i<_numRows; i++ )
                                                size += 
IOUtilFunctions.getUTFSize(arr.get(i));
                                        break;
-                               default: //not applicable       
+                               default: //not applicable
                        }
                }
                
@@ -965,7 +983,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                //allocate output frame (incl deep copy schema)
                if( ret == null )
                        ret = new FrameBlock();
-               ret._numRows = _numRows;                                        
                        
+               ret._numRows = _numRows;
                ret._schema = _schema.clone();
                ret._colnames = (_colnames != null) ? _colnames.clone() : null;
                ret._colmeta = _colmeta.clone();

Reply via email to