This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 12f69c7  [SYSTEMDS-333,337] Improved lineage cache eviction
12f69c7 is described below

commit 12f69c7c111cbe5e0ccc35d8bac58674b06480af
Author: arnabp <[email protected]>
AuthorDate: Thu Apr 23 22:12:25 2020 +0200

    [SYSTEMDS-333,337] Improved lineage cache eviction
    
    This patch improves lineage cache eviction by taking into account the actual
    execution time of instructions/functions. The ordering policy is still
    LRU. Future commits will bring a better approach to estimating spilling time
    and new eviction policies.
    
    Closes #891.
---
 docs/Tasks.txt                                     |   6 +-
 .../runtime/controlprogram/BasicProgramBlock.java  |   8 +-
 .../sysds/runtime/controlprogram/ProgramBlock.java |   4 +-
 .../instructions/cp/FunctionCallCPInstruction.java |   7 +-
 .../apache/sysds/runtime/lineage/LineageCache.java | 295 ++++++++++++++-------
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  23 +-
 .../runtime/lineage/LineageCacheStatistics.java    |  10 +
 .../sysds/runtime/lineage/LineageRewriteReuse.java |   9 +-
 .../java/org/apache/sysds/utils/Statistics.java    |   2 +-
 .../functions/lineage/.FunctionFullReuse5.dml.swp  | Bin 0 -> 4096 bytes
 10 files changed, 258 insertions(+), 106 deletions(-)

diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 6e6118c..2283d57 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -244,7 +244,11 @@ SYSTEMDS-320 Merge SystemDS into Apache SystemML           
           OK
 SYSTEMDS-330 Lineage Tracing, Reuse and Integration
  * 331 Cache and reuse scalar outputs (instruction and multi-level)   OK
  * 332 Parfor integration with multi-level reuse                      OK
- * 333 Use exact execution time for cost based eviction
+ * 333 Improve cache eviction with actual compute time                OK
+ * 334 Cache scalars only with at least one matrix input
+ * 335 Weighted eviction policy (function of size & computetime)
+ * 336 Better use of cache status to handle multithreading
+ * 337 Adjust disk I/O speed by recording actual time taken           OK
  
 SYSTEMDS-340 Compiler Assisted Lineage Caching and Reuse
  * 341 Finalize unmarking of loop dependent operations
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
index 5f44ac3..4590f0e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
@@ -108,14 +108,17 @@ public class BasicProgramBlock extends ProgramBlock
                
                //statement-block-level, lineage-based reuse
                LineageItem[] liInputs = null;
+               long t0 = 0;
                if (_sb != null && LineageCacheConfig.isMultiLevelReuse()) {
                        liInputs = 
LineageItemUtils.getLineageItemInputstoSB(_sb.getInputstoSB(), ec);
                        List<String> outNames = _sb.getOutputNamesofSB();
-                       if( LineageCache.reuse(outNames, _sb.getOutputsofSB(), 
outNames.size(), liInputs, _sb.getName(), ec) ) {
+                       if(liInputs != null && LineageCache.reuse(outNames, 
_sb.getOutputsofSB(), 
+                                               outNames.size(), liInputs, 
_sb.getName(), ec) ) {
                                if( DMLScript.STATISTICS )
                                        
LineageCacheStatistics.incrementSBHits();
                                return;
                        }
+                       t0 = System.nanoTime();
                }
 
                //actual instruction execution
@@ -123,6 +126,7 @@ public class BasicProgramBlock extends ProgramBlock
                
                //statement-block-level, lineage-based caching
                if (_sb != null && liInputs != null)
-                       LineageCache.putValue(_sb.getOutputsofSB(), liInputs, 
_sb.getName(), ec);
+                       LineageCache.putValue(_sb.getOutputsofSB(), liInputs, 
_sb.getName(), 
+                                       ec, System.nanoTime()-t0);
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
index 5cde84e..8859d39 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
@@ -43,6 +43,7 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.instructions.cp.StringObject;
 import org.apache.sysds.runtime.lineage.LineageCache;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.utils.Statistics;
 
@@ -217,10 +218,11 @@ public abstract class ProgramBlock implements ParseInfo
                        // try to reuse instruction result from lineage cache
                        if( !LineageCache.reuse(tmp, ec) ) {
                                // process actual instruction
+                               long et0 = !ReuseCacheType.isNone() ? 
System.nanoTime() : 0;
                                tmp.processInstruction(ec);
                                
                                // cache result
-                               LineageCache.putValue(tmp, ec);
+                               LineageCache.putValue(tmp, ec, 
System.nanoTime()-et0);
                                
                                // post-process instruction (debug)
                                tmp.postprocessInstruction( ec );
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
index 5d7feee..def4859 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
@@ -40,6 +40,7 @@ import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageCache;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageItemUtils;
@@ -172,6 +173,7 @@ public class FunctionCallCPInstruction extends 
CPInstruction {
                fn_ec.setVariables(functionVariables);
                fn_ec.setLineage(lineage);
                // execute the function block
+               long t0 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
                try {
                        fpb._functionName = this._functionName;
                        fpb._namespace = this._namespace;
@@ -184,6 +186,7 @@ public class FunctionCallCPInstruction extends 
CPInstruction {
                        String fname = 
DMLProgram.constructFunctionKey(_namespace, _functionName);
                        throw new DMLRuntimeException("error executing function 
" + fname, e);
                }
+               long t1 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
                
                // cleanup all returned variables w/o binding 
                HashSet<String> expectRetVars = new HashSet<>();
@@ -226,8 +229,8 @@ public class FunctionCallCPInstruction extends 
CPInstruction {
 
                //update lineage cache with the functions outputs
                if( DMLScript.LINEAGE && LineageCacheConfig.isMultiLevelReuse() 
) {
-                       LineageCache.putValue(fpb.getOutputParams(), 
-                               liInputs, getCacheFunctionName(_functionName, 
fpb), ec);
+                       LineageCache.putValue(fpb.getOutputParams(), liInputs, 
+                                       getCacheFunctionName(_functionName, 
fpb), ec, t1-t0);
                }
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 0d93699..5e945d6 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -23,7 +23,6 @@ import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.hops.OptimizerUtils;
-import org.apache.sysds.hops.cost.CostEstimatorStaticRuntime;
 import org.apache.sysds.lops.MMTSJ.MMTSJType;
 import org.apache.sysds.parser.DataIdentifier;
 import org.apache.sysds.parser.Statement;
@@ -39,6 +38,7 @@ import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
 import 
org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.InputInfo;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -60,7 +60,8 @@ public class LineageCache
        private static final HashSet<LineageItem> _removelist = new HashSet<>();
        private static final double CACHE_FRAC = 0.05; // 5% of JVM heap size
        private static final long CACHE_LIMIT; //limit in bytes
-       private static String outdir = null;
+       private static final boolean DEBUG = false;
+       private static String _outdir = null;
        private static long _cachesize = 0;
        private static Entry _head = null;
        private static Entry _end = null;
@@ -79,9 +80,7 @@ public class LineageCache
        //   a complex workflow of operations that accesses the cache as well.
        
        
-       ///////////////////////////////////////
-       // Public Cache API (keep it narrow) //
-       ///////////////////////////////////////
+       //--------------- PUBLIC CACHE API (keep it narrow) ----------------//
        
        public static boolean reuse(Instruction inst, ExecutionContext ec) {
                if (ReuseCacheType.isNone())
@@ -115,7 +114,7 @@ public class LineageCache
                                }
                        }
                        
-                       if( reuse ) { //reuse
+                       if(reuse) { //reuse
                                //put reuse value into symbol table (w/ 
blocking on placeholders)
                                if (e.isMatrixValue())
                                        
ec.setMatrixOutput(cinst.output.getName(), e.getMBValue());
@@ -130,7 +129,8 @@ public class LineageCache
                return reuse;
        }
        
-       public static boolean reuse(List<String> outNames, List<DataIdentifier> 
outParams, int numOutputs, LineageItem[] liInputs, String name, 
ExecutionContext ec)
+       public static boolean reuse(List<String> outNames, List<DataIdentifier> 
outParams, 
+                       int numOutputs, LineageItem[] liInputs, String name, 
ExecutionContext ec)
        {
                if( !LineageCacheConfig.isMultiLevelReuse())
                        return false;
@@ -142,7 +142,7 @@ public class LineageCache
                        String opcode = name + String.valueOf(i+1);
                        LineageItem li = new LineageItem(outNames.get(i), 
opcode, liInputs);
                        Entry e = null;
-                       synchronized( _cache ) {
+                       synchronized(_cache) {
                                if (LineageCache.probe(li)) {
                                        e = LineageCache.getIntern(li);
                                }
@@ -154,7 +154,7 @@ public class LineageCache
                        }
                        //TODO: handling of recursive calls
                        
-                       if ( e != null ) {
+                       if (e != null) {
                                String boundVarName = outNames.get(i);
                                Data boundValue = null;
                                //convert to matrix object
@@ -216,43 +216,42 @@ public class LineageCache
        
        //NOTE: safe to pin the object in memory as coming from CPInstruction
        //TODO why do we need both of these public put methods
-       public static void putMatrix(Instruction inst, ExecutionContext ec) {
+       public static void putMatrix(Instruction inst, ExecutionContext ec, 
long computetime) {
                if (LineageCacheConfig.isReusable(inst, ec) ) {
                        LineageItem item = ((LineageTraceable) 
inst).getLineageItems(ec)[0];
                        //This method is called only to put matrix value
                        MatrixObject mo = 
ec.getMatrixObject(((ComputationCPInstruction) inst).output);
                        synchronized( _cache ) {
-                               putIntern(item, DataType.MATRIX, 
mo.acquireReadAndRelease(),
-                                       null, getRecomputeEstimate(inst, ec));
+                               putIntern(item, DataType.MATRIX, 
mo.acquireReadAndRelease(), null, computetime);
                        }
                }
        }
        
-       public static void putValue(Instruction inst, ExecutionContext ec) {
+       public static void putValue(Instruction inst, ExecutionContext ec, long 
computetime) {
                if (ReuseCacheType.isNone())
                        return;
                if (LineageCacheConfig.isReusable(inst, ec) ) {
                        //if (!isMarkedForCaching(inst, ec)) return;
                        LineageItem item = ((LineageTraceable) 
inst).getLineageItems(ec)[0];
                        Data data = ec.getVariable(((ComputationCPInstruction) 
inst).output);
-                       double cest = getRecomputeEstimate(inst, ec);
                        synchronized( _cache ) {
-                               if( data instanceof MatrixObject )
-                                       
_cache.get(item).setValue(((MatrixObject)data).acquireReadAndRelease(), cest);
+                               if (data instanceof MatrixObject)
+                                       
_cache.get(item).setValue(((MatrixObject)data).acquireReadAndRelease(), 
computetime);
                                else
-                                       
_cache.get(item).setValue((ScalarObject)data, cest);
+                                       
_cache.get(item).setValue((ScalarObject)data, computetime);
                                long size = _cache.get(item).getSize();
                                
-                               if( !isBelowThreshold(size) ) 
+                               if (!isBelowThreshold(size))
                                        makeSpace(size);
                                updateSize(size, true);
                        }
                }
        }
        
-       public static void putValue(List<DataIdentifier> outputs, LineageItem[] 
liInputs, String name, ExecutionContext ec)
+       public static void putValue(List<DataIdentifier> outputs, LineageItem[] 
liInputs, 
+                               String name, ExecutionContext ec, long 
computetime)
        {
-               if( !LineageCacheConfig.isMultiLevelReuse() )
+               if (!LineageCacheConfig.isMultiLevelReuse())
                        return;
 
                HashMap<LineageItem, LineageItem> FuncLIMap = new HashMap<>();
@@ -275,10 +274,10 @@ public class LineageCache
                }
 
                //cache either all the outputs, or none.
-               synchronized( _cache ) {
+               synchronized (_cache) {
                        //move or remove placeholders 
                        if(AllOutputsCacheable)
-                               FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li, 
boundLI));
+                               FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li, 
boundLI, computetime));
                        else
                                FuncLIMap.forEach((Li, boundLI) -> 
removeEntry(Li));
                }
@@ -287,7 +286,7 @@ public class LineageCache
        }
        
        public static void resetCache() {
-               synchronized( _cache ) {
+               synchronized (_cache) {
                        _cache.clear();
                        _spillList.clear();
                        _head = null;
@@ -295,22 +294,21 @@ public class LineageCache
                        // reset cache size, otherwise the cache clear leads to 
unusable 
                        // space which means evictions could run into endless 
loops
                        _cachesize = 0;
+                       _outdir = null;
                        if (DMLScript.STATISTICS)
                                _removelist.clear();
                }
        }
        
-       /////////////////////////////////////////
-       // Internal Cache Logic Implementation //
-       /////////////////////////////////////////
+       //----------------- INTERNAL CACHE LOGIC IMPLEMENTATION --------------//
        
-       private static void putIntern(LineageItem key, DataType dt, MatrixBlock 
Mval, ScalarObject Sval, double compcost) {
+       private static void putIntern(LineageItem key, DataType dt, MatrixBlock 
Mval, ScalarObject Sval, long computetime) {
                if (_cache.containsKey(key))
                        //can come here if reuse_partial option is enabled
                        return;
                
                // Create a new entry.
-               Entry newItem = new Entry(key, dt, Mval, Sval, compcost);
+               Entry newItem = new Entry(key, dt, Mval, Sval, computetime);
                
                // Make space by removing or spilling LRU entries.
                if( Mval != null || Sval != null ) {
@@ -346,17 +344,18 @@ public class LineageCache
        }
 
        
-       private static void mvIntern(LineageItem item, LineageItem probeItem) {
+       private static void mvIntern(LineageItem item, LineageItem probeItem, 
long computetime) {
                if (ReuseCacheType.isNone())
                        return;
+               // Move the value from the cache entry with key probeItem to
+               // the placeholder entry with key item.
                if (LineageCache.probe(probeItem)) {
                        Entry oe = getIntern(probeItem);
                        Entry e = _cache.get(item);
-                       //TODO: compute estimate for function
                        if (oe.isMatrixValue())
-                               e.setValue(oe.getMBValue(), 0); 
+                               e.setValue(oe.getMBValue(), computetime); 
                        else
-                               e.setValue(oe.getSOValue(), 0);
+                               e.setValue(oe.getSOValue(), computetime);
                        e._origItem = probeItem; 
 
                        long size = oe.getSize();
@@ -390,31 +389,70 @@ public class LineageCache
        
        private static void makeSpace(long spaceNeeded) {
                // cost based eviction
-               while ((spaceNeeded +_cachesize) > CACHE_LIMIT)
+               Entry e = _end;
+               while (e != _head)
                {
-                       if (_cache.get(_end._key).isNullVal()) {
-                               //Must be a null function/SB placeholder entry. 
This 
-                               //function is currently being executed. Skip 
and continue.
-                               setEnd2Head(_end);
+                       if ((spaceNeeded + _cachesize) <= CACHE_LIMIT)
+                               // Enough space recovered.
+                               break;
+
+                       if (!LineageCacheConfig.isSetSpill()) {
+                               // If eviction is disabled, just delete the 
entries.
+                               removeEntry(e);
+                               e = e._prev;
                                continue;
                        }
-                       
-                       if (_cache.get(_end._key).isMatrixValue()) { //spill 
matrix blocks only
-                               if (_cache.get(_end._key)._compEst > 
getDiskSpillEstimate() 
-                                               && 
LineageCacheConfig.isSetSpill())
-                                       spillToLocalFS(); // If re-computation 
is more expensive, spill data to disk.
+
+                       if (!e.getCacheStatus().canEvict()) {
+                               // Don't delete if the entry's cache status 
doesn't allow.
+                               e = e._prev;
+                               continue;
                        }
 
-                       if (_cache.get(_end._key)._compEst == 0) {
-                               //Must be a function/SB/scalar entry. Move to 
next.
-                               //FIXME: Remove this logic after implementing 
new eviction logic.
-                               setEnd2Head(_end);  
+                       double exectime = ((double) e._computeTime) / 1000000; 
// in milliseconds
+
+                       if (!e.isMatrixValue()) {
+                               // Skip scalar entries with higher computation 
time, as
+                               // those could be function/statementblock 
outputs.
+                               if (exectime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+                                       removeEntry(e);
+                               e = e._prev;
                                continue;
                        }
-                       removeLastEntry();
+
+                       // Estimate time to write to FS + read from FS.
+                       double spilltime = getDiskSpillEstimate(e) * 1000; // 
in milliseconds
+
+                       if (DEBUG) {
+                               if (exectime > 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
+                                       System.out.print("LI " + 
e._key.getOpcode());
+                                       System.out.print(" exec time " + 
((double) e._computeTime) / 1000000);
+                                       System.out.print(" estimate time " + 
getDiskSpillEstimate(e) * 1000);
+                                       System.out.print(" dim " + 
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+                                       System.out.println(" size " + 
getDiskSizeEstimate(e));
+                               }
+                       }
+
+                       if (LineageCacheConfig.isSetSpill()) {
+                               if (spilltime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
+                                       // Can't trust the estimate if less 
than 100ms.
+                                       // Spill if it takes longer to 
recompute.
+                                       if (exectime >= 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+                                               spillToLocalFS(e);
+                               }
+                               else {
+                                       // Spill if it takes longer to 
recompute than spilling.
+                                       if (exectime > spilltime)
+                                               spillToLocalFS(e);
+                               }
+                       }
+
+                       // Remove the entry from cache.
+                       removeEntry(e);
+                       e = e._prev;
                }
        }
-       
+
        private static void updateSize(long space, boolean addspace) {
                if (addspace)
                        _cachesize += space;
@@ -424,21 +462,74 @@ public class LineageCache
 
        //---------------- COSTING RELATED METHODS -----------------
 
-       private static double getDiskSpillEstimate() {
+       private static double getDiskSpillEstimate(Entry e) {
+               if (!e.isMatrixValue() || e.isNullVal())
+                       return 0;
                // This includes sum of writing to and reading from disk
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               MatrixBlock mb = _cache.get(_end._key).getMBValue();
+               double size = getDiskSizeEstimate(e);
+               double loadtime = isSparse(e) ? 
size/LineageCacheConfig.FSREAD_SPARSE : size/LineageCacheConfig.FSREAD_DENSE;
+               double writetime = isSparse(e) ? 
size/LineageCacheConfig.FSWRITE_SPARSE : size/LineageCacheConfig.FSWRITE_DENSE;
+
+               //double loadtime = CostEstimatorStaticRuntime.getFSReadTime(r, 
c, s);
+               //double writetime = 
CostEstimatorStaticRuntime.getFSWriteTime(r, c, s);
+               if (DMLScript.STATISTICS) 
+                       
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
+               return loadtime + writetime;
+       }
+
+       private static double getDiskSizeEstimate(Entry e) {
+               if (!e.isMatrixValue() || e.isNullVal())
+                       return 0;
+               MatrixBlock mb = e.getMBValue();
                long r = mb.getNumRows();
                long c = mb.getNumColumns();
                long nnz = mb.getNonZeros();
                double s = OptimizerUtils.getSparsity(r, c, nnz);
-               double loadtime = CostEstimatorStaticRuntime.getFSReadTime(r, 
c, s);
-               double writetime = CostEstimatorStaticRuntime.getFSWriteTime(r, 
c, s);
+               double disksize = ((double)MatrixBlock.estimateSizeOnDisk(r, c, 
(long)(s*r*c))) / (1024*1024);
+               return disksize;
+       }
+       
+       private static void adjustReadWriteSpeed(Entry e, double IOtime, 
boolean read) {
+               double size = getDiskSizeEstimate(e);
+               if (!e.isMatrixValue() || size < 
LineageCacheConfig.MIN_SPILL_DATA)
+                       // Scalar or too small
+                       return; 
+               
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+               double newIOSpeed = size / IOtime; // MB per second 
+               // Adjust the read/write speed taking into account the last 
read/write.
+               // These constants will eventually converge to the real speed.
+               if (read) {
+                       if (isSparse(e))
+                               LineageCacheConfig.FSREAD_SPARSE = 
(LineageCacheConfig.FSREAD_SPARSE + newIOSpeed) / 2;
+                       else
+                               LineageCacheConfig.FSREAD_DENSE= 
(LineageCacheConfig.FSREAD_DENSE+ newIOSpeed) / 2;
+               }
+               else {
+                       if (isSparse(e))
+                               LineageCacheConfig.FSWRITE_SPARSE = 
(LineageCacheConfig.FSWRITE_SPARSE + newIOSpeed) / 2;
+                       else
+                               LineageCacheConfig.FSWRITE_DENSE= 
(LineageCacheConfig.FSWRITE_DENSE+ newIOSpeed) / 2;
+               }
                if (DMLScript.STATISTICS) 
                        
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
-               return loadtime+writetime;
        }
        
+       private static boolean isSparse(Entry e) {
+               if (!e.isMatrixValue() || e.isNullVal())
+                       return false;
+               MatrixBlock mb = e.getMBValue();
+               long r = mb.getNumRows();
+               long c = mb.getNumColumns();
+               long nnz = mb.getNonZeros();
+               double s = OptimizerUtils.getSparsity(r, c, nnz);
+               boolean sparse = MatrixBlock.evalSparseFormatOnDisk(r, c, 
(long)(s*r*c));
+               return sparse;
+       }
+       
+       @Deprecated
+       @SuppressWarnings("unused")
        private static double getRecomputeEstimate(Instruction inst, 
ExecutionContext ec) {
                if (!((ComputationCPInstruction)inst).output.isMatrix()
                        || (((ComputationCPInstruction)inst).input1 != null && 
!((ComputationCPInstruction)inst).input1.isMatrix()))
@@ -586,29 +677,37 @@ public class LineageCache
 
        // ---------------- I/O METHODS TO LOCAL FS -----------------
        
-       private static void spillToLocalFS() {
-               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               if (outdir == null) {
-                       outdir = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_LINEAGE);
-                       LocalFileUtils.createLocalFileIfNotExist(outdir);
+       private static void spillToLocalFS(Entry entry) {
+               if (!entry.isMatrixValue())
+                       throw new DMLRuntimeException ("Spilling scalar objects 
to disk is not allowd. Key: "+entry._key);
+               if (entry.isNullVal())
+                       throw new DMLRuntimeException ("Cannot spill null value 
to disk. Key: "+entry._key);
+               
+               long t0 = System.nanoTime();
+               if (_outdir == null) {
+                       _outdir = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_LINEAGE);
+                       LocalFileUtils.createLocalFileIfNotExist(_outdir);
                }
-               String outfile = outdir+"/"+_cache.get(_end._key)._key.getId();
+               String outfile = _outdir+"/"+entry._key.getId();
                try {
-                       LocalFileUtils.writeMatrixBlockToLocal(outfile, 
_cache.get(_end._key).getMBValue());
+                       LocalFileUtils.writeMatrixBlockToLocal(outfile, 
entry.getMBValue());
                } catch (IOException e) {
                        throw new DMLRuntimeException ("Write to " + outfile + 
" failed.", e);
                }
+               long t1 = System.nanoTime();
+               // Adjust disk writing speed
+               adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000, 
false);
+               
                if (DMLScript.STATISTICS) {
-                       long t1 = System.nanoTime();
                        LineageCacheStatistics.incrementFSWriteTime(t1-t0);
                        LineageCacheStatistics.incrementFSWrites();
                }
 
-               _spillList.put(_end._key, new SpilledItem(outfile, 
_end._compEst));
+               _spillList.put(entry._key, new SpilledItem(outfile, 
entry._computeTime));
        }
        
        private static Entry readFromLocalFS(LineageItem key) {
-               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+               long t0 = System.nanoTime();
                MatrixBlock mb = null;
                // Read from local FS
                try {
@@ -618,27 +717,20 @@ public class LineageCache
                }
                // Restore to cache
                LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile, 
true);
-               putIntern(key, DataType.MATRIX, mb, null, 
_spillList.get(key)._compEst);
+               long t1 = System.nanoTime();
+               putIntern(key, DataType.MATRIX, mb, null, 
_spillList.get(key)._computeTime);
+               // Adjust disk reading speed
+               adjustReadWriteSpeed(_cache.get(key), 
((double)(t1-t0))/1000000000, true);
+               //TODO: set cache status as RELOADED for this entry
                _spillList.remove(key);
                if (DMLScript.STATISTICS) {
-                       long t1 = System.nanoTime();
                        LineageCacheStatistics.incrementFSReadTime(t1-t0);
                        LineageCacheStatistics.incrementFSHits();
                }
                return _cache.get(key);
        }
 
-       ////////////////////////////////////////////
-       // Cache Maintenance and Lookup Functions //
-       ////////////////////////////////////////////
-       
-       private static void removeLastEntry() {
-               if (DMLScript.STATISTICS)
-                       _removelist.add(_end._key);
-               Entry e = _cache.remove(_end._key);
-               _cachesize -= e.getSize();
-               delete(_end);
-       }
+       //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS ---------//
        
        private static void removeEntry(LineageItem key) {
                // Remove the entry for key
@@ -647,12 +739,21 @@ public class LineageCache
                delete(_cache.get(key));
                _cache.remove(key);
        }
-       
-       private static void setEnd2Head(Entry entry) {
-               delete(entry);
-               setHead(entry);
+
+       private static void removeEntry(Entry e) {
+               if (_cache.remove(e._key) == null)
+                       // Entry not found in cache
+                       return;
+
+               if (DMLScript.STATISTICS)
+                       _removelist.add(e._key);
+
+               _cachesize -= e.getSize();
+               delete(e);
+               if (DMLScript.STATISTICS)
+                       LineageCacheStatistics.incrementMemDeletes();
        }
-       
+
        private static void delete(Entry entry) {
                if (entry._prev != null)
                        entry._prev._next = entry._next;
@@ -674,26 +775,26 @@ public class LineageCache
                        _end = _head;
        }
        
-       ////////////////////////////////////
-       // Internal Cache Data Structures //
-       ////////////////////////////////////
+       //---------------- INTERNAL CACHE DATA STRUCTURES ----------------//
        
        private static class Entry {
                private final LineageItem _key;
                private final DataType _dt;
                private MatrixBlock _MBval;
                private ScalarObject _SOval;
-               double _compEst;
+               private long _computeTime;
+               private LineageCacheStatus _status;
                private Entry _prev;
                private Entry _next;
                private LineageItem _origItem;
                
-               public Entry(LineageItem key, DataType dt, MatrixBlock Mval, 
ScalarObject Sval, double computecost) {
+               public Entry(LineageItem key, DataType dt, MatrixBlock Mval, 
ScalarObject Sval, long computetime) {
                        _key = key;
                        _dt = dt;
                        _MBval = Mval;
                        _SOval = Sval;
-                       _compEst = computecost;
+                       _computeTime = computetime;
+                       _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
                        _origItem = null;
                }
 
@@ -725,6 +826,10 @@ public class LineageCache
                        }
                }
                
+               public synchronized LineageCacheStatus getCacheStatus() {
+                       return _status;
+               }
+               
                public synchronized long getSize() {
                        return ((_MBval != null ? _MBval.getInMemorySize() : 0) 
+ (_SOval != null ? _SOval.getSize() : 0));
                }
@@ -737,16 +842,18 @@ public class LineageCache
                        return _dt.isMatrix();
                }
                
-               public synchronized void setValue(MatrixBlock val, double 
compEst) {
+               public synchronized void setValue(MatrixBlock val, long 
computetime) {
                        _MBval = val;
-                       _compEst = compEst;
+                       _computeTime = computetime;
+                       _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
                        //resume all threads waiting for val
                        notifyAll();
                }
 
-               public synchronized void setValue(ScalarObject val, double 
compEst) {
+               public synchronized void setValue(ScalarObject val, long 
computetime) {
                        _SOval = val;
-                       _compEst = compEst;
+                       _computeTime = computetime;
+                       _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
                        //resume all threads waiting for val
                        notifyAll();
                }
@@ -754,11 +861,11 @@ public class LineageCache
        
        private static class SpilledItem {
                String _outfile;
-               double _compEst;
+               long _computeTime;
 
-               public SpilledItem(String outfile, double computecost) {
+               public SpilledItem(String outfile, long computetime) {
                        _outfile = outfile;
-                       _compEst = computecost;
+                       _computeTime = computetime;
                }
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index e130cfa..efe35f7 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -67,8 +67,29 @@ public class LineageCacheConfig {
                ALL
        }
        
+        public enum LineageCacheStatus {
+               EMPTY,          //Placeholder with no data. Cannot be evicted.
+               CACHED,         //General cached data. Can be evicted.
+               EVICTED,        //Data is in disk. Empty value. Cannot be 
evicted.
+               RELOADED,       //Reloaded from disk. Can be evicted.
+               PINNED;         //Pinned to memory. Cannot be evicted.
+               public boolean canEvict() {
+                       return this == CACHED || this == RELOADED;
+               }
+        }
+       
        public ArrayList<String> _MMult = new ArrayList<>();
        public static boolean _allowSpill = true;
+       // Minimum reliable spilling estimate in milliseconds.
+       public static final double MIN_SPILL_TIME_ESTIMATE = 100;
+       // Minimum reliable data size for spilling estimate in MB.
+       public static final double MIN_SPILL_DATA = 20;
+
+       // Default I/O in MB per second for binary blocks
+       public static double FSREAD_DENSE = 200;
+       public static double FSREAD_SPARSE = 100;
+       public static double FSWRITE_DENSE = 150;
+       public static double FSWRITE_SPARSE = 75;
 
        private static ReuseCacheType _cacheType = null;
        private static CachedItemHead _itemH = null;
@@ -76,7 +97,7 @@ public class LineageCacheConfig {
        private static boolean _compilerAssistedRW = true;
        static {
                //setup static configuration parameters
-               setSpill(false); //disable spilling of cache entries to disk
+               setSpill(true); //enable/disable disk spilling.
        }
        
        public static boolean isReusable (Instruction inst, ExecutionContext 
ec) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index 9704797..7ab7490 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -34,6 +34,7 @@ public class LineageCacheStatistics {
        private static final LongAdder _numHitsFunc     = new LongAdder();
        private static final LongAdder _numWritesMem    = new LongAdder();
        private static final LongAdder _numWritesFS     = new LongAdder();
+       private static final LongAdder _numMemDel       = new LongAdder();
        private static final LongAdder _numRewrites     = new LongAdder();
        private static final LongAdder _ctimeFSRead     = new LongAdder(); //in 
nano sec
        private static final LongAdder _ctimeFSWrite    = new LongAdder(); //in 
nano sec
@@ -50,6 +51,7 @@ public class LineageCacheStatistics {
                _numHitsFunc.reset();
                _numWritesMem.reset();
                _numWritesFS.reset();
+               _numMemDel.reset();
                _numRewrites.reset();
                _ctimeFSRead.reset();
                _ctimeFSWrite.reset();
@@ -102,6 +104,12 @@ public class LineageCacheStatistics {
                // Number of times written in local FS.
                _numWritesFS.increment();
        }
+       
+       public static void incrementMemDeletes() {
+               // Number of deletions from cache (including spilling).
+               _numMemDel.increment();
+       }
+
 
        public static void incrementFSReadTime(long delta) {
                // Total time spent on reading from FS.
@@ -161,6 +169,8 @@ public class LineageCacheStatistics {
                sb.append(_numWritesMem.longValue());
                sb.append("/");
                sb.append(_numWritesFS.longValue());
+               sb.append("/");
+               sb.append(_numMemDel.longValue());
                return sb.toString();
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
index f48c869..4be8aef 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
@@ -95,11 +95,15 @@ public class LineageRewriteReuse
                        return false;
                
                //execute instructions & write the o/p to symbol table
+               long t0 = System.nanoTime();
                executeInst(newInst, lrwec);
+               long t1 = System.nanoTime();
                
ec.setVariable(((ComputationCPInstruction)curr).output.getName(), 
lrwec.getVariable(LR_VAR));
 
                //put the result into the cache
-               LineageCache.putMatrix(curr, ec);
+               LineageCache.putMatrix(curr, ec, t1-t0);
+               if (DMLScript.STATISTICS) 
+                       LineageCacheStatistics.incrementPRwExecTime(t1-t0);
                DMLScript.EXPLAIN = et; //TODO can't change this here
                
                //cleanup execution context
@@ -755,7 +759,6 @@ public class LineageRewriteReuse
                DMLScript.EXPLAIN = ExplainType.NONE;
 
                try {
-                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                        //execute instructions
                        BasicProgramBlock pb = getProgramBlock();
                        pb.setInstructions(newInst);
@@ -763,8 +766,6 @@ public class LineageRewriteReuse
                        LineageCacheConfig.shutdownReuse();
                        pb.execute(lrwec);
                        LineageCacheConfig.restartReuse(oldReuseOption);
-                       if (DMLScript.STATISTICS) 
-                               
LineageCacheStatistics.incrementPRwExecTime(System.nanoTime()-t0);
                }
                catch (Exception e) {
                        throw new DMLRuntimeException("Error executing lineage 
rewrites" , e);
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java 
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 4c3cbef..eb94f83 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -946,7 +946,7 @@ public class Statistics
                        if (DMLScript.LINEAGE && !ReuseCacheType.isNone()) {
                                sb.append("LinCache hits (Mem/FS/Del): \t" + 
LineageCacheStatistics.displayHits() + ".\n");
                                sb.append("LinCache MultiLevel (Ins/SB/Fn):" + 
LineageCacheStatistics.displayMultiLevelHits() + ".\n");
-                               sb.append("LinCache writes (Mem/FS): \t" + 
LineageCacheStatistics.displayWtrites() + ".\n");
+                               sb.append("LinCache writes (Mem/FS/Del): \t" + 
LineageCacheStatistics.displayWtrites() + ".\n");
                                sb.append("LinCache FStimes (Rd/Wr): \t" + 
LineageCacheStatistics.displayTime() + " sec.\n");
                                sb.append("LinCache costing time:  \t" + 
LineageCacheStatistics.displayCostingTime() + " sec.\n");
                                sb.append("LinCache Rewrites:    \t\t" + 
LineageCacheStatistics.displayRewrites() + ".\n");
diff --git a/src/test/scripts/functions/lineage/.FunctionFullReuse5.dml.swp 
b/src/test/scripts/functions/lineage/.FunctionFullReuse5.dml.swp
new file mode 100644
index 0000000..68c260c
Binary files /dev/null and 
b/src/test/scripts/functions/lineage/.FunctionFullReuse5.dml.swp differ

Reply via email to