This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 722eaf6  [SYSTEMDS-411] Efficient multi-level lineage cache management
722eaf6 is described below

commit 722eaf6eb13efade8e80552b9ef07f611e6ddc1a
Author: arnabp <[email protected]>
AuthorDate: Mon Jun 1 22:54:02 2020 +0200

    [SYSTEMDS-411] Efficient multi-level lineage cache management
    
    This patch improves the handling of multiple cache entries pointing to
    the same data (due to multilevel caching).
    
    1) All entries with the same value are connected in a linked list.
    Even though they output the same data, they have different computation times.
    
    2) The eviction logic marks an entry for deferred spilling/removal if
     other entries are linked to it. Only when all the entries in a list are
     marked for spilling or removal is the item evicted.
    
    3) Disk writes and reads happen only once for all the items connected to
     a single matrix. This way, a single write clears space for multiple
     entries, and a single read restores multiple entries to the cache.
    
    4) Initial experiments show large improvements in cache management. The
     cache can now store many more entries (this patch fixes duplicate size
     calculations) and needs fewer disk I/O operations. Overall, these
     changes improve the cache hit count.
    
    Closes #932.
---
 dev/docs/Tasks.txt                                 |   3 +
 .../apache/sysds/runtime/lineage/LineageCache.java |  22 ++--
 .../sysds/runtime/lineage/LineageCacheConfig.java  |   9 +-
 .../sysds/runtime/lineage/LineageCacheEntry.java   |  12 ++
 .../runtime/lineage/LineageCacheEviction.java      | 137 ++++++++++++++++-----
 5 files changed, 139 insertions(+), 44 deletions(-)

diff --git a/dev/docs/Tasks.txt b/dev/docs/Tasks.txt
index 6b5dbb0..9a51eb5 100644
--- a/dev/docs/Tasks.txt
+++ b/dev/docs/Tasks.txt
@@ -314,5 +314,8 @@ SYSTEMDS-390 New Builtin Functions IV
 SYSTEMDS-400 Spark Backend Improvements
  * 401 Fix output block indexes of rdiag (diagM2V)                    OK
 
+SYSTEMDS-410 Lineage Tracing, Reuse and Integration II
+ * 411 Improved handling of multi-level cache duplicates              OK 
+
 Others:
  * Break append instruction to cbind and rbind 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index ac54b70..ca6b349 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -39,6 +39,7 @@ import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
 import 
org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
@@ -255,7 +256,7 @@ public class LineageCache
                        LineageItem boundLI = ec.getLineage().get(boundVarName);
                        if (boundLI != null)
                                boundLI.resetVisitStatus();
-                       if (boundLI == null || !LineageCache.probe(li)) {
+                       if (boundLI == null || !LineageCache.probe(li) || 
!LineageCache.probe(boundLI)) {
                                AllOutputsCacheable = false;
                        }
                        FuncLIMap.put(li, boundLI);
@@ -282,7 +283,7 @@ public class LineageCache
        
        //----------------- INTERNAL CACHE LOGIC IMPLEMENTATION --------------//
        
-       protected static void putIntern(LineageItem key, DataType dt, 
MatrixBlock Mval, ScalarObject Sval, long computetime) {
+       private static void putIntern(LineageItem key, DataType dt, MatrixBlock 
Mval, ScalarObject Sval, long computetime) {
                if (_cache.containsKey(key))
                        //can come here if reuse_partial option is enabled
                        return;
@@ -300,7 +301,7 @@ public class LineageCache
                        LineageCacheEviction.updateSize(size, true);
                }
                
-               // Place the entry at head position.
+               // Place the entry in the weighted queue.
                LineageCacheEviction.addEntry(newItem);
                
                _cache.put(key, newItem);
@@ -310,8 +311,7 @@ public class LineageCache
        
        private static LineageCacheEntry getIntern(LineageItem key) {
                // This method is called only when entry is present either in 
cache or in local FS.
-               if (_cache.containsKey(key)) {
-                       // Read and put the entry at head.
+               if (_cache.containsKey(key) && _cache.get(key).getCacheStatus() 
!= LineageCacheStatus.SPILLED) {
                        LineageCacheEntry e = _cache.get(key);
                        // Maintain order for eviction
                        LineageCacheEviction.getEntry(e);
@@ -336,14 +336,16 @@ public class LineageCache
                        else
                                e.setValue(oe.getSOValue(), computetime);
                        e._origItem = probeItem; 
+                       // Add the SB/func entry to the end of the list of 
items pointing to the same data.
+                       // No cache size update is necessary.
+                       LineageCacheEntry tmp = oe;
+                       // Maintain _origItem as head.
+                       while (tmp._nextEntry != null)
+                               tmp = tmp._nextEntry;
+                       tmp._nextEntry = e;
                        
                        //maintain order for eviction
                        LineageCacheEviction.addEntry(e);
-
-                       long size = oe.getSize();
-                       if(!LineageCacheEviction.isBelowThreshold(size)) 
-                               LineageCacheEviction.makeSpace(_cache, size);
-                       LineageCacheEviction.updateSize(size, true);
                }
                else
                        _cache.remove(item);    //remove the placeholder
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 2a3c426..7fce53b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -33,8 +33,9 @@ public class LineageCacheConfig
        //-------------CACHING LOGIC RELATED CONFIGURATIONS--------------//
 
        private static final String[] REUSE_OPCODES = new String[] {
-               "tsmm", "ba+*", "*", "/", "+", "nrow", "ncol", "round", "exp", 
"log",
+               "tsmm", "ba+*", "*", "/", "+", "||", "nrow", "ncol", "round", 
"exp", "log",
                "rightIndex", "leftIndex", "groupedagg", "r'", "solve", "spoof"
+               //TODO: Reuse everything. 
        };
        
        public enum ReuseCacheType {
@@ -97,9 +98,11 @@ public class LineageCacheConfig
        protected enum LineageCacheStatus {
                EMPTY,     //Placeholder with no data. Cannot be evicted.
                CACHED,    //General cached data. Can be evicted.
-               EVICTED,   //Data is in disk. Empty value. Cannot be evicted.
+               SPILLED,   //Data is in disk. Empty value. Cannot be evicted.
                RELOADED,  //Reloaded from disk. Can be evicted.
-               PINNED;    //Pinned to memory. Cannot be evicted.
+               PINNED,    //Pinned to memory. Cannot be evicted.
+               TOSPILL,   //To be spilled lazily 
+               TODELETE;  //TO be removed lazily
                public boolean canEvict() {
                        return this == CACHED || this == RELOADED;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index 485cac6..256d85f 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -33,6 +33,7 @@ public class LineageCacheEntry {
        protected long _computeTime;
        protected long _timestamp = 0;
        protected LineageCacheStatus _status;
+       protected LineageCacheEntry _nextEntry;
        protected LineageItem _origItem;
        
        public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock 
Mval, ScalarObject Sval, long computetime) {
@@ -42,6 +43,7 @@ public class LineageCacheEntry {
                _SOval = Sval;
                _computeTime = computetime;
                _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
+               _nextEntry = null;
                _origItem = null;
        }
        
@@ -100,6 +102,10 @@ public class LineageCacheEntry {
                //resume all threads waiting for val
                notifyAll();
        }
+       
+       public synchronized void setValue(MatrixBlock val) {
+               setValue(val, _computeTime);
+       }
 
        public synchronized void setValue(ScalarObject val, long computetime) {
                _SOval = val;
@@ -109,6 +115,12 @@ public class LineageCacheEntry {
                notifyAll();
        }
        
+       protected synchronized void setNullValues() {
+               _MBval = null;
+               _SOval = null;
+               _status = LineageCacheStatus.EMPTY;
+       }
+       
        protected synchronized void setTimestamp() {
                _timestamp = System.currentTimeMillis();
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 127e152..b83a78b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.TreeSet;
 
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
@@ -89,22 +88,68 @@ public class LineageCacheEviction
                }
        }
 
-       protected static void removeEntry(Map<LineageItem, LineageCacheEntry> 
cache, LineageItem key) {
-               if (!cache.containsKey(key))
-                       return;
-               weightedQueue.remove(cache.get(key));
-               cache.remove(key);
-       }
-
        private static void removeEntry(Map<LineageItem, LineageCacheEntry> 
cache, LineageCacheEntry e) {
-               if (DMLScript.STATISTICS)
-                       _removelist.add(e._key);
+               if (cache.remove(e._key) != null)
+                       _cachesize -= e.getSize();
 
-               _cachesize -= e.getSize();
-               // NOTE: The caller of this method maintains the cache and the 
eviction queue.
-
-               if (DMLScript.STATISTICS)
+               if (DMLScript.STATISTICS) {
+                       _removelist.add(e._key);
                        LineageCacheStatistics.incrementMemDeletes();
+               }
+               // NOTE: The caller of this method maintains the eviction queue.
+       }
+       private static void removeOrSpillEntry(Map<LineageItem, 
LineageCacheEntry> cache, LineageCacheEntry e, boolean spill) {
+               if (e._origItem == null) {
+                       // Single entry. Remove or spill.
+                       if (spill)
+                               spillToLocalFS(cache, e);
+                       else
+                               removeEntry(cache, e);
+                       return;
+               }
+               
+               // Defer the eviction till all the entries with the same matrix 
are evicted.
+               e.setCacheStatus(spill ? LineageCacheStatus.TOSPILL : 
LineageCacheStatus.TODELETE);
+
+               // If all the entries with the same data are evicted, check if 
deferred spilling 
+               // is set for any of those. If so, spill the matrix to disk and 
set null in the 
+               // cache entries. Keeping the spilled entries removes the need 
to use another 
+               // data structure and also maintains the connections between 
items pointing to the 
+               // same data. Delete all the entries if all are set to be 
deleted.
+               boolean write = false;
+               LineageCacheEntry tmp = cache.get(e._origItem); //head
+               while (tmp != null) {
+                       if (tmp.getCacheStatus() != LineageCacheStatus.TOSPILL
+                               && tmp.getCacheStatus() != 
LineageCacheStatus.TODELETE)
+                               return; //do nothing
+
+                       write |= (tmp.getCacheStatus() == 
LineageCacheStatus.TOSPILL);
+                       tmp = tmp._nextEntry;
+               }
+               if (write) {
+                       // Spill to disk if at least one entry has status 
TOSPILL. 
+                       spillToLocalFS(cache, cache.get(e._origItem));
+                       LineageCacheEntry h = cache.get(e._origItem);
+                       while (h != null) {
+                               // Set values to null for all the entries.
+                               h.setNullValues();
+                               // Set status to spilled for all the entries.
+                               h.setCacheStatus(LineageCacheStatus.SPILLED);
+                               h = h._nextEntry;
+                       }
+                       // Keep them in cache.
+                       return;
+               }
+               // All are set to be deleted.
+               else {
+                       // Remove all the entries from cache.
+                       LineageCacheEntry h = cache.get(e._origItem);
+                       while (h != null) {
+                               removeEntry(cache, h);
+                               h = h._nextEntry;
+                       }
+               }
+               // NOTE: The callers of this method maintain the eviction queue.
        }
 
        //---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
@@ -113,6 +158,7 @@ public class LineageCacheEviction
                CACHE_LIMIT = limit;
        }
 
+       //Note: public for spilling tests
        public static long getCacheLimit() {
                return CACHE_LIMIT;
        }
@@ -139,8 +185,7 @@ public class LineageCacheEviction
 
                        if (!LineageCacheConfig.isSetSpill()) {
                                // If eviction is disabled, just delete the 
entries.
-                               if (cache.remove(e._key) != null)
-                                       removeEntry(cache, e);
+                               removeOrSpillEntry(cache, e, false);
                                e = weightedQueue.pollFirst();
                                continue;
                        }
@@ -157,8 +202,7 @@ public class LineageCacheEviction
                        if (!e.isMatrixValue()) {
                                // No spilling for scalar entries. Just delete 
those.
                                // Note: scalar entries with higher computation 
time are pinned.
-                               if (cache.remove(e._key) != null)
-                                       removeEntry(cache, e);
+                               removeOrSpillEntry(cache, e, false);
                                e = weightedQueue.pollFirst();
                                continue;
                        }
@@ -180,17 +224,21 @@ public class LineageCacheEviction
                                // Can't trust the estimate if less than 100ms.
                                // Spill if it takes longer to recompute.
                                if (exectime >= 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
-                                       spillToLocalFS(e);
+                                       //spillToLocalFS(e);
+                                       removeOrSpillEntry(cache, e, true);  
//spill
+                               else
+                                       removeOrSpillEntry(cache, e, false); 
//delete
                        }
                        else {
                                // Spill if it takes longer to recompute than 
spilling.
                                if (exectime > spilltime)
-                                       spillToLocalFS(e);
+                                       //spillToLocalFS(e);
+                                       removeOrSpillEntry(cache, e, true);  
//spill
+                               else
+                                       removeOrSpillEntry(cache, e, false); 
//delete
                        }
 
                        // Remove the entry from cache.
-                       if (cache.remove(e._key) != null)
-                               removeEntry(cache, e);
                        e = weightedQueue.pollFirst();
                }
        }
@@ -259,7 +307,7 @@ public class LineageCacheEviction
 
        // ---------------- I/O METHODS TO LOCAL FS -----------------
        
-       private static void spillToLocalFS(LineageCacheEntry entry) {
+       private static void spillToLocalFS(Map<LineageItem, LineageCacheEntry> 
cache, LineageCacheEntry entry) {
                if (!entry.isMatrixValue())
                        throw new DMLRuntimeException ("Spilling scalar objects 
to disk is not allowd. Key: "+entry._key);
                if (entry.isNullVal())
@@ -280,15 +328,28 @@ public class LineageCacheEviction
                // Adjust disk writing speed
                adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000, 
false);
                
+               // Add all the entries associated with this matrix to spillList.
+               if (entry._origItem == null) {
+                       _spillList.put(entry._key, new SpilledItem(outfile));
+               }
+               else {
+                       LineageCacheEntry h = cache.get(entry._origItem); //head
+                       while (h != null) {
+                               _spillList.put(h._key, new 
SpilledItem(outfile));
+                               h = h._nextEntry;
+                       }
+               }
+
                if (DMLScript.STATISTICS) {
                        LineageCacheStatistics.incrementFSWriteTime(t1-t0);
                        LineageCacheStatistics.incrementFSWrites();
                }
-
-               _spillList.put(entry._key, new SpilledItem(outfile, 
entry._computeTime));
        }
 
        protected static LineageCacheEntry readFromLocalFS(Map<LineageItem, 
LineageCacheEntry> cache, LineageItem key) {
+               if (cache.get(key) == null)
+                       throw new DMLRuntimeException ("Spilled item should 
present in cache. Key: "+key);
+
                long t0 = System.nanoTime();
                MatrixBlock mb = null;
                // Read from local FS
@@ -297,12 +358,23 @@ public class LineageCacheEviction
                } catch (IOException e) {
                        throw new DMLRuntimeException ("Read from " + 
_spillList.get(key)._outfile + " failed.", e);
                }
-               // Restore to cache
                LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile, 
true);
                long t1 = System.nanoTime();
-               LineageCache.putIntern(key, DataType.MATRIX, mb, null, 
_spillList.get(key)._computeTime);
+
+               // Restore to cache
+               LineageCacheEntry e = cache.get(key);
+               e.setValue(mb);
+               if (e._origItem != null) {
+                       // Restore to all the entries having the same data.
+                       LineageCacheEntry h = cache.get(e._origItem); //head
+                       while (h != null) {
+                               h.setValue(mb);
+                               h = h._nextEntry;
+                       }
+               }
+
                // Adjust disk reading speed
-               adjustReadWriteSpeed(cache.get(key), 
((double)(t1-t0))/1000000000, true);
+               adjustReadWriteSpeed(e, ((double)(t1-t0))/1000000000, true);
                // TODO: set cache status as RELOADED for this entry
                _spillList.remove(key);
                if (DMLScript.STATISTICS) {
@@ -318,13 +390,16 @@ public class LineageCacheEviction
 
        // ---------------- INTERNAL DATA STRUCTURES FOR EVICTION 
-----------------
 
+       // TODO: Remove this class, and add outfile to LineageCacheEntry.
        private static class SpilledItem {
                String _outfile;
-               long _computeTime;
+               //long _computeTime;
+               //protected LineageItem _origItem;
 
-               public SpilledItem(String outfile, long computetime) {
+               public SpilledItem(String outfile) {
                        _outfile = outfile;
-                       _computeTime = computetime;
+                       //_computeTime = computetime;
+                       //_origItem = origItem;
                }
        }
 }

Reply via email to