This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new 12f69c7 [SYSTEMDS-333,337] Improved lineage cache eviction
12f69c7 is described below
commit 12f69c7c111cbe5e0ccc35d8bac58674b06480af
Author: arnabp <[email protected]>
AuthorDate: Thu Apr 23 22:12:25 2020 +0200
[SYSTEMDS-333,337] Improved lineage cache eviction
This patch improves lineage cache eviction by taking into account the actual
execution time of instructions/functions. The ordering policy is still
LRU. Future commits will bring a better approach to estimating spilling time,
as well as new eviction policies.
Closes #891.
---
docs/Tasks.txt | 6 +-
.../runtime/controlprogram/BasicProgramBlock.java | 8 +-
.../sysds/runtime/controlprogram/ProgramBlock.java | 4 +-
.../instructions/cp/FunctionCallCPInstruction.java | 7 +-
.../apache/sysds/runtime/lineage/LineageCache.java | 295 ++++++++++++++-------
.../sysds/runtime/lineage/LineageCacheConfig.java | 23 +-
.../runtime/lineage/LineageCacheStatistics.java | 10 +
.../sysds/runtime/lineage/LineageRewriteReuse.java | 9 +-
.../java/org/apache/sysds/utils/Statistics.java | 2 +-
.../functions/lineage/.FunctionFullReuse5.dml.swp | Bin 0 -> 4096 bytes
10 files changed, 258 insertions(+), 106 deletions(-)
diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 6e6118c..2283d57 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -244,7 +244,11 @@ SYSTEMDS-320 Merge SystemDS into Apache SystemML
OK
SYSTEMDS-330 Lineage Tracing, Reuse and Integration
* 331 Cache and reuse scalar outputs (instruction and multi-level) OK
* 332 Parfor integration with multi-level reuse OK
- * 333 Use exact execution time for cost based eviction
+ * 333 Improve cache eviction with actual compute time OK
+ * 334 Cache scalars only with atleast one matrix inputs
+ * 335 Weighted eviction policy (function of size & computetime)
+ * 336 Better use of cache status to handle multithreading
+ * 337 Adjust disk I/O speed by recording actual time taken OK
SYSTEMDS-340 Compiler Assisted Lineage Caching and Reuse
* 341 Finalize unmarking of loop dependent operations
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
index 5f44ac3..4590f0e 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/BasicProgramBlock.java
@@ -108,14 +108,17 @@ public class BasicProgramBlock extends ProgramBlock
//statement-block-level, lineage-based reuse
LineageItem[] liInputs = null;
+ long t0 = 0;
if (_sb != null && LineageCacheConfig.isMultiLevelReuse()) {
liInputs =
LineageItemUtils.getLineageItemInputstoSB(_sb.getInputstoSB(), ec);
List<String> outNames = _sb.getOutputNamesofSB();
- if( LineageCache.reuse(outNames, _sb.getOutputsofSB(),
outNames.size(), liInputs, _sb.getName(), ec) ) {
+ if(liInputs != null && LineageCache.reuse(outNames,
_sb.getOutputsofSB(),
+ outNames.size(), liInputs,
_sb.getName(), ec) ) {
if( DMLScript.STATISTICS )
LineageCacheStatistics.incrementSBHits();
return;
}
+ t0 = System.nanoTime();
}
//actual instruction execution
@@ -123,6 +126,7 @@ public class BasicProgramBlock extends ProgramBlock
//statement-block-level, lineage-based caching
if (_sb != null && liInputs != null)
- LineageCache.putValue(_sb.getOutputsofSB(), liInputs,
_sb.getName(), ec);
+ LineageCache.putValue(_sb.getOutputsofSB(), liInputs,
_sb.getName(),
+ ec, System.nanoTime()-t0);
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
index 5cde84e..8859d39 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
@@ -43,6 +43,7 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.cp.StringObject;
import org.apache.sysds.runtime.lineage.LineageCache;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.utils.Statistics;
@@ -217,10 +218,11 @@ public abstract class ProgramBlock implements ParseInfo
// try to reuse instruction result from lineage cache
if( !LineageCache.reuse(tmp, ec) ) {
// process actual instruction
+ long et0 = !ReuseCacheType.isNone() ?
System.nanoTime() : 0;
tmp.processInstruction(ec);
// cache result
- LineageCache.putValue(tmp, ec);
+ LineageCache.putValue(tmp, ec,
System.nanoTime()-et0);
// post-process instruction (debug)
tmp.postprocessInstruction( ec );
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
index 5d7feee..def4859 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/FunctionCallCPInstruction.java
@@ -40,6 +40,7 @@ import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageCache;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageItemUtils;
@@ -172,6 +173,7 @@ public class FunctionCallCPInstruction extends
CPInstruction {
fn_ec.setVariables(functionVariables);
fn_ec.setLineage(lineage);
// execute the function block
+ long t0 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
try {
fpb._functionName = this._functionName;
fpb._namespace = this._namespace;
@@ -184,6 +186,7 @@ public class FunctionCallCPInstruction extends
CPInstruction {
String fname =
DMLProgram.constructFunctionKey(_namespace, _functionName);
throw new DMLRuntimeException("error executing function
" + fname, e);
}
+ long t1 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
// cleanup all returned variables w/o binding
HashSet<String> expectRetVars = new HashSet<>();
@@ -226,8 +229,8 @@ public class FunctionCallCPInstruction extends
CPInstruction {
//update lineage cache with the functions outputs
if( DMLScript.LINEAGE && LineageCacheConfig.isMultiLevelReuse()
) {
- LineageCache.putValue(fpb.getOutputParams(),
- liInputs, getCacheFunctionName(_functionName,
fpb), ec);
+ LineageCache.putValue(fpb.getOutputParams(), liInputs,
+ getCacheFunctionName(_functionName,
fpb), ec, t1-t0);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 0d93699..5e945d6 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -23,7 +23,6 @@ import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
-import org.apache.sysds.hops.cost.CostEstimatorStaticRuntime;
import org.apache.sysds.lops.MMTSJ.MMTSJType;
import org.apache.sysds.parser.DataIdentifier;
import org.apache.sysds.parser.Statement;
@@ -39,6 +38,7 @@ import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
import
org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.matrix.data.InputInfo;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -60,7 +60,8 @@ public class LineageCache
private static final HashSet<LineageItem> _removelist = new HashSet<>();
private static final double CACHE_FRAC = 0.05; // 5% of JVM heap size
private static final long CACHE_LIMIT; //limit in bytes
- private static String outdir = null;
+ private static final boolean DEBUG = false;
+ private static String _outdir = null;
private static long _cachesize = 0;
private static Entry _head = null;
private static Entry _end = null;
@@ -79,9 +80,7 @@ public class LineageCache
// a complex workflow of operations that accesses the cache as well.
- ///////////////////////////////////////
- // Public Cache API (keep it narrow) //
- ///////////////////////////////////////
+ //--------------- PUBLIC CACHE API (keep it narrow) ----------------//
public static boolean reuse(Instruction inst, ExecutionContext ec) {
if (ReuseCacheType.isNone())
@@ -115,7 +114,7 @@ public class LineageCache
}
}
- if( reuse ) { //reuse
+ if(reuse) { //reuse
//put reuse value into symbol table (w/
blocking on placeholders)
if (e.isMatrixValue())
ec.setMatrixOutput(cinst.output.getName(), e.getMBValue());
@@ -130,7 +129,8 @@ public class LineageCache
return reuse;
}
- public static boolean reuse(List<String> outNames, List<DataIdentifier>
outParams, int numOutputs, LineageItem[] liInputs, String name,
ExecutionContext ec)
+ public static boolean reuse(List<String> outNames, List<DataIdentifier>
outParams,
+ int numOutputs, LineageItem[] liInputs, String name,
ExecutionContext ec)
{
if( !LineageCacheConfig.isMultiLevelReuse())
return false;
@@ -142,7 +142,7 @@ public class LineageCache
String opcode = name + String.valueOf(i+1);
LineageItem li = new LineageItem(outNames.get(i),
opcode, liInputs);
Entry e = null;
- synchronized( _cache ) {
+ synchronized(_cache) {
if (LineageCache.probe(li)) {
e = LineageCache.getIntern(li);
}
@@ -154,7 +154,7 @@ public class LineageCache
}
//TODO: handling of recursive calls
- if ( e != null ) {
+ if (e != null) {
String boundVarName = outNames.get(i);
Data boundValue = null;
//convert to matrix object
@@ -216,43 +216,42 @@ public class LineageCache
//NOTE: safe to pin the object in memory as coming from CPInstruction
//TODO why do we need both of these public put methods
- public static void putMatrix(Instruction inst, ExecutionContext ec) {
+ public static void putMatrix(Instruction inst, ExecutionContext ec,
long computetime) {
if (LineageCacheConfig.isReusable(inst, ec) ) {
LineageItem item = ((LineageTraceable)
inst).getLineageItems(ec)[0];
//This method is called only to put matrix value
MatrixObject mo =
ec.getMatrixObject(((ComputationCPInstruction) inst).output);
synchronized( _cache ) {
- putIntern(item, DataType.MATRIX,
mo.acquireReadAndRelease(),
- null, getRecomputeEstimate(inst, ec));
+ putIntern(item, DataType.MATRIX,
mo.acquireReadAndRelease(), null, computetime);
}
}
}
- public static void putValue(Instruction inst, ExecutionContext ec) {
+ public static void putValue(Instruction inst, ExecutionContext ec, long
computetime) {
if (ReuseCacheType.isNone())
return;
if (LineageCacheConfig.isReusable(inst, ec) ) {
//if (!isMarkedForCaching(inst, ec)) return;
LineageItem item = ((LineageTraceable)
inst).getLineageItems(ec)[0];
Data data = ec.getVariable(((ComputationCPInstruction)
inst).output);
- double cest = getRecomputeEstimate(inst, ec);
synchronized( _cache ) {
- if( data instanceof MatrixObject )
-
_cache.get(item).setValue(((MatrixObject)data).acquireReadAndRelease(), cest);
+ if (data instanceof MatrixObject)
+
_cache.get(item).setValue(((MatrixObject)data).acquireReadAndRelease(),
computetime);
else
-
_cache.get(item).setValue((ScalarObject)data, cest);
+
_cache.get(item).setValue((ScalarObject)data, computetime);
long size = _cache.get(item).getSize();
- if( !isBelowThreshold(size) )
+ if (!isBelowThreshold(size))
makeSpace(size);
updateSize(size, true);
}
}
}
- public static void putValue(List<DataIdentifier> outputs, LineageItem[]
liInputs, String name, ExecutionContext ec)
+ public static void putValue(List<DataIdentifier> outputs, LineageItem[]
liInputs,
+ String name, ExecutionContext ec, long
computetime)
{
- if( !LineageCacheConfig.isMultiLevelReuse() )
+ if (!LineageCacheConfig.isMultiLevelReuse())
return;
HashMap<LineageItem, LineageItem> FuncLIMap = new HashMap<>();
@@ -275,10 +274,10 @@ public class LineageCache
}
//cache either all the outputs, or none.
- synchronized( _cache ) {
+ synchronized (_cache) {
//move or remove placeholders
if(AllOutputsCacheable)
- FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li,
boundLI));
+ FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li,
boundLI, computetime));
else
FuncLIMap.forEach((Li, boundLI) ->
removeEntry(Li));
}
@@ -287,7 +286,7 @@ public class LineageCache
}
public static void resetCache() {
- synchronized( _cache ) {
+ synchronized (_cache) {
_cache.clear();
_spillList.clear();
_head = null;
@@ -295,22 +294,21 @@ public class LineageCache
// reset cache size, otherwise the cache clear leads to
unusable
// space which means evictions could run into endless
loops
_cachesize = 0;
+ _outdir = null;
if (DMLScript.STATISTICS)
_removelist.clear();
}
}
- /////////////////////////////////////////
- // Internal Cache Logic Implementation //
- /////////////////////////////////////////
+ //----------------- INTERNAL CACHE LOGIC IMPLEMENTATION --------------//
- private static void putIntern(LineageItem key, DataType dt, MatrixBlock
Mval, ScalarObject Sval, double compcost) {
+ private static void putIntern(LineageItem key, DataType dt, MatrixBlock
Mval, ScalarObject Sval, long computetime) {
if (_cache.containsKey(key))
//can come here if reuse_partial option is enabled
return;
// Create a new entry.
- Entry newItem = new Entry(key, dt, Mval, Sval, compcost);
+ Entry newItem = new Entry(key, dt, Mval, Sval, computetime);
// Make space by removing or spilling LRU entries.
if( Mval != null || Sval != null ) {
@@ -346,17 +344,18 @@ public class LineageCache
}
- private static void mvIntern(LineageItem item, LineageItem probeItem) {
+ private static void mvIntern(LineageItem item, LineageItem probeItem,
long computetime) {
if (ReuseCacheType.isNone())
return;
+ // Move the value from the cache entry with key probeItem to
+ // the placeholder entry with key item.
if (LineageCache.probe(probeItem)) {
Entry oe = getIntern(probeItem);
Entry e = _cache.get(item);
- //TODO: compute estimate for function
if (oe.isMatrixValue())
- e.setValue(oe.getMBValue(), 0);
+ e.setValue(oe.getMBValue(), computetime);
else
- e.setValue(oe.getSOValue(), 0);
+ e.setValue(oe.getSOValue(), computetime);
e._origItem = probeItem;
long size = oe.getSize();
@@ -390,31 +389,70 @@ public class LineageCache
private static void makeSpace(long spaceNeeded) {
// cost based eviction
- while ((spaceNeeded +_cachesize) > CACHE_LIMIT)
+ Entry e = _end;
+ while (e != _head)
{
- if (_cache.get(_end._key).isNullVal()) {
- //Must be a null function/SB placeholder entry.
This
- //function is currently being executed. Skip
and continue.
- setEnd2Head(_end);
+ if ((spaceNeeded + _cachesize) <= CACHE_LIMIT)
+ // Enough space recovered.
+ break;
+
+ if (!LineageCacheConfig.isSetSpill()) {
+ // If eviction is disabled, just delete the
entries.
+ removeEntry(e);
+ e = e._prev;
continue;
}
-
- if (_cache.get(_end._key).isMatrixValue()) { //spill
matrix blocks only
- if (_cache.get(_end._key)._compEst >
getDiskSpillEstimate()
- &&
LineageCacheConfig.isSetSpill())
- spillToLocalFS(); // If re-computation
is more expensive, spill data to disk.
+
+ if (!e.getCacheStatus().canEvict()) {
+ // Don't delete if the entry's cache status
doesn't allow.
+ e = e._prev;
+ continue;
}
- if (_cache.get(_end._key)._compEst == 0) {
- //Must be a function/SB/scalar entry. Move to
next.
- //FIXME: Remove this logic after implementing
new eviction logic.
- setEnd2Head(_end);
+ double exectime = ((double) e._computeTime) / 1000000;
// in milliseconds
+
+ if (!e.isMatrixValue()) {
+ // Skip scalar entries with higher computation
time, as
+ // those could be function/statementblock
outputs.
+ if (exectime <
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+ removeEntry(e);
+ e = e._prev;
continue;
}
- removeLastEntry();
+
+ // Estimate time to write to FS + read from FS.
+ double spilltime = getDiskSpillEstimate(e) * 1000; //
in milliseconds
+
+ if (DEBUG) {
+ if (exectime >
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
+ System.out.print("LI " +
e._key.getOpcode());
+ System.out.print(" exec time " +
((double) e._computeTime) / 1000000);
+ System.out.print(" estimate time " +
getDiskSpillEstimate(e) * 1000);
+ System.out.print(" dim " +
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+ System.out.println(" size " +
getDiskSizeEstimate(e));
+ }
+ }
+
+ if (LineageCacheConfig.isSetSpill()) {
+ if (spilltime <
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
+ // Can't trust the estimate if less
than 100ms.
+ // Spill if it takes longer to
recompute.
+ if (exectime >=
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
+ spillToLocalFS(e);
+ }
+ else {
+ // Spill if it takes longer to
recompute than spilling.
+ if (exectime > spilltime)
+ spillToLocalFS(e);
+ }
+ }
+
+ // Remove the entry from cache.
+ removeEntry(e);
+ e = e._prev;
}
}
-
+
private static void updateSize(long space, boolean addspace) {
if (addspace)
_cachesize += space;
@@ -424,21 +462,74 @@ public class LineageCache
//---------------- COSTING RELATED METHODS -----------------
- private static double getDiskSpillEstimate() {
+ private static double getDiskSpillEstimate(Entry e) {
+ if (!e.isMatrixValue() || e.isNullVal())
+ return 0;
// This includes sum of writing to and reading from disk
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- MatrixBlock mb = _cache.get(_end._key).getMBValue();
+ double size = getDiskSizeEstimate(e);
+ double loadtime = isSparse(e) ?
size/LineageCacheConfig.FSREAD_SPARSE : size/LineageCacheConfig.FSREAD_DENSE;
+ double writetime = isSparse(e) ?
size/LineageCacheConfig.FSWRITE_SPARSE : size/LineageCacheConfig.FSWRITE_DENSE;
+
+ //double loadtime = CostEstimatorStaticRuntime.getFSReadTime(r,
c, s);
+ //double writetime =
CostEstimatorStaticRuntime.getFSWriteTime(r, c, s);
+ if (DMLScript.STATISTICS)
+
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
+ return loadtime + writetime;
+ }
+
+ private static double getDiskSizeEstimate(Entry e) {
+ if (!e.isMatrixValue() || e.isNullVal())
+ return 0;
+ MatrixBlock mb = e.getMBValue();
long r = mb.getNumRows();
long c = mb.getNumColumns();
long nnz = mb.getNonZeros();
double s = OptimizerUtils.getSparsity(r, c, nnz);
- double loadtime = CostEstimatorStaticRuntime.getFSReadTime(r,
c, s);
- double writetime = CostEstimatorStaticRuntime.getFSWriteTime(r,
c, s);
+ double disksize = ((double)MatrixBlock.estimateSizeOnDisk(r, c,
(long)(s*r*c))) / (1024*1024);
+ return disksize;
+ }
+
+ private static void adjustReadWriteSpeed(Entry e, double IOtime,
boolean read) {
+ double size = getDiskSizeEstimate(e);
+ if (!e.isMatrixValue() || size <
LineageCacheConfig.MIN_SPILL_DATA)
+ // Scalar or too small
+ return;
+
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ double newIOSpeed = size / IOtime; // MB per second
+ // Adjust the read/write speed taking into account the last
read/write.
+ // These constants will eventually converge to the real speed.
+ if (read) {
+ if (isSparse(e))
+ LineageCacheConfig.FSREAD_SPARSE =
(LineageCacheConfig.FSREAD_SPARSE + newIOSpeed) / 2;
+ else
+ LineageCacheConfig.FSREAD_DENSE=
(LineageCacheConfig.FSREAD_DENSE+ newIOSpeed) / 2;
+ }
+ else {
+ if (isSparse(e))
+ LineageCacheConfig.FSWRITE_SPARSE =
(LineageCacheConfig.FSWRITE_SPARSE + newIOSpeed) / 2;
+ else
+ LineageCacheConfig.FSWRITE_DENSE=
(LineageCacheConfig.FSWRITE_DENSE+ newIOSpeed) / 2;
+ }
if (DMLScript.STATISTICS)
LineageCacheStatistics.incrementCostingTime(System.nanoTime() - t0);
- return loadtime+writetime;
}
+ private static boolean isSparse(Entry e) {
+ if (!e.isMatrixValue() || e.isNullVal())
+ return false;
+ MatrixBlock mb = e.getMBValue();
+ long r = mb.getNumRows();
+ long c = mb.getNumColumns();
+ long nnz = mb.getNonZeros();
+ double s = OptimizerUtils.getSparsity(r, c, nnz);
+ boolean sparse = MatrixBlock.evalSparseFormatOnDisk(r, c,
(long)(s*r*c));
+ return sparse;
+ }
+
+ @Deprecated
+ @SuppressWarnings("unused")
private static double getRecomputeEstimate(Instruction inst,
ExecutionContext ec) {
if (!((ComputationCPInstruction)inst).output.isMatrix()
|| (((ComputationCPInstruction)inst).input1 != null &&
!((ComputationCPInstruction)inst).input1.isMatrix()))
@@ -586,29 +677,37 @@ public class LineageCache
// ---------------- I/O METHODS TO LOCAL FS -----------------
- private static void spillToLocalFS() {
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- if (outdir == null) {
- outdir =
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_LINEAGE);
- LocalFileUtils.createLocalFileIfNotExist(outdir);
+ private static void spillToLocalFS(Entry entry) {
+ if (!entry.isMatrixValue())
+ throw new DMLRuntimeException ("Spilling scalar objects
to disk is not allowd. Key: "+entry._key);
+ if (entry.isNullVal())
+ throw new DMLRuntimeException ("Cannot spill null value
to disk. Key: "+entry._key);
+
+ long t0 = System.nanoTime();
+ if (_outdir == null) {
+ _outdir =
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_LINEAGE);
+ LocalFileUtils.createLocalFileIfNotExist(_outdir);
}
- String outfile = outdir+"/"+_cache.get(_end._key)._key.getId();
+ String outfile = _outdir+"/"+entry._key.getId();
try {
- LocalFileUtils.writeMatrixBlockToLocal(outfile,
_cache.get(_end._key).getMBValue());
+ LocalFileUtils.writeMatrixBlockToLocal(outfile,
entry.getMBValue());
} catch (IOException e) {
throw new DMLRuntimeException ("Write to " + outfile +
" failed.", e);
}
+ long t1 = System.nanoTime();
+ // Adjust disk writing speed
+ adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000,
false);
+
if (DMLScript.STATISTICS) {
- long t1 = System.nanoTime();
LineageCacheStatistics.incrementFSWriteTime(t1-t0);
LineageCacheStatistics.incrementFSWrites();
}
- _spillList.put(_end._key, new SpilledItem(outfile,
_end._compEst));
+ _spillList.put(entry._key, new SpilledItem(outfile,
entry._computeTime));
}
private static Entry readFromLocalFS(LineageItem key) {
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ long t0 = System.nanoTime();
MatrixBlock mb = null;
// Read from local FS
try {
@@ -618,27 +717,20 @@ public class LineageCache
}
// Restore to cache
LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile,
true);
- putIntern(key, DataType.MATRIX, mb, null,
_spillList.get(key)._compEst);
+ long t1 = System.nanoTime();
+ putIntern(key, DataType.MATRIX, mb, null,
_spillList.get(key)._computeTime);
+ // Adjust disk reading speed
+ adjustReadWriteSpeed(_cache.get(key),
((double)(t1-t0))/1000000000, true);
+ //TODO: set cache status as RELOADED for this entry
_spillList.remove(key);
if (DMLScript.STATISTICS) {
- long t1 = System.nanoTime();
LineageCacheStatistics.incrementFSReadTime(t1-t0);
LineageCacheStatistics.incrementFSHits();
}
return _cache.get(key);
}
- ////////////////////////////////////////////
- // Cache Maintenance and Lookup Functions //
- ////////////////////////////////////////////
-
- private static void removeLastEntry() {
- if (DMLScript.STATISTICS)
- _removelist.add(_end._key);
- Entry e = _cache.remove(_end._key);
- _cachesize -= e.getSize();
- delete(_end);
- }
+ //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS ---------//
private static void removeEntry(LineageItem key) {
// Remove the entry for key
@@ -647,12 +739,21 @@ public class LineageCache
delete(_cache.get(key));
_cache.remove(key);
}
-
- private static void setEnd2Head(Entry entry) {
- delete(entry);
- setHead(entry);
+
+ private static void removeEntry(Entry e) {
+ if (_cache.remove(e._key) == null)
+ // Entry not found in cache
+ return;
+
+ if (DMLScript.STATISTICS)
+ _removelist.add(e._key);
+
+ _cachesize -= e.getSize();
+ delete(e);
+ if (DMLScript.STATISTICS)
+ LineageCacheStatistics.incrementMemDeletes();
}
-
+
private static void delete(Entry entry) {
if (entry._prev != null)
entry._prev._next = entry._next;
@@ -674,26 +775,26 @@ public class LineageCache
_end = _head;
}
- ////////////////////////////////////
- // Internal Cache Data Structures //
- ////////////////////////////////////
+ //---------------- INTERNAL CACHE DATA STRUCTURES ----------------//
private static class Entry {
private final LineageItem _key;
private final DataType _dt;
private MatrixBlock _MBval;
private ScalarObject _SOval;
- double _compEst;
+ private long _computeTime;
+ private LineageCacheStatus _status;
private Entry _prev;
private Entry _next;
private LineageItem _origItem;
- public Entry(LineageItem key, DataType dt, MatrixBlock Mval,
ScalarObject Sval, double computecost) {
+ public Entry(LineageItem key, DataType dt, MatrixBlock Mval,
ScalarObject Sval, long computetime) {
_key = key;
_dt = dt;
_MBval = Mval;
_SOval = Sval;
- _compEst = computecost;
+ _computeTime = computetime;
+ _status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
_origItem = null;
}
@@ -725,6 +826,10 @@ public class LineageCache
}
}
+ public synchronized LineageCacheStatus getCacheStatus() {
+ return _status;
+ }
+
public synchronized long getSize() {
return ((_MBval != null ? _MBval.getInMemorySize() : 0)
+ (_SOval != null ? _SOval.getSize() : 0));
}
@@ -737,16 +842,18 @@ public class LineageCache
return _dt.isMatrix();
}
- public synchronized void setValue(MatrixBlock val, double
compEst) {
+ public synchronized void setValue(MatrixBlock val, long
computetime) {
_MBval = val;
- _compEst = compEst;
+ _computeTime = computetime;
+ _status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
//resume all threads waiting for val
notifyAll();
}
- public synchronized void setValue(ScalarObject val, double
compEst) {
+ public synchronized void setValue(ScalarObject val, long
computetime) {
_SOval = val;
- _compEst = compEst;
+ _computeTime = computetime;
+ _status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
//resume all threads waiting for val
notifyAll();
}
@@ -754,11 +861,11 @@ public class LineageCache
private static class SpilledItem {
String _outfile;
- double _compEst;
+ long _computeTime;
- public SpilledItem(String outfile, double computecost) {
+ public SpilledItem(String outfile, long computetime) {
_outfile = outfile;
- _compEst = computecost;
+ _computeTime = computetime;
}
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index e130cfa..efe35f7 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -67,8 +67,29 @@ public class LineageCacheConfig {
ALL
}
+ public enum LineageCacheStatus {
+ EMPTY, //Placeholder with no data. Cannot be evicted.
+ CACHED, //General cached data. Can be evicted.
+ EVICTED, //Data is in disk. Empty value. Cannot be
evicted.
+ RELOADED, //Reloaded from disk. Can be evicted.
+ PINNED; //Pinned to memory. Cannot be evicted.
+ public boolean canEvict() {
+ return this == CACHED || this == RELOADED;
+ }
+ }
+
public ArrayList<String> _MMult = new ArrayList<>();
public static boolean _allowSpill = true;
+ // Minimum reliable spilling estimate in milliseconds.
+ public static final double MIN_SPILL_TIME_ESTIMATE = 100;
+ // Minimum reliable data size for spilling estimate in MB.
+ public static final double MIN_SPILL_DATA = 20;
+
+ // Default I/O in MB per second for binary blocks
+ public static double FSREAD_DENSE = 200;
+ public static double FSREAD_SPARSE = 100;
+ public static double FSWRITE_DENSE = 150;
+ public static double FSWRITE_SPARSE = 75;
private static ReuseCacheType _cacheType = null;
private static CachedItemHead _itemH = null;
@@ -76,7 +97,7 @@ public class LineageCacheConfig {
private static boolean _compilerAssistedRW = true;
static {
//setup static configuration parameters
- setSpill(false); //disable spilling of cache entries to disk
+ setSpill(true); //enable/disable disk spilling.
}
public static boolean isReusable (Instruction inst, ExecutionContext
ec) {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index 9704797..7ab7490 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -34,6 +34,7 @@ public class LineageCacheStatistics {
private static final LongAdder _numHitsFunc = new LongAdder();
private static final LongAdder _numWritesMem = new LongAdder();
private static final LongAdder _numWritesFS = new LongAdder();
+ private static final LongAdder _numMemDel = new LongAdder();
private static final LongAdder _numRewrites = new LongAdder();
private static final LongAdder _ctimeFSRead = new LongAdder(); //in
nano sec
private static final LongAdder _ctimeFSWrite = new LongAdder(); //in
nano sec
@@ -50,6 +51,7 @@ public class LineageCacheStatistics {
_numHitsFunc.reset();
_numWritesMem.reset();
_numWritesFS.reset();
+ _numMemDel.reset();
_numRewrites.reset();
_ctimeFSRead.reset();
_ctimeFSWrite.reset();
@@ -102,6 +104,12 @@ public class LineageCacheStatistics {
// Number of times written in local FS.
_numWritesFS.increment();
}
+
+ public static void incrementMemDeletes() {
+ // Number of deletions from cache (including spilling).
+ _numMemDel.increment();
+ }
+
public static void incrementFSReadTime(long delta) {
// Total time spent on reading from FS.
@@ -161,6 +169,8 @@ public class LineageCacheStatistics {
sb.append(_numWritesMem.longValue());
sb.append("/");
sb.append(_numWritesFS.longValue());
+ sb.append("/");
+ sb.append(_numMemDel.longValue());
return sb.toString();
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
index f48c869..4be8aef 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
@@ -95,11 +95,15 @@ public class LineageRewriteReuse
return false;
//execute instructions & write the o/p to symbol table
+ long t0 = System.nanoTime();
executeInst(newInst, lrwec);
+ long t1 = System.nanoTime();
ec.setVariable(((ComputationCPInstruction)curr).output.getName(),
lrwec.getVariable(LR_VAR));
//put the result into the cache
- LineageCache.putMatrix(curr, ec);
+ LineageCache.putMatrix(curr, ec, t1-t0);
+ if (DMLScript.STATISTICS)
+ LineageCacheStatistics.incrementPRwExecTime(t1-t0);
DMLScript.EXPLAIN = et; //TODO can't change this here
//cleanup execution context
@@ -755,7 +759,6 @@ public class LineageRewriteReuse
DMLScript.EXPLAIN = ExplainType.NONE;
try {
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
//execute instructions
BasicProgramBlock pb = getProgramBlock();
pb.setInstructions(newInst);
@@ -763,8 +766,6 @@ public class LineageRewriteReuse
LineageCacheConfig.shutdownReuse();
pb.execute(lrwec);
LineageCacheConfig.restartReuse(oldReuseOption);
- if (DMLScript.STATISTICS)
-
LineageCacheStatistics.incrementPRwExecTime(System.nanoTime()-t0);
}
catch (Exception e) {
throw new DMLRuntimeException("Error executing lineage
rewrites" , e);
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 4c3cbef..eb94f83 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -946,7 +946,7 @@ public class Statistics
if (DMLScript.LINEAGE && !ReuseCacheType.isNone()) {
sb.append("LinCache hits (Mem/FS/Del): \t" +
LineageCacheStatistics.displayHits() + ".\n");
sb.append("LinCache MultiLevel (Ins/SB/Fn):" +
LineageCacheStatistics.displayMultiLevelHits() + ".\n");
- sb.append("LinCache writes (Mem/FS): \t" +
LineageCacheStatistics.displayWtrites() + ".\n");
+ sb.append("LinCache writes (Mem/FS/Del): \t" +
LineageCacheStatistics.displayWtrites() + ".\n");
sb.append("LinCache FStimes (Rd/Wr): \t" +
LineageCacheStatistics.displayTime() + " sec.\n");
sb.append("LinCache costing time: \t" +
LineageCacheStatistics.displayCostingTime() + " sec.\n");
sb.append("LinCache Rewrites: \t\t" +
LineageCacheStatistics.displayRewrites() + ".\n");
diff --git a/src/test/scripts/functions/lineage/.FunctionFullReuse5.dml.swp
b/src/test/scripts/functions/lineage/.FunctionFullReuse5.dml.swp
new file mode 100644
index 0000000..68c260c
Binary files /dev/null and
b/src/test/scripts/functions/lineage/.FunctionFullReuse5.dml.swp differ