This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 34444e8 [SYSTEMDS-3185] Lineage-based reuse of federated reads
34444e8 is described below
commit 34444e88ac3163c1eb72a2012d1426378fd67817
Author: ywcb00 <[email protected]>
AuthorDate: Mon Feb 7 11:36:47 2022 +0100
[SYSTEMDS-3185] Lineage-based reuse of federated reads
This patch adds lineage-based reuse of federated reads on the federated workers.
We fall back to the read cache if lineage-based reuse is globally disabled or if the data being read is not a matrix (e.g., frames).
Closes #1522
Closes #1540
---
.../federated/FederatedReadCache.java | 4 +-
.../controlprogram/federated/FederatedRequest.java | 3 +
.../federated/FederatedStatistics.java | 59 ++++----
.../controlprogram/federated/FederatedWorker.java | 6 +
.../federated/FederatedWorkerHandler.java | 148 +++++++++++++--------
.../apache/sysds/runtime/lineage/LineageCache.java | 77 ++++++++++-
.../java/org/apache/sysds/utils/Statistics.java | 2 +-
.../org/apache/sysds/test/AutomatedTestBase.java | 13 +-
.../multitenant/FederatedMultiTenantTest.java | 1 -
...dCacheTest.java => FederatedReuseReadTest.java} | 47 +++++--
.../federated/multitenant/MultiTenantTestBase.java | 8 +-
...eadCacheTest.dml => FederatedReuseReadTest.dml} | 0
12 files changed, 255 insertions(+), 113 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
index a7180d7..b145b34 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
@@ -98,8 +98,8 @@ public class FederatedReadCache {
}
if(DMLScript.STATISTICS) {
- FederatedStatistics.incFedReadCacheHitCount();
-
FederatedStatistics.incFedReadCacheBytesCount(_data);
+ FederatedStatistics.incFedReuseReadHitCount();
+
FederatedStatistics.incFedReuseReadBytesCount(_data);
}
//comes here if data is placed or the entry is removed
by the running thread
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index 9e38527..6e9b388 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -150,6 +150,9 @@ public class FederatedRequest implements Serializable {
}
public long getChecksum(int i) {
+ if(_checksums == null)
+ setChecksum();
+
return _checksums.get(i);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index 9ef0518..5597d18 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -37,8 +37,9 @@ import javax.net.ssl.SSLException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
import
org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics.FedStatsCollection.CacheStatsCollection;
@@ -74,8 +75,8 @@ public class FederatedStatistics {
private static final LongAdder fedLookupTableGetCount = new LongAdder();
private static final LongAdder fedLookupTableGetTime = new LongAdder();
// in milli sec
private static final LongAdder fedLookupTableEntryCount = new
LongAdder();
- private static final LongAdder fedReadCacheHitCount = new LongAdder();
- private static final LongAdder fedReadCacheBytesCount = new LongAdder();
+ private static final LongAdder fedReuseReadHitCount = new LongAdder();
+ private static final LongAdder fedReuseReadBytesCount = new LongAdder();
public static synchronized void incFederated(RequestType rqt,
List<Object> data){
switch (rqt) {
@@ -138,8 +139,8 @@ public class FederatedStatistics {
fedLookupTableGetCount.reset();
fedLookupTableGetTime.reset();
fedLookupTableEntryCount.reset();
- fedReadCacheHitCount.reset();
- fedReadCacheBytesCount.reset();
+ fedReuseReadHitCount.reset();
+ fedReuseReadBytesCount.reset();
}
public static String displayFedIOExecStatistics() {
@@ -218,7 +219,7 @@ public class FederatedStatistics {
private static String
displayMultiTenantStats(MultiTenantStatsCollection mtsc) {
StringBuilder sb = new StringBuilder();
sb.append(displayFedLookupTableStats(mtsc.fLTGetCount,
mtsc.fLTEntryCount, mtsc.fLTGetTime));
- sb.append(displayFedReadCacheStats(mtsc.readCacheHits,
mtsc.readCacheBytes));
+ sb.append(displayFedReuseReadStats(mtsc.reuseReadHits,
mtsc.reuseReadBytes));
return sb.toString();
}
@@ -340,12 +341,12 @@ public class FederatedStatistics {
return fedLookupTableEntryCount.longValue();
}
- public static long getFedReadCacheHitCount() {
- return fedReadCacheHitCount.longValue();
+ public static long getFedReuseReadHitCount() {
+ return fedReuseReadHitCount.longValue();
}
- public static long getFedReadCacheBytesCount() {
- return fedReadCacheBytesCount.longValue();
+ public static long getFedReuseReadBytesCount() {
+ return fedReuseReadBytesCount.longValue();
}
public static void incFedLookupTableGetCount() {
@@ -360,12 +361,16 @@ public class FederatedStatistics {
fedLookupTableEntryCount.increment();
}
- public static void incFedReadCacheHitCount() {
- fedReadCacheHitCount.increment();
+ public static void incFedReuseReadHitCount() {
+ fedReuseReadHitCount.increment();
+ }
+
+ public static void incFedReuseReadBytesCount(CacheableData<?> data) {
+ fedReuseReadBytesCount.add(data.getDataSize());
}
- public static void incFedReadCacheBytesCount(CacheableData<?> data) {
- fedReadCacheBytesCount.add(data.getDataSize());
+ public static void incFedReuseReadBytesCount(CacheBlock cb) {
+ fedReuseReadBytesCount.add(cb.getInMemorySize());
}
public static String displayFedLookupTableStats() {
@@ -383,16 +388,16 @@ public class FederatedStatistics {
return "";
}
- public static String displayFedReadCacheStats() {
- return
displayFedReadCacheStats(fedReadCacheHitCount.longValue(),
- fedReadCacheBytesCount.longValue());
+ public static String displayFedReuseReadStats() {
+ return
displayFedReuseReadStats(fedReuseReadHitCount.longValue(),
+ fedReuseReadBytesCount.longValue());
}
- public static String displayFedReadCacheStats(long rcHits, long
rcBytes) {
- if(rcHits > 0) {
+ public static String displayFedReuseReadStats(long rrHits, long
rrBytes) {
+ if(rrHits > 0) {
StringBuilder sb = new StringBuilder();
- sb.append("Fed ReadCache (Hits, Bytes):\t" +
- rcHits + "/" + rcBytes + ".\n");
+ sb.append("Fed ReuseRead (Hits, Bytes):\t" +
+ rrHits + "/" + rrBytes + ".\n");
return sb.toString();
}
return "";
@@ -515,23 +520,23 @@ public class FederatedStatistics {
fLTGetCount = getFedLookupTableGetCount();
fLTGetTime =
((double)getFedLookupTableGetTime()) / 1000000000; // in sec
fLTEntryCount = getFedLookupTableEntryCount();
- readCacheHits = getFedReadCacheHitCount();
- readCacheBytes = getFedReadCacheBytesCount();
+ reuseReadHits = getFedReuseReadHitCount();
+ reuseReadBytes = getFedReuseReadBytesCount();
}
private void aggregate(MultiTenantStatsCollection that)
{
fLTGetCount += that.fLTGetCount;
fLTGetTime += that.fLTGetTime;
fLTEntryCount += that.fLTEntryCount;
- readCacheHits += that.readCacheHits;
- readCacheBytes += that.readCacheBytes;
+ reuseReadHits += that.reuseReadHits;
+ reuseReadBytes += that.reuseReadBytes;
}
private long fLTGetCount = 0;
private double fLTGetTime = 0;
private long fLTEntryCount = 0;
- private long readCacheHits = 0;
- private long readCacheBytes = 0;
+ private long reuseReadHits = 0;
+ private long reuseReadBytes = 0;
}
private CacheStatsCollection cacheStats = new
CacheStatsCollection();
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index ef94dfe..bef4032 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -40,9 +40,11 @@ import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
+import org.apache.sysds.api.DMLScript;
import org.apache.log4j.Logger;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig;
public class FederatedWorker {
protected static Logger log = Logger.getLogger(FederatedWorker.class);
@@ -57,6 +59,10 @@ public class FederatedWorker {
_frc = new FederatedReadCache();
_port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
_debug = debug;
+
+ LineageCacheConfig.setConfig(DMLScript.LINEAGE_REUSE);
+ LineageCacheConfig.setCachePolicy(DMLScript.LINEAGE_POLICY);
+ LineageCacheConfig.setEstimator(DMLScript.LINEAGE_ESTIMATE);
}
public void run() throws CertificateException, SSLException {
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index ca2b055..d798c2d 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -246,74 +246,46 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
// early throwing of exception to avoid infinitely
waiting threads for data
throw new FederatedWorkerHandlerException("Could not
recognize datatype");
- CacheableData<?> cd = _frc.get(filename);
- if(cd == null) {
+ ExecutionContext ec = ecm.get(tid);
+ LineageItem linItem = new LineageItem(filename);
+ CacheableData<?> cd = null;
+
+ if(!LineageCache.reuseFedRead(Long.toString(id), dataType,
linItem, ec)) {
+ // Lookup read cache if reuse is disabled and we
skipped storing in the
+ // lineage cache due to other constraints
+ // FIXME: It is possible that lineage reuse is enabled
later. In that case
+ // read cache may not be empty. Hence, it may be
necessary to lookup both
+ // the caches.
+ if (ReuseCacheType.isNone() || dataType !=
DataType.MATRIX)
+ cd = _frc.get(filename);
try {
- switch(dataType) {
- case MATRIX:
- cd = new
MatrixObject(Types.ValueType.FP64, filename);
- break;
- case FRAME:
- cd = new FrameObject(filename);
- break;
- default:
- throw new
FederatedWorkerHandlerException("Could not recognize datatype");
+ if(cd == null) { // data is neither in lineage
cache nor in read cache
+ long t0 = !ReuseCacheType.isNone() ?
System.nanoTime() : 0;
+ cd = readDataNoReuse(filename,
dataType, mc); // actual read of the data
+ long t1 = !ReuseCacheType.isNone() ?
System.nanoTime() : 0;
+ if(!ReuseCacheType.isNone() && dataType
== DataType.MATRIX)
+ // put the object into the
lineage cache
+ // FIXME: As we lazily read the
actual data, this computetime
+ // only records the metadata
read. A small computetime wrongly
+ // dictates the cache eviction
logic to remove this entry early.
+
LineageCache.putFedReadObject(cd, linItem, ec, t1 - t0);
+ else
+ _frc.setData(filename, cd); //
set the data into the read cache entry
}
+ ec.setVariable(String.valueOf(id), cd);
- FileFormat fmt = null;
- boolean header = false;
- String delim = null;
- FileSystem fs = null;
- MetaDataAll mtd;
-
- try {
- final String mtdName =
DataExpression.getMTDFileName(filename);
- Path path = new Path(mtdName);
- fs =
IOUtilFunctions.getFileSystem(mtdName);
- try(BufferedReader br = new
BufferedReader(new InputStreamReader(fs.open(path)))) {
- mtd = new MetaDataAll(br);
- if(!mtd.mtdExists())
- throw new
FederatedWorkerHandlerException("Could not parse metadata file");
- mc.setRows(mtd.getDim1());
- mc.setCols(mtd.getDim2());
- mc.setNonZeros(mtd.getNnz());
- header = mtd.getHasHeader();
- cd =
mtd.parseAndSetPrivacyConstraint(cd);
- fmt = mtd.getFileFormat();
- delim = mtd.getDelim();
- }
- }
- catch(DMLPrivacyException |
FederatedWorkerHandlerException ex) {
- throw ex;
- }
- catch(Exception ex) {
- String msg = "Exception of type " +
ex.getClass() + " thrown when processing READ request";
- LOG.error(msg, ex);
- throw new DMLRuntimeException(msg);
- }
- finally {
- IOUtilFunctions.closeSilently(fs);
- }
-
- // put meta data object in symbol table, read
on first operation
- cd.setMetaData(new MetaDataFormat(mc, fmt));
- if(fmt == FileFormat.CSV)
- cd.setFileFormatProperties(new
FileFormatPropertiesCSV(header, delim,
-
DataExpression.DEFAULT_DELIM_SPARSE));
- cd.enableCleanup(false); // guard against
deletion
-
- _frc.setData(filename, cd);
} catch(Exception ex) {
- _frc.setInvalid(filename);
+ if(!ReuseCacheType.isNone() && dataType ==
DataType.MATRIX)
+ LineageCache.putFedReadObject(null,
linItem, ec, 0); // removing the placeholder
+ else
+ _frc.setInvalid(filename);
throw ex;
}
}
- ecm.get(tid).setVariable(String.valueOf(id), cd);
-
if(DMLScript.LINEAGE)
// create a literal type lineage item with the file name
- ecm.get(tid).getLineage().set(String.valueOf(id), new
LineageItem(filename));
+ ec.getLineage().set(String.valueOf(id), linItem);
if(dataType == Types.DataType.FRAME) {
FrameObject frameObject = (FrameObject) cd;
@@ -325,6 +297,66 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
return new FederatedResponse(ResponseType.SUCCESS, new Object[]
{id, mc});
}
+ private CacheableData<?> readDataNoReuse(String filename, DataType
dataType,
+ MatrixCharacteristics mc) {
+ CacheableData<?> cd = null;
+
+ switch(dataType) {
+ case MATRIX:
+ cd = new MatrixObject(Types.ValueType.FP64,
filename);
+ break;
+ case FRAME:
+ cd = new FrameObject(filename);
+ break;
+ default:
+ throw new
FederatedWorkerHandlerException("Could not recognize datatype");
+ }
+
+ FileFormat fmt = null;
+ boolean header = false;
+ String delim = null;
+ FileSystem fs = null;
+ MetaDataAll mtd;
+
+ try {
+ final String mtdName =
DataExpression.getMTDFileName(filename);
+ Path path = new Path(mtdName);
+ fs = IOUtilFunctions.getFileSystem(mtdName);
+ try(BufferedReader br = new BufferedReader(new
InputStreamReader(fs.open(path)))) {
+ mtd = new MetaDataAll(br);
+ if(!mtd.mtdExists())
+ throw new
FederatedWorkerHandlerException("Could not parse metadata file");
+ mc.setRows(mtd.getDim1());
+ mc.setCols(mtd.getDim2());
+ mc.setNonZeros(mtd.getNnz());
+ header = mtd.getHasHeader();
+ cd = mtd.parseAndSetPrivacyConstraint(cd);
+ fmt = mtd.getFileFormat();
+ delim = mtd.getDelim();
+ }
+ }
+ catch(DMLPrivacyException | FederatedWorkerHandlerException ex)
{
+ throw ex;
+ }
+ catch(Exception ex) {
+ String msg = "Exception of type " + ex.getClass() + "
thrown when processing READ request";
+ LOG.error(msg, ex);
+ throw new DMLRuntimeException(msg);
+ }
+ finally {
+ IOUtilFunctions.closeSilently(fs);
+ }
+
+ // put meta data object in symbol table, read on first operation
+ cd.setMetaData(new MetaDataFormat(mc, fmt));
+ if(fmt == FileFormat.CSV)
+ cd.setFileFormatProperties(new
FileFormatPropertiesCSV(header, delim,
+ DataExpression.DEFAULT_DELIM_SPARSE));
+ cd.enableCleanup(false); // guard against deletion
+
+ return cd;
+ }
+
private FederatedResponse putVariable(FederatedRequest request,
ExecutionContextMap ecm) {
checkNumParams(request.getNumParams(), 1, 2);
final String varName = String.valueOf(request.getID());
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 8450fb5..472c6ff 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -33,6 +33,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics;
import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
import org.apache.sysds.runtime.instructions.CPInstructionParser;
import org.apache.sysds.runtime.instructions.Instruction;
@@ -162,14 +163,14 @@ public class LineageCache
if (mb == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the
executing thread removed this entry from cache
else
-
ec.setMatrixOutput(outName, e.getMBValue());
+
ec.setMatrixOutput(outName, mb);
}
else if (e.isScalarValue()) {
ScalarObject so =
e.getSOValue(); //wait if another thread is executing the same inst.
if (so == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the
executing thread removed this entry from cache
else
-
ec.setScalarOutput(outName, e.getSOValue());
+
ec.setScalarOutput(outName, so);
}
else { //TODO handle locks on gpu
objects
//shallow copy the cached
GPUObj to the output MatrixObject
@@ -364,7 +365,39 @@ public class LineageCache
}
return new
FederatedResponse(FederatedResponse.ResponseType.ERROR);
}
-
+
+ public static boolean reuseFedRead(String outName, DataType dataType,
LineageItem li, ExecutionContext ec) {
+ if (ReuseCacheType.isNone() || dataType != DataType.MATRIX)
+ return false;
+
+ LineageCacheEntry e = null;
+ synchronized(_cache) {
+ if(LineageCache.probe(li)) {
+ e = LineageCache.getIntern(li);
+ }
+ else {
+ putIntern(li, dataType, null, null, 0);
+ return false; // direct return after placing
the placeholder
+ }
+ }
+
+ if(e != null && e.isMatrixValue()) {
+ MatrixBlock mb = e.getMBValue(); // waiting if the
value is not set yet
+ if (mb == null || e.getCacheStatus() ==
LineageCacheStatus.NOTCACHED)
+ return false; // the executing thread removed
this entry from cache
+ ec.setMatrixOutput(outName, e.getMBValue());
+
+ if (DMLScript.STATISTICS) { //increment saved time
+ FederatedStatistics.incFedReuseReadHitCount();
+
FederatedStatistics.incFedReuseReadBytesCount(mb);
+
LineageCacheStatistics.incrementSavedComputeTime(e._computeTime);
+ }
+
+ return true;
+ }
+ return false;
+ }
+
public static boolean probe(LineageItem key) {
//TODO problematic as after probe the matrix might be kicked
out of cache
boolean p = _cache.containsKey(key); // in cache or in disk
@@ -542,7 +575,7 @@ public class LineageCache
LineageGPUCacheEviction.addEntry(centry);
}
}
-
+
public static void putValue(List<DataIdentifier> outputs,
LineageItem[] liInputs, String name, ExecutionContext ec, long
computetime)
{
@@ -629,7 +662,37 @@ public class LineageCache
LineageCacheEviction.addEntry(entry);
}
}
-
+
+ public static void putFedReadObject(Data data, LineageItem li,
ExecutionContext ec, long computetime) {
+ if(ReuseCacheType.isNone())
+ return;
+
+ LineageCacheEntry entry = _cache.get(li);
+ if(entry != null && data instanceof MatrixObject) {
+ MatrixBlock mb = ((MatrixObject)data).acquireRead();
+ synchronized(_cache) {
+ long size = mb != null ? mb.getInMemorySize() :
0;
+
+ //remove the placeholder if the entry is bigger
than the cache.
+ if (size >
LineageCacheEviction.getCacheLimit()) {
+ removePlaceholder(li);
+ }
+
+ //make space for the data
+ if
(!LineageCacheEviction.isBelowThreshold(size))
+ LineageCacheEviction.makeSpace(_cache,
size);
+ LineageCacheEviction.updateSize(size, true);
+
+ entry.setValue(mb, computetime);
+ }
+ }
+ else {
+ synchronized(_cache) {
+ removePlaceholder(li);
+ }
+ }
+ }
+
public static void resetCache() {
synchronized (_cache) {
_cache.clear();
@@ -658,7 +721,7 @@ public class LineageCache
long size = newItem.getSize();
if( size > LineageCacheEviction.getCacheLimit())
return; //not applicable
- if( !LineageCacheEviction.isBelowThreshold(size) )
+ if( !LineageCacheEviction.isBelowThreshold(size) )
LineageCacheEviction.makeSpace(_cache, size);
LineageCacheEviction.updateSize(size, true);
}
@@ -697,7 +760,7 @@ public class LineageCache
LineageCacheEntry e = _cache.get(item);
boolean exists = !e.isNullVal();
if (oe.isMatrixValue())
- e.setValue(oe.getMBValue(), computetime);
+ e.setValue(oe.getMBValue(), computetime);
else
e.setValue(oe.getSOValue(), computetime);
e._origItem = probeItem;
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 4382138..b105ffa 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -655,7 +655,7 @@ public class Statistics
sb.append(FederatedStatistics.displayFedIOExecStatistics());
sb.append(FederatedStatistics.displayFedLookupTableStats());
-
sb.append(FederatedStatistics.displayFedReadCacheStats());
+
sb.append(FederatedStatistics.displayFedReuseReadStats());
sb.append(TransformStatistics.displayStatistics());
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index af00ec3..5944947 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -41,6 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -1540,6 +1541,11 @@ public abstract class AutomatedTestBase {
}
}
+ @Deprecated
+ protected Process startLocalFedWorker(int port) {
+ return startLocalFedWorker(port, null);
+ }
+
/**
* Start new JVM for a federated worker at the port.
*
@@ -1548,13 +1554,14 @@ public abstract class AutomatedTestBase {
* @return the process associated with the worker.
*/
@Deprecated
- protected Process startLocalFedWorker(int port) {
+ protected Process startLocalFedWorker(int port, String[] addArgs) {
Process process = null;
String separator = System.getProperty("file.separator");
String classpath = System.getProperty("java.class.path");
String path = System.getProperty("java.home") + separator +
"bin" + separator + "java";
- ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp",
classpath, DMLScript.class.getName(), "-w",
- Integer.toString(port), "-stats");
+ String[] args = ArrayUtils.addAll(new String[]{path, "-cp",
classpath, DMLScript.class.getName(),
+ "-w", Integer.toString(port), "-stats"}, addArgs);
+ ProcessBuilder processBuilder = new ProcessBuilder(args);
try {
process = processBuilder.start();
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
index ca0d1ce..d03b9fd 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
@@ -282,7 +282,6 @@ public class FederatedMultiTenantTest extends
MultiTenantTestBase {
// wait for the coordinator processes to end and verify the
results
String coordinatorOutput = waitForCoordinators();
- System.out.println(coordinatorOutput);
verifyResults(opType, coordinatorOutput, execMode);
// check that federated input files are still existing
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReadCacheTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReuseReadTest.java
similarity index 82%
rename from
src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReadCacheTest.java
rename to
src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReuseReadTest.java
index 67e4c1a..047a98f 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReadCacheTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReuseReadTest.java
@@ -40,11 +40,11 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
@net.jcip.annotations.NotThreadSafe
-public class FederatedReadCacheTest extends MultiTenantTestBase {
- private final static String TEST_NAME = "FederatedReadCacheTest";
+public class FederatedReuseReadTest extends MultiTenantTestBase {
+ private final static String TEST_NAME = "FederatedReuseReadTest";
private final static String TEST_DIR =
"functions/federated/multitenant/";
- private static final String TEST_CLASS_DIR = TEST_DIR +
FederatedReadCacheTest.class.getSimpleName() + "/";
+ private static final String TEST_CLASS_DIR = TEST_DIR +
FederatedReuseReadTest.class.getSimpleName() + "/";
private final static double TOLERANCE = 0;
@@ -82,28 +82,51 @@ public class FederatedReadCacheTest extends
MultiTenantTestBase {
@Test
public void testPlusScalarCP() {
- runReadCacheTest(OpType.PLUS_SCALAR, 3, ExecMode.SINGLE_NODE);
+ runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SINGLE_NODE,
false);
}
@Test
@Ignore
public void testPlusScalarSP() {
- runReadCacheTest(OpType.PLUS_SCALAR, 3, ExecMode.SPARK);
+ runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SPARK, false);
+ }
+
+ @Test
+ @Ignore
+ public void testPlusScalarLineageCP() {
+ runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SINGLE_NODE,
true);
+ }
+
+ @Test
+ public void testPlusScalarLineageSP() {
+ runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SPARK, true);
}
@Test
public void testModifiedValCP() {
//TODO with 4 runs sporadically into non-terminating state
- runReadCacheTest(OpType.MODIFIED_VAL, 3, ExecMode.SINGLE_NODE);
+ runReuseReadTest(OpType.MODIFIED_VAL, 3, ExecMode.SINGLE_NODE,
false);
}
@Test
@Ignore
public void testModifiedValSP() {
- runReadCacheTest(OpType.MODIFIED_VAL, 4, ExecMode.SPARK);
+ runReuseReadTest(OpType.MODIFIED_VAL, 4, ExecMode.SPARK, false);
+ }
+
+ @Test
+ @Ignore
+ public void testModifiedValLineageCP() {
+ //TODO with 4 runs sporadically into non-terminating state
+ runReuseReadTest(OpType.MODIFIED_VAL, 3, ExecMode.SINGLE_NODE,
true);
+ }
+
+ @Test
+ public void testModifiedValLineageSP() {
+ runReuseReadTest(OpType.MODIFIED_VAL, 4, ExecMode.SPARK, true);
}
- private void runReadCacheTest(OpType opType, int numCoordinators,
ExecMode execMode) {
+ private void runReuseReadTest(OpType opType, int numCoordinators,
ExecMode execMode, boolean lineage) {
boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
ExecMode platformOld = rtplatform;
@@ -135,7 +158,7 @@ public class FederatedReadCacheTest extends
MultiTenantTestBase {
// empty script name because we don't execute any script, just
start the worker
fullDMLScriptName = "";
- int[] workerPorts = startFedWorkers(4);
+ int[] workerPorts = startFedWorkers(4, lineage ? new
String[]{"-lineage", "reuse"} : null);
rtplatform = execMode;
if(rtplatform == ExecMode.SPARK) {
@@ -146,7 +169,8 @@ public class FederatedReadCacheTest extends
MultiTenantTestBase {
// start the coordinator processes
String scriptName = HOME + TEST_NAME + ".dml";
- programArgs = new String[] {"-stats", "100", "-fedStats",
"100", "-nvargs",
+ programArgs = new String[] {"-config", CONFIG_DIR +
"SystemDS-MultiTenant-config.xml",
+ "-stats", "100", "-fedStats", "100", "-nvargs",
"in_X1=" + TestUtils.federatedAddress(workerPorts[0],
""),
"in_X2=" + TestUtils.federatedAddress(workerPorts[1],
""),
"in_X3=" + TestUtils.federatedAddress(workerPorts[2],
""),
@@ -160,7 +184,6 @@ public class FederatedReadCacheTest extends
MultiTenantTestBase {
// wait for the coordinator processes to end and verify the
results
String coordinatorOutput = waitForCoordinators();
- System.out.println(coordinatorOutput);
verifyResults(opType, coordinatorOutput, execMode);
// check that federated input files are still existing
@@ -178,7 +201,7 @@ public class FederatedReadCacheTest extends
MultiTenantTestBase {
private void verifyResults(OpType opType, String outputLog, ExecMode
execMode) {
Assert.assertTrue(checkForHeavyHitter(opType, outputLog,
execMode));
// verify that the matrix object has been taken from cache
- Assert.assertTrue(outputLog.contains("Fed ReadCache (Hits,
Bytes):\t"
+ Assert.assertTrue(outputLog.contains("Fed ReuseRead (Hits,
Bytes):\t"
+ Integer.toString((coordinatorProcesses.size()-1) *
workerProcesses.size()) + "/"));
// compare the results via files
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
index 90f50d4..167e330 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
@@ -49,6 +49,10 @@ public abstract class MultiTenantTestBase extends
AutomatedTestBase {
p.destroyForcibly();
}
+ protected int[] startFedWorkers(int numFedWorkers) {
+ return startFedWorkers(numFedWorkers, null);
+ }
+
/**
* Start numFedWorkers federated worker processes on available ports
and add
* them to the workerProcesses
@@ -56,12 +60,12 @@ public abstract class MultiTenantTestBase extends
AutomatedTestBase {
* @param numFedWorkers the number of federated workers to start
* @return int[] the ports of the created federated workers
*/
- protected int[] startFedWorkers(int numFedWorkers) {
+ protected int[] startFedWorkers(int numFedWorkers, String[] addArgs) {
int[] ports = new int[numFedWorkers];
for(int counter = 0; counter < numFedWorkers; counter++) {
ports[counter] = getRandomAvailablePort();
@SuppressWarnings("deprecation")
- Process tmpProcess =
startLocalFedWorker(ports[counter]);
+ Process tmpProcess =
startLocalFedWorker(ports[counter], addArgs);
workerProcesses.add(tmpProcess);
}
return ports;
diff --git
a/src/test/scripts/functions/federated/multitenant/FederatedReadCacheTest.dml
b/src/test/scripts/functions/federated/multitenant/FederatedReuseReadTest.dml
similarity index 100%
rename from
src/test/scripts/functions/federated/multitenant/FederatedReadCacheTest.dml
rename to
src/test/scripts/functions/federated/multitenant/FederatedReuseReadTest.dml