This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e811d7  [SYSTEMDS-335] Weighted eviction policy in lineage cache
6e811d7 is described below

commit 6e811d75facf0a6cbff0ee9ff93c15beedc1302f
Author: arnabp <[email protected]>
AuthorDate: Sun May 3 16:26:05 2020 +0200

    [SYSTEMDS-335] Weighted eviction policy in lineage cache
    
    This patch contains a new eviction policy for the lineage cache. A
    min-heap-based priority queue over a function of computation time and
    size is maintained to define the order of evictions. The idea is to evict
    large matrices that take little time to recompute. This weighted
    scheme significantly reduces the number of evictions (including disk
    spilling). This patch also refactors the LineageCache class to hide the
    eviction policy related maintenance.
    
    Closes #905.
---
 docs/Tasks.txt                                     |   2 +-
 .../instructions/cp/FunctionCallCPInstruction.java |   2 +
 .../apache/sysds/runtime/lineage/LineageCache.java | 430 ++-------------------
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  20 +
 .../sysds/runtime/lineage/LineageCacheEntry.java   | 112 ++++++
 .../runtime/lineage/LineageCacheEviction.java      | 371 ++++++++++++++++++
 6 files changed, 544 insertions(+), 393 deletions(-)

diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 42b2b3e..9d2dba4 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -262,7 +262,7 @@ SYSTEMDS-330 Lineage Tracing, Reuse and Integration
  * 332 Parfor integration with multi-level reuse                      OK
  * 333 Improve cache eviction with actual compute time                OK
  * 334 Cache scalars only with atleast one matrix inputs
- * 335 Weighted eviction policy (function of size & computetime)
+ * 335 Weighted eviction policy (function of size & computetime)      OK
  * 336 Better use of cache status to handle multithreading
  * 337 Adjust disk I/O speed by recording actual time taken           OK
  * 338 Extended lineage tracing (rmEmpty, lists), partial rewrites    OK
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
index def4859..3c4e1a9 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
@@ -231,6 +231,8 @@ public class FunctionCallCPInstruction extends 
CPInstruction {
                if( DMLScript.LINEAGE && LineageCacheConfig.isMultiLevelReuse() 
) {
                        LineageCache.putValue(fpb.getOutputParams(), liInputs, 
                                        getCacheFunctionName(_functionName, 
fpb), ec, t1-t0);
+                       //FIXME: send _boundOutputNames instead of 
fpb.getOutputParams as 
+                       //those are already replaced by boundoutput names in 
the lineage map.
                }
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index a42a376..32e5585 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -38,15 +38,12 @@ import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
 import 
org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
-import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.InputInfo;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.OutputInfo;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
-import org.apache.sysds.runtime.util.LocalFileUtils;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,20 +52,13 @@ import java.util.Map;
 
 public class LineageCache
 {
-       private static final Map<LineageItem, Entry> _cache = new HashMap<>();
-       private static final Map<LineageItem, SpilledItem> _spillList = new 
HashMap<>();
-       private static final HashSet<LineageItem> _removelist = new HashSet<>();
+       private static final Map<LineageItem, LineageCacheEntry> _cache = new 
HashMap<>();
        private static final double CACHE_FRAC = 0.05; // 5% of JVM heap size
-       private static final long CACHE_LIMIT; //limit in bytes
-       private static final boolean DEBUG = false;
-       private static String _outdir = null;
-       private static long _cachesize = 0;
-       private static Entry _head = null;
-       private static Entry _end = null;
+       protected static final boolean DEBUG = false;
 
        static {
                long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
-               CACHE_LIMIT = (long)(CACHE_FRAC * maxMem);
+               LineageCacheEviction.setCacheLimit((long)(CACHE_FRAC * maxMem));
        }
        
        // Cache Synchronization Approach:
@@ -95,7 +85,7 @@ public class LineageCache
                        
                        //atomic try reuse full/partial and set placeholder, 
without
                        //obtaining value to avoid blocking in critical section
-                       Entry e = null;
+                       LineageCacheEntry e = null;
                        synchronized( _cache ) {
                                //try to reuse full or partial intermediates
                                if 
(LineageCacheConfig.getCacheType().isFullReuse())
@@ -141,7 +131,7 @@ public class LineageCache
                for (int i=0; i<numOutputs; i++) {
                        String opcode = name + String.valueOf(i+1);
                        LineageItem li = new LineageItem(outNames.get(i), 
opcode, liInputs);
-                       Entry e = null;
+                       LineageCacheEntry e = null;
                        synchronized(_cache) {
                                if (LineageCache.probe(li)) {
                                        e = LineageCache.getIntern(li);
@@ -199,15 +189,15 @@ public class LineageCache
        
        public static boolean probe(LineageItem key) {
                //TODO problematic as after probe the matrix might be kicked 
out of cache
-               boolean p = (_cache.containsKey(key) || 
_spillList.containsKey(key));
-               if (!p && DMLScript.STATISTICS && _removelist.contains(key))
+               boolean p = (_cache.containsKey(key) || 
LineageCacheEviction.spillListContains(key));
+               if (!p && DMLScript.STATISTICS && 
LineageCacheEviction._removelist.contains(key))
                        // The sought entry was in cache but removed later 
                        LineageCacheStatistics.incrementDelHits();
                return p;
        }
        
        public static MatrixBlock getMatrix(LineageItem key) {
-               Entry e = null;
+               LineageCacheEntry e = null;
                synchronized( _cache ) {
                        e = getIntern(key);
                }
@@ -241,11 +231,14 @@ public class LineageCache
                                        
_cache.get(item).setValue((ScalarObject)data, computetime);
                                else
                                        throw new DMLRuntimeException("Lineage 
Cache: unsupported data: "+data.getDataType());
+
+                               //maintain order for eviction
+                               LineageCacheEviction.addEntry(_cache.get(item));
+
                                long size = _cache.get(item).getSize();
-                               
-                               if (!isBelowThreshold(size))
-                                       makeSpace(size);
-                               updateSize(size, true);
+                               if 
(!LineageCacheEviction.isBelowThreshold(size))
+                                       LineageCacheEviction.makeSpace(_cache, 
size);
+                               LineageCacheEviction.updateSize(size, true);
                        }
                }
        }
@@ -281,7 +274,7 @@ public class LineageCache
                        if(AllOutputsCacheable)
                                FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li, 
boundLI, computetime));
                        else
-                               FuncLIMap.forEach((Li, boundLI) -> 
removeEntry(Li));
+                               FuncLIMap.forEach((Li, boundLI) -> 
_cache.remove(Li));
                }
                
                return;
@@ -290,61 +283,52 @@ public class LineageCache
        public static void resetCache() {
                synchronized (_cache) {
                        _cache.clear();
-                       _spillList.clear();
-                       _head = null;
-                       _end = null;
-                       // reset cache size, otherwise the cache clear leads to 
unusable 
-                       // space which means evictions could run into endless 
loops
-                       _cachesize = 0;
-                       _outdir = null;
-                       if (DMLScript.STATISTICS)
-                               _removelist.clear();
+                       LineageCacheEviction.resetEviction();
                }
        }
        
        //----------------- INTERNAL CACHE LOGIC IMPLEMENTATION --------------//
        
-       private static void putIntern(LineageItem key, DataType dt, MatrixBlock 
Mval, ScalarObject Sval, long computetime) {
+       protected static void putIntern(LineageItem key, DataType dt, 
MatrixBlock Mval, ScalarObject Sval, long computetime) {
                if (_cache.containsKey(key))
                        //can come here if reuse_partial option is enabled
                        return;
                
                // Create a new entry.
-               Entry newItem = new Entry(key, dt, Mval, Sval, computetime);
+               LineageCacheEntry newItem = new LineageCacheEntry(key, dt, 
Mval, Sval, computetime);
                
                // Make space by removing or spilling LRU entries.
                if( Mval != null || Sval != null ) {
                        long size = newItem.getSize();
-                       if( size > CACHE_LIMIT )
+                       if( size > LineageCacheEviction.getCacheLimit())
                                return; //not applicable
-                       if( !isBelowThreshold(size) ) 
-                               makeSpace(size);
-                       updateSize(size, true);
+                       if( !LineageCacheEviction.isBelowThreshold(size) ) 
+                               LineageCacheEviction.makeSpace(_cache, size);
+                       LineageCacheEviction.updateSize(size, true);
                }
                
                // Place the entry at head position.
-               setHead(newItem);
+               LineageCacheEviction.addEntry(newItem);
                
                _cache.put(key, newItem);
                if (DMLScript.STATISTICS)
                        LineageCacheStatistics.incrementMemWrites();
        }
        
-       private static Entry getIntern(LineageItem key) {
+       private static LineageCacheEntry getIntern(LineageItem key) {
                // This method is called only when entry is present either in 
cache or in local FS.
                if (_cache.containsKey(key)) {
                        // Read and put the entry at head.
-                       Entry e = _cache.get(key);
-                       delete(e);
-                       setHead(e);
+                       LineageCacheEntry e = _cache.get(key);
+                       // Maintain order for eviction
+                       LineageCacheEviction.getEntry(e);
                        if (DMLScript.STATISTICS)
                                LineageCacheStatistics.incrementMemHits();
                        return e;
                }
                else
-                       return readFromLocalFS(key);
+                       return LineageCacheEviction.readFromLocalFS(_cache, 
key);
        }
-
        
        private static void mvIntern(LineageItem item, LineageItem probeItem, 
long computetime) {
                if (ReuseCacheType.isNone())
@@ -352,21 +336,24 @@ public class LineageCache
                // Move the value from the cache entry with key probeItem to
                // the placeholder entry with key item.
                if (LineageCache.probe(probeItem)) {
-                       Entry oe = getIntern(probeItem);
-                       Entry e = _cache.get(item);
+                       LineageCacheEntry oe = getIntern(probeItem);
+                       LineageCacheEntry e = _cache.get(item);
                        if (oe.isMatrixValue())
                                e.setValue(oe.getMBValue(), computetime); 
                        else
                                e.setValue(oe.getSOValue(), computetime);
                        e._origItem = probeItem; 
+                       
+                       //maintain order for eviction
+                       LineageCacheEviction.addEntry(e);
 
                        long size = oe.getSize();
-                       if(!isBelowThreshold(size)) 
-                               makeSpace(size);
-                       updateSize(size, true);
+                       if(!LineageCacheEviction.isBelowThreshold(size)) 
+                               LineageCacheEviction.makeSpace(_cache, size);
+                       LineageCacheEviction.updateSize(size, true);
                }
                else
-                       removeEntry(item);  //remove the placeholder
+                       _cache.remove(item);    //remove the placeholder
        }
        
        private static boolean isMarkedForCaching (Instruction inst, 
ExecutionContext ec) {
@@ -383,153 +370,6 @@ public class LineageCache
                        return true;
        }
        
-       //---------------- CACHE SPACE MANAGEMENT METHODS -----------------
-       
-       private static boolean isBelowThreshold(long spaceNeeded) {
-               return ((spaceNeeded + _cachesize) <= CACHE_LIMIT);
-       }
-       
-       private static void makeSpace(long spaceNeeded) {
-               // cost based eviction
-               Entry e = _end;
-               while (e != _head)
-               {
-                       if ((spaceNeeded + _cachesize) <= CACHE_LIMIT)
-                               // Enough space recovered.
-                               break;
-
-                       if (!LineageCacheConfig.isSetSpill()) {
-                               // If eviction is disabled, just delete the 
entries.
-                               removeEntry(e);
-                               e = e._prev;
-                               continue;
-                       }
-
-                       if (!e.getCacheStatus().canEvict()) {
-                               // Don't delete if the entry's cache status 
doesn't allow.
-                               e = e._prev;
-                               continue;
-                       }
-
-                       double exectime = ((double) e._computeTime) / 1000000; 
// in milliseconds
-
-                       if (!e.isMatrixValue()) {
-                               // Skip scalar entries with higher computation 
time, as
-                               // those could be function/statementblock 
outputs.
-                               if (exectime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
-                                       removeEntry(e);
-                               e = e._prev;
-                               continue;
-                       }
-
-                       // Estimate time to write to FS + read from FS.
-                       double spilltime = getDiskSpillEstimate(e) * 1000; // 
in milliseconds
-
-                       if (DEBUG) {
-                               if (exectime > 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
-                                       System.out.print("LI " + 
e._key.getOpcode());
-                                       System.out.print(" exec time " + 
((double) e._computeTime) / 1000000);
-                                       System.out.print(" estimate time " + 
getDiskSpillEstimate(e) * 1000);
-                                       System.out.print(" dim " + 
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
-                                       System.out.println(" size " + 
getDiskSizeEstimate(e));
-                               }
-                       }
-
-                       if (LineageCacheConfig.isSetSpill()) {
-                               if (spilltime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
-                                       // Can't trust the estimate if less 
than 100ms.
-                                       // Spill if it takes longer to 
recompute.
-                                       if (exectime >= 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
-                                               spillToLocalFS(e);
-                               }
-                               else {
-                                       // Spill if it takes longer to 
recompute than spilling.
-                                       if (exectime > spilltime)
-                                               spillToLocalFS(e);
-                               }
-                       }
-
-                       // Remove the entry from cache.
-                       removeEntry(e);
-                       e = e._prev;
-               }
-       }
-
-       private static void updateSize(long space, boolean addspace) {
-               if (addspace)
-                       _cachesize += space;
-               else
-                       _cachesize -= space;
-       }
-
-       //---------------- COSTING RELATED METHODS -----------------
-
-       private static double getDiskSpillEstimate(Entry e) {
-               if (!e.isMatrixValue() || e.isNullVal())
-                       return 0;
-               // This includes sum of writing to and reading from disk
-               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               double size = getDiskSizeEstimate(e);
-               double loadtime = isSparse(e) ? 
size/LineageCacheConfig.FSREAD_SPARSE : size/LineageCacheConfig.FSREAD_DENSE;
-               double writetime = isSparse(e) ? 
size/LineageCacheConfig.FSWRITE_SPARSE : size/LineageCacheConfig.FSWRITE_DENSE;
-
-               //double loadtime = CostEstimatorStaticRuntime.getFSReadTime(r, 
c, s);
-               //double writetime = 
CostEstimatorStaticRuntime.getFSWriteTime(r, c, s);
-               if (DMLScript.STATISTICS) 
-                       
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
-               return loadtime + writetime;
-       }
-
-       private static double getDiskSizeEstimate(Entry e) {
-               if (!e.isMatrixValue() || e.isNullVal())
-                       return 0;
-               MatrixBlock mb = e.getMBValue();
-               long r = mb.getNumRows();
-               long c = mb.getNumColumns();
-               long nnz = mb.getNonZeros();
-               double s = OptimizerUtils.getSparsity(r, c, nnz);
-               double disksize = ((double)MatrixBlock.estimateSizeOnDisk(r, c, 
(long)(s*r*c))) / (1024*1024);
-               return disksize;
-       }
-       
-       private static void adjustReadWriteSpeed(Entry e, double IOtime, 
boolean read) {
-               double size = getDiskSizeEstimate(e);
-               if (!e.isMatrixValue() || size < 
LineageCacheConfig.MIN_SPILL_DATA)
-                       // Scalar or too small
-                       return; 
-               
-               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               double newIOSpeed = size / IOtime; // MB per second 
-               // Adjust the read/write speed taking into account the last 
read/write.
-               // These constants will eventually converge to the real speed.
-               if (read) {
-                       if (isSparse(e))
-                               LineageCacheConfig.FSREAD_SPARSE = 
(LineageCacheConfig.FSREAD_SPARSE + newIOSpeed) / 2;
-                       else
-                               LineageCacheConfig.FSREAD_DENSE= 
(LineageCacheConfig.FSREAD_DENSE+ newIOSpeed) / 2;
-               }
-               else {
-                       if (isSparse(e))
-                               LineageCacheConfig.FSWRITE_SPARSE = 
(LineageCacheConfig.FSWRITE_SPARSE + newIOSpeed) / 2;
-                       else
-                               LineageCacheConfig.FSWRITE_DENSE= 
(LineageCacheConfig.FSWRITE_DENSE+ newIOSpeed) / 2;
-               }
-               if (DMLScript.STATISTICS) 
-                       
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
-       }
-       
-       private static boolean isSparse(Entry e) {
-               if (!e.isMatrixValue() || e.isNullVal())
-                       return false;
-               MatrixBlock mb = e.getMBValue();
-               long r = mb.getNumRows();
-               long c = mb.getNumColumns();
-               long nnz = mb.getNonZeros();
-               double s = OptimizerUtils.getSparsity(r, c, nnz);
-               boolean sparse = MatrixBlock.evalSparseFormatOnDisk(r, c, 
(long)(s*r*c));
-               return sparse;
-       }
-       
        @Deprecated
        @SuppressWarnings("unused")
        private static double getRecomputeEstimate(Instruction inst, 
ExecutionContext ec) {
@@ -676,198 +516,4 @@ public class LineageCache
                }
                return nflops / (2L * 1024 * 1024 * 1024);
        }
-
-       // ---------------- I/O METHODS TO LOCAL FS -----------------
-       
-       private static void spillToLocalFS(Entry entry) {
-               if (!entry.isMatrixValue())
-                       throw new DMLRuntimeException ("Spilling scalar objects 
to disk is not allowd. Key: "+entry._key);
-               if (entry.isNullVal())
-                       throw new DMLRuntimeException ("Cannot spill null value 
to disk. Key: "+entry._key);
-               
-               long t0 = System.nanoTime();
-               if (_outdir == null) {
-                       _outdir = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_LINEAGE);
-                       LocalFileUtils.createLocalFileIfNotExist(_outdir);
-               }
-               String outfile = _outdir+"/"+entry._key.getId();
-               try {
-                       LocalFileUtils.writeMatrixBlockToLocal(outfile, 
entry.getMBValue());
-               } catch (IOException e) {
-                       throw new DMLRuntimeException ("Write to " + outfile + 
" failed.", e);
-               }
-               long t1 = System.nanoTime();
-               // Adjust disk writing speed
-               adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000, 
false);
-               
-               if (DMLScript.STATISTICS) {
-                       LineageCacheStatistics.incrementFSWriteTime(t1-t0);
-                       LineageCacheStatistics.incrementFSWrites();
-               }
-
-               _spillList.put(entry._key, new SpilledItem(outfile, 
entry._computeTime));
-       }
-       
-       private static Entry readFromLocalFS(LineageItem key) {
-               long t0 = System.nanoTime();
-               MatrixBlock mb = null;
-               // Read from local FS
-               try {
-                       mb = 
LocalFileUtils.readMatrixBlockFromLocal(_spillList.get(key)._outfile);
-               } catch (IOException e) {
-                       throw new DMLRuntimeException ("Read from " + 
_spillList.get(key)._outfile + " failed.", e);
-               }
-               // Restore to cache
-               LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile, 
true);
-               long t1 = System.nanoTime();
-               putIntern(key, DataType.MATRIX, mb, null, 
_spillList.get(key)._computeTime);
-               // Adjust disk reading speed
-               adjustReadWriteSpeed(_cache.get(key), 
((double)(t1-t0))/1000000000, true);
-               //TODO: set cache status as RELOADED for this entry
-               _spillList.remove(key);
-               if (DMLScript.STATISTICS) {
-                       LineageCacheStatistics.incrementFSReadTime(t1-t0);
-                       LineageCacheStatistics.incrementFSHits();
-               }
-               return _cache.get(key);
-       }
-
-       //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS ---------//
-       
-       private static void removeEntry(LineageItem key) {
-               // Remove the entry for key
-               if (!_cache.containsKey(key))
-                       return;
-               delete(_cache.get(key));
-               _cache.remove(key);
-       }
-
-       private static void removeEntry(Entry e) {
-               if (_cache.remove(e._key) == null)
-                       // Entry not found in cache
-                       return;
-
-               if (DMLScript.STATISTICS)
-                       _removelist.add(e._key);
-
-               _cachesize -= e.getSize();
-               delete(e);
-               if (DMLScript.STATISTICS)
-                       LineageCacheStatistics.incrementMemDeletes();
-       }
-
-       private static void delete(Entry entry) {
-               if (entry._prev != null)
-                       entry._prev._next = entry._next;
-               else
-                       _head = entry._next;
-               if (entry._next != null)
-                       entry._next._prev = entry._prev;
-               else
-                       _end = entry._prev;
-       }
-       
-       private static void setHead(Entry entry) {
-               entry._next = _head;
-               entry._prev = null;
-               if (_head != null)
-                       _head._prev = entry;
-               _head = entry;
-               if (_end == null)
-                       _end = _head;
-       }
-       
-       //---------------- INTERNAL CACHE DATA STRUCTURES ----------------//
-       
-       private static class Entry {
-               private final LineageItem _key;
-               private final DataType _dt;
-               private MatrixBlock _MBval;
-               private ScalarObject _SOval;
-               private long _computeTime;
-               private LineageCacheStatus _status;
-               private Entry _prev;
-               private Entry _next;
-               private LineageItem _origItem;
-               
-               public Entry(LineageItem key, DataType dt, MatrixBlock Mval, 
ScalarObject Sval, long computetime) {
-                       _key = key;
-                       _dt = dt;
-                       _MBval = Mval;
-                       _SOval = Sval;
-                       _computeTime = computetime;
-                       _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
-                       _origItem = null;
-               }
-
-               public synchronized MatrixBlock getMBValue() {
-                       try {
-                               //wait until other thread completes operation
-                               //in order to avoid redundant computation
-                               while( _MBval == null ) {
-                                       wait();
-                               }
-                               return _MBval;
-                       }
-                       catch( InterruptedException ex ) {
-                               throw new DMLRuntimeException(ex);
-                       }
-               }
-
-               public synchronized ScalarObject getSOValue() {
-                       try {
-                               //wait until other thread completes operation
-                               //in order to avoid redundant computation
-                               while( _SOval == null ) {
-                                       wait();
-                               }
-                               return _SOval;
-                       }
-                       catch( InterruptedException ex ) {
-                               throw new DMLRuntimeException(ex);
-                       }
-               }
-               
-               public synchronized LineageCacheStatus getCacheStatus() {
-                       return _status;
-               }
-               
-               public synchronized long getSize() {
-                       return ((_MBval != null ? _MBval.getInMemorySize() : 0) 
+ (_SOval != null ? _SOval.getSize() : 0));
-               }
-               
-               public boolean isNullVal() {
-                       return(_MBval == null && _SOval == null);
-               }
-               
-               public boolean isMatrixValue() {
-                       return _dt.isMatrix();
-               }
-               
-               public synchronized void setValue(MatrixBlock val, long 
computetime) {
-                       _MBval = val;
-                       _computeTime = computetime;
-                       _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
-                       //resume all threads waiting for val
-                       notifyAll();
-               }
-
-               public synchronized void setValue(ScalarObject val, long 
computetime) {
-                       _SOval = val;
-                       _computeTime = computetime;
-                       _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
-                       //resume all threads waiting for val
-                       notifyAll();
-               }
-       }
-       
-       private static class SpilledItem {
-               String _outfile;
-               long _computeTime;
-
-               public SpilledItem(String outfile, long computetime) {
-                       _outfile = outfile;
-                       _computeTime = computetime;
-               }
-       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 3268840..888d27d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -78,6 +78,14 @@ public class LineageCacheConfig {
                        return this == CACHED || this == RELOADED;
                }
         }
+        
+        public enum LineageCachePolicy {
+                LRU,
+                WEIGHTED;
+                public boolean isLRUcache() {
+                        return this == LRU;
+                }
+        }
        
        public ArrayList<String> _MMult = new ArrayList<>();
        public static boolean _allowSpill = true;
@@ -95,10 +103,13 @@ public class LineageCacheConfig {
        private static ReuseCacheType _cacheType = null;
        private static CachedItemHead _itemH = null;
        private static CachedItemTail _itemT = null;
+       private static LineageCachePolicy _cachepolicy = null;
        private static boolean _compilerAssistedRW = true;
+
        static {
                //setup static configuration parameters
                setSpill(true); //enable/disable disk spilling.
+               setCachePolicy(LineageCachePolicy.WEIGHTED);
        }
        
        public static boolean isReusable (Instruction inst, ExecutionContext 
ec) {
@@ -152,9 +163,17 @@ public class LineageCacheConfig {
                _allowSpill = toSpill;
        }
        
+       public static void setCachePolicy(LineageCachePolicy policy) {
+               _cachepolicy = policy;
+       }
+       
        public static boolean isSetSpill() {
                return _allowSpill;
        }
+       
+       public static LineageCachePolicy getCachePolicy() {
+               return _cachepolicy;
+       }
 
        public static ReuseCacheType getCacheType() {
                return _cacheType;
@@ -176,4 +195,5 @@ public class LineageCacheConfig {
        public static boolean getCompAssRW() {
                return _compilerAssistedRW;
        }
+
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
new file mode 100644
index 0000000..9421208
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.lineage;
+
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public class LineageCacheEntry {
+       protected final LineageItem _key;
+       protected final DataType _dt;
+       protected MatrixBlock _MBval;
+       protected ScalarObject _SOval;
+       protected long _computeTime;
+       protected LineageCacheStatus _status;
+       protected LineageCacheEntry _prev;
+       protected LineageCacheEntry _next;
+       protected LineageItem _origItem;
+       
+       public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock 
Mval, ScalarObject Sval, long computetime) {
+               _key = key;
+               _dt = dt;
+               _MBval = Mval;
+               _SOval = Sval;
+               _computeTime = computetime;
+               _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
+               _origItem = null;
+       }
+       
+       protected synchronized void setCacheStatus(LineageCacheStatus st) {
+               _status = st;
+       }
+
+       public synchronized MatrixBlock getMBValue() {
+               try {
+                       //wait until other thread completes operation
+                       //in order to avoid redundant computation
+                       while( _MBval == null ) {
+                               wait();
+                       }
+                       return _MBval;
+               }
+               catch( InterruptedException ex ) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+
+       public synchronized ScalarObject getSOValue() {
+               try {
+                       //wait until other thread completes operation
+                       //in order to avoid redundant computation
+                       while( _SOval == null ) {
+                               wait();
+                       }
+                       return _SOval;
+               }
+               catch( InterruptedException ex ) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+       
+       public synchronized LineageCacheStatus getCacheStatus() {
+               return _status;
+       }
+       
+       public synchronized long getSize() {
+               return ((_MBval != null ? _MBval.getInMemorySize() : 0) + 
(_SOval != null ? _SOval.getSize() : 0));
+       }
+       
+       public boolean isNullVal() {
+               return(_MBval == null && _SOval == null);
+       }
+       
+       public boolean isMatrixValue() {
+               return _dt.isMatrix();
+       }
+       
+       public synchronized void setValue(MatrixBlock val, long computetime) {
+               _MBval = val;
+               _computeTime = computetime;
+               _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
+               //resume all threads waiting for val
+               notifyAll();
+       }
+
+       public synchronized void setValue(ScalarObject val, long computetime) {
+               _SOval = val;
+               _computeTime = computetime;
+               _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
+               //resume all threads waiting for val
+               notifyAll();
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
new file mode 100644
index 0000000..f3c2c0e
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.lineage;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+public class LineageCacheEviction
+{
+       private static LineageCacheEntry _head = null;
+       private static LineageCacheEntry _end = null;
+       private static long _cachesize = 0;
+       private static long CACHE_LIMIT; //limit in bytes
+       protected static final HashSet<LineageItem> _removelist = new 
HashSet<>();
+       private static final Map<LineageItem, SpilledItem> _spillList = new 
HashMap<>();
+       private static String _outdir = null;
+       
+       private static Comparator<LineageCacheEntry> execTime2SizeComparator = 
(e1, e2) -> {
+               double t2s1 = ((double)e1._computeTime)/e1.getSize();
+               double t2s2 = ((double)e2._computeTime)/e2.getSize();
+               return t2s1 == t2s2 ? 0 : t2s1 < t2s2 ? -1 : 1;
+       };
+       
+       private static PriorityQueue<LineageCacheEntry> weightedQueue = new 
PriorityQueue<>(execTime2SizeComparator);
+       
+       protected static void resetEviction() {
+               _head = null;
+               _end = null;
+               // reset cache size, otherwise the cache clear leads to 
unusable 
+               // space which means evictions could run into endless loops
+               _cachesize = 0;
+               _spillList.clear();
+               weightedQueue.clear();
+               _outdir = null;
+               if (DMLScript.STATISTICS)
+                       _removelist.clear();
+       }
+
+       //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS ---------//
+       
+       protected static void addEntry(LineageCacheEntry entry) {
+               if (entry.isNullVal())
+                       // Placeholders shouldn't be evicted.
+                       return;
+
+               double exectime = ((double) entry._computeTime) / 1000000; // 
in milliseconds
+               if (!entry.isMatrixValue() && exectime >= 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+                       // Pin the entries having scalar values and with higher 
computation time
+                       // to memory, to save those from eviction. Scalar 
values are
+                       // not spilled to disk and are just deleted. Scalar 
entries associated 
+                       // with high computation time might contain function 
outputs. Pinning them
+                       // will increase chances of multilevel reuse.
+                       entry.setCacheStatus(LineageCacheStatus.PINNED);
+
+               if (LineageCacheConfig.getCachePolicy().isLRUcache()) //LRU 
+                       // Maintain linked list.
+                       setHead(entry);
+               else {
+                       if (entry.isMatrixValue() || exectime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+                               // Don't add the memory pinned entries in 
weighted queue. 
+                               // The priorityQueue should contain only 
entries that can
+                               // be removed or spilled to disk.
+                               weightedQueue.add(entry);
+               }
+       }
+       
+       protected static void getEntry(LineageCacheEntry entry) {
+               if (LineageCacheConfig.getCachePolicy().isLRUcache()) { //LRU 
+                       // maintain linked list.
+                       delete(entry);
+                       setHead(entry);
+               }
+               // No maintenance is required for weighted scheme
+       }
+
+       protected static void removeEntry(Map<LineageItem, LineageCacheEntry> 
cache, LineageItem key) {
+               if (!cache.containsKey(key))
+                       return;
+               if (LineageCacheConfig.getCachePolicy().isLRUcache()) //LRU 
+                       delete(cache.get(key));
+               else
+                       weightedQueue.remove(cache.get(key));
+               cache.remove(key);
+       }
+
+       private static void removeEntry(Map<LineageItem, LineageCacheEntry> 
cache, LineageCacheEntry e) {
+               if (DMLScript.STATISTICS)
+                       _removelist.add(e._key);
+
+               if (LineageCacheConfig.getCachePolicy().isLRUcache()) //LRU 
+                       delete(e);
+               _cachesize -= e.getSize();
+               if (DMLScript.STATISTICS)
+                       LineageCacheStatistics.incrementMemDeletes();
+       }
+
+       private static void delete(LineageCacheEntry entry) {
+               if (entry._prev != null)
+                       entry._prev._next = entry._next;
+               else
+                       _head = entry._next;
+               if (entry._next != null)
+                       entry._next._prev = entry._prev;
+               else
+                       _end = entry._prev;
+       }
+       
+       protected static void setHead(LineageCacheEntry entry) {
+               entry._next = _head;
+               entry._prev = null;
+               if (_head != null)
+                       _head._prev = entry;
+               _head = entry;
+               if (_end == null)
+                       _end = _head;
+       }
+       
+       //---------------- CACHE SPACE MANAGEMENT METHODS -----------------
+       
+       protected static void setCacheLimit(long limit) {
+               CACHE_LIMIT = limit;
+       }
+
+       protected static long getCacheLimit() {
+               return CACHE_LIMIT;
+       }
+       
+       protected static void updateSize(long space, boolean addspace) {
+               if (addspace)
+                       _cachesize += space;
+               else
+                       _cachesize -= space;
+       }
+
+       protected static boolean isBelowThreshold(long spaceNeeded) {
+               return ((spaceNeeded + _cachesize) <= CACHE_LIMIT);
+       }
+
+       protected static void makeSpace(Map<LineageItem, LineageCacheEntry> 
cache, long spaceNeeded) {
+               //Cost based eviction
+               //TODO better generalization of the different policies (e.g.,
+               //_head in below condition is only used when LRU is active)
+               boolean isLRU = 
LineageCacheConfig.getCachePolicy().isLRUcache();
+               LineageCacheEntry e = isLRU ? _end : weightedQueue.poll();
+               while (e != _head && e != null)
+               {
+                       if ((spaceNeeded + _cachesize) <= CACHE_LIMIT)
+                               // Enough space recovered.
+                               break;
+
+                       if (!LineageCacheConfig.isSetSpill()) {
+                               // If eviction is disabled, just delete the 
entries.
+                               if (cache.remove(e._key) != null)
+                                       removeEntry(cache, e);
+                               e = isLRU ? e._prev : weightedQueue.poll();
+                               continue;
+                       }
+
+                       if (!e.getCacheStatus().canEvict() && isLRU) {
+                               // Don't delete if the entry's cache status 
doesn't allow.
+                               // Note: no action needed for weightedQueue as 
these entries 
+                               //       are not part of weightedQueue.
+                               e = e._prev;
+                               continue;
+                       }
+
+                       double exectime = ((double) e._computeTime) / 1000000; 
// in milliseconds
+
+                       if (!e.isMatrixValue()) {
+                               // No spilling for scalar entries. Just delete 
those.
+                               // Note: scalar entries with higher computation 
time are pinned.
+                               if (cache.remove(e._key) != null)
+                                       removeEntry(cache, e);
+                               e = isLRU ? e._prev : weightedQueue.poll();
+                               continue;
+                       }
+
+                       // Estimate time to write to FS + read from FS.
+                       double spilltime = getDiskSpillEstimate(e) * 1000; // 
in milliseconds
+
+                       if (LineageCache.DEBUG) {
+                               if (exectime > 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
+                                       System.out.print("LI " + 
e._key.getOpcode());
+                                       System.out.print(" exec time " + 
((double) e._computeTime) / 1000000);
+                                       System.out.print(" estimate time " + 
getDiskSpillEstimate(e) * 1000);
+                                       System.out.print(" dim " + 
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+                                       System.out.println(" size " + 
getDiskSizeEstimate(e));
+                               }
+                       }
+
+                       if (spilltime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
+                               // Can't trust the estimate if less than 100ms.
+                               // Spill if it takes longer to recompute.
+                               if (exectime >= 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+                                       spillToLocalFS(e);
+                       }
+                       else {
+                               // Spill if it takes longer to recompute than 
spilling.
+                               if (exectime > spilltime)
+                                       spillToLocalFS(e);
+                       }
+
+                       // Remove the entry from cache.
+                       if (cache.remove(e._key) != null)
+                               removeEntry(cache, e);
+                       e = isLRU ? e._prev : weightedQueue.poll();
+               }
+       }
+
+       //---------------- COSTING RELATED METHODS -----------------
+
+       private static double getDiskSpillEstimate(LineageCacheEntry e) {
+               if (!e.isMatrixValue() || e.isNullVal())
+                       return 0;
+               // This includes sum of writing to and reading from disk
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+               double size = getDiskSizeEstimate(e);
+               double loadtime = isSparse(e) ? 
size/LineageCacheConfig.FSREAD_SPARSE : size/LineageCacheConfig.FSREAD_DENSE;
+               double writetime = isSparse(e) ? 
size/LineageCacheConfig.FSWRITE_SPARSE : size/LineageCacheConfig.FSWRITE_DENSE;
+
+               //double loadtime = CostEstimatorStaticRuntime.getFSReadTime(r, 
c, s);
+               //double writetime = 
CostEstimatorStaticRuntime.getFSWriteTime(r, c, s);
+               if (DMLScript.STATISTICS) 
+                       
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
+               return loadtime + writetime;
+       }
+
+       private static double getDiskSizeEstimate(LineageCacheEntry e) {
+               if (!e.isMatrixValue() || e.isNullVal())
+                       return 0;
+               MatrixBlock mb = e.getMBValue();
+               long r = mb.getNumRows();
+               long c = mb.getNumColumns();
+               long nnz = mb.getNonZeros();
+               double s = OptimizerUtils.getSparsity(r, c, nnz);
+               double disksize = ((double)MatrixBlock.estimateSizeOnDisk(r, c, 
(long)(s*r*c))) / (1024*1024);
+               return disksize;
+       }
+       
+       private static void adjustReadWriteSpeed(LineageCacheEntry e, double 
IOtime, boolean read) {
+               double size = getDiskSizeEstimate(e);
+               if (!e.isMatrixValue() || size < 
LineageCacheConfig.MIN_SPILL_DATA)
+                       // Scalar or too small
+                       return; 
+               
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+               double newIOSpeed = size / IOtime; // MB per second 
+               // Adjust the read/write speed taking into account the last 
read/write.
+               // These constants will eventually converge to the real speed.
+               if (read) {
+                       if (isSparse(e))
+                               LineageCacheConfig.FSREAD_SPARSE = 
(LineageCacheConfig.FSREAD_SPARSE + newIOSpeed) / 2;
+                       else
+                               LineageCacheConfig.FSREAD_DENSE= 
(LineageCacheConfig.FSREAD_DENSE+ newIOSpeed) / 2;
+               }
+               else {
+                       if (isSparse(e))
+                               LineageCacheConfig.FSWRITE_SPARSE = 
(LineageCacheConfig.FSWRITE_SPARSE + newIOSpeed) / 2;
+                       else
+                               LineageCacheConfig.FSWRITE_DENSE= 
(LineageCacheConfig.FSWRITE_DENSE+ newIOSpeed) / 2;
+               }
+               if (DMLScript.STATISTICS) 
+                       
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
+       }
+       
+       private static boolean isSparse(LineageCacheEntry e) {
+               if (!e.isMatrixValue() || e.isNullVal())
+                       return false;
+               return e.getMBValue().isInSparseFormat();
+       }
+
+       // ---------------- I/O METHODS TO LOCAL FS -----------------
+       
+       private static void spillToLocalFS(LineageCacheEntry entry) {
+               if (!entry.isMatrixValue())
+                       throw new DMLRuntimeException ("Spilling scalar objects 
to disk is not allowd. Key: "+entry._key);
+               if (entry.isNullVal())
+                       throw new DMLRuntimeException ("Cannot spill null value 
to disk. Key: "+entry._key);
+               
+               long t0 = System.nanoTime();
+               if (_outdir == null) {
+                       _outdir = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_LINEAGE);
+                       LocalFileUtils.createLocalFileIfNotExist(_outdir);
+               }
+               String outfile = _outdir+"/"+entry._key.getId();
+               try {
+                       LocalFileUtils.writeMatrixBlockToLocal(outfile, 
entry.getMBValue());
+               } catch (IOException e) {
+                       throw new DMLRuntimeException ("Write to " + outfile + 
" failed.", e);
+               }
+               long t1 = System.nanoTime();
+               // Adjust disk writing speed
+               adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000, 
false);
+               
+               if (DMLScript.STATISTICS) {
+                       LineageCacheStatistics.incrementFSWriteTime(t1-t0);
+                       LineageCacheStatistics.incrementFSWrites();
+               }
+
+               _spillList.put(entry._key, new SpilledItem(outfile, 
entry._computeTime));
+       }
+
+       protected static LineageCacheEntry readFromLocalFS(Map<LineageItem, 
LineageCacheEntry> cache, LineageItem key) {
+               long t0 = System.nanoTime();
+               MatrixBlock mb = null;
+               // Read from local FS
+               try {
+                       mb = 
LocalFileUtils.readMatrixBlockFromLocal(_spillList.get(key)._outfile);
+               } catch (IOException e) {
+                       throw new DMLRuntimeException ("Read from " + 
_spillList.get(key)._outfile + " failed.", e);
+               }
+               // Restore to cache
+               LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile, 
true);
+               long t1 = System.nanoTime();
+               LineageCache.putIntern(key, DataType.MATRIX, mb, null, 
_spillList.get(key)._computeTime);
+               // Adjust disk reading speed
+               adjustReadWriteSpeed(cache.get(key), 
((double)(t1-t0))/1000000000, true);
+               // TODO: set cache status as RELOADED for this entry
+               _spillList.remove(key);
+               if (DMLScript.STATISTICS) {
+                       LineageCacheStatistics.incrementFSReadTime(t1-t0);
+                       LineageCacheStatistics.incrementFSHits();
+               }
+               return cache.get(key);
+       }
+       
+       protected static boolean spillListContains(LineageItem key) {
+               return _spillList.containsKey(key);
+       }
+
+       // ---------------- INTERNAL DATA STRUCTURES FOR EVICTION 
-----------------
+
+       private static class SpilledItem {
+               String _outfile;
+               long _computeTime;
+
+               public SpilledItem(String outfile, long computetime) {
+                       _outfile = outfile;
+                       _computeTime = computetime;
+               }
+       }
+}

Reply via email to