This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new f8acaab0c7 [SYSTEMDS-3594] Multi-level reuse of RDDs
f8acaab0c7 is described below
commit f8acaab0c755dca0bf2e63c3f4d8279bc5effcaa
Author: Arnab Phani <[email protected]>
AuthorDate: Thu Jul 6 10:58:48 2023 +0200
[SYSTEMDS-3594] Multi-level reuse of RDDs
This patch extends the multi-level reuse framework to support functions
and statement blocks returning RDDs. Similar to instruction-level reuse,
we persist the function outputs on the second call. If the original
instruction is shuffle-based, we additionally reuse the function output
locally, even before it has been persisted at the executors.
Closes #1858
---
.../runtime/instructions/spark/data/RDDObject.java | 10 ++
.../apache/sysds/runtime/lineage/LineageCache.java | 154 ++++++++++++++-------
.../sysds/runtime/lineage/LineageCacheConfig.java | 6 +-
.../sysds/runtime/lineage/LineageCacheEntry.java | 2 +-
.../runtime/lineage/LineageSparkCacheEviction.java | 7 +-
.../functions/async/LineageReuseSparkTest.java | 11 +-
.../scripts/functions/async/LineageReuseSpark4.dml | 57 ++++++++
7 files changed, 191 insertions(+), 56 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
index 6ae7ed2061..e588b9dd31 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.instructions.spark.data;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
public class RDDObject extends LineageObject
{
@@ -31,6 +32,7 @@ public class RDDObject extends LineageObject
private String _hdfsFname = null; //hdfs filename, if created from
hdfs.
private boolean _parRDD = false; //is a parallelized rdd at driver
private boolean _pending = true; //is a pending rdd operation
+ private DataCharacteristics _dc = null;
public RDDObject( JavaPairRDD<?,?> rddvar) {
super();
@@ -84,6 +86,14 @@ public class RDDObject extends LineageObject
public boolean isPending() {
return _pending;
}
+
+ public void setDataCharacteristics(DataCharacteristics dc) {
+ _dc = dc;
+ }
+
+ public DataCharacteristics getDataCharacteristics() {
+ return _dc;
+ }
/**
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 1a3b12d7a9..0b874f6708 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.lineage;
+import jcuda.Pointer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -56,6 +57,7 @@ import
org.apache.sysds.runtime.instructions.fed.ComputationFEDInstruction;
import org.apache.sysds.runtime.instructions.gpu.GPUInstruction;
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
import org.apache.sysds.runtime.instructions.spark.ComputationSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
@@ -157,15 +159,11 @@ public class LineageCache
switch(e.getCacheStatus()) {
case TOPERSISTRDD:
//Mark for
caching on the second hit
-
persistRDD(inst, e, ec);
- //Update status
to indicate persisted in the executors
-
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
- //Even not
persisted, reuse the rdd locally for shuffle operations
- if
(!LineageCacheConfig.isShuffleOp(inst))
+ boolean
persisted = persistRDD(inst, e, ec);
+ //Return if not
already persisted and not a shuffle operations
+ if (!persisted
&& !LineageCacheConfig.isShuffleOp(inst.getOpcode()))
return
false;
-
-
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
- break;
+ //Else, fall
through to reuse (local or distributed)
case PERSISTEDRDD:
//Reuse the
persisted intermediate at the executors
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
@@ -175,16 +173,19 @@ public class LineageCache
}
}
else { //TODO handle locks on gpu
objects
+ Pointer gpuPtr =
e.getGPUPointer();
+ if (gpuPtr == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+ return false; //the
executing thread removed this entry from cache
//Create a GPUObject with the
cached pointer
GPUObject gpuObj = new
GPUObject(ec.getGPUContext(0),
-
ec.getMatrixObject(outName), e.getGPUPointer());
+
ec.getMatrixObject(outName), gpuPtr);
ec.getMatrixObject(outName).setGPUObject(ec.getGPUContext(0), gpuObj);
//Set dirty to true, so that it
is later copied to the host for write
ec.getMatrixObject(outName).getGPUObject(ec.getGPUContext(0)).setDirty(true);
//Set the cached data
characteristics to the output matrix object
ec.getMatrixObject(outName).updateDataCharacteristics(e.getDataCharacteristics());
//Increment the live count for
this pointer
-
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+
LineageGPUCacheEviction.incrementLiveCount(gpuPtr);
}
//Replace the live lineage trace with
the cached one (if not parfor, dedup)
ec.replaceLineageItem(outName, e._key);
@@ -210,6 +211,7 @@ public class LineageCache
long savedComputeTime = 0;
HashMap<String, Data> funcOutputs = new HashMap<>();
HashMap<String, LineageItem> funcLIs = new HashMap<>();
+ ArrayList<LineageCacheEntry> funcOutLIs = new ArrayList<>();
for (int i=0; i<numOutputs; i++) {
String opcode = name + String.valueOf(i+1);
LineageItem li = new LineageItem(opcode, liInputs);
@@ -220,6 +222,7 @@ public class LineageCache
synchronized(_cache) {
if (LineageCache.probe(li)) {
e = LineageCache.getIntern(li);
+ funcOutLIs.add(e);
}
else {
//create a placeholder if no reuse to
avoid redundancy
@@ -244,16 +247,25 @@ public class LineageCache
((MatrixObject)boundValue).release();
}
else if (e.isGPUObject()) {
+ Pointer gpuPtr = e.getGPUPointer();
+ if (gpuPtr == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+ return false; //the executing
thread removed this entry from cache
MetaDataFormat md = new
MetaDataFormat(e.getDataCharacteristics(), FileFormat.BINARY);
boundValue = new
MatrixObject(ValueType.FP64, boundVarName, md);
//Create a GPUObject with the cached
pointer
GPUObject gpuObj = new
GPUObject(ec.getGPUContext(0),
- ((MatrixObject)boundValue),
e.getGPUPointer());
+ ((MatrixObject)boundValue),
gpuPtr);
//Set dirty to true, so that it is
later copied to the host for write
gpuObj.setDirty(true);
((MatrixObject)
boundValue).setGPUObject(ec.getGPUContext(0), gpuObj);
- //Increment the live count for this
pointer
-
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+ }
+ else if (e.isRDDPersist()) {
+ RDDObject rdd = e.getRDDObject();
+ if (rdd == null && e.getCacheStatus()
== LineageCacheStatus.NOTCACHED)
+ return false; //the executing
thread removed this entry from cache
+ MetaDataFormat md = new
MetaDataFormat(rdd.getDataCharacteristics(),FileFormat.BINARY);
+ boundValue = new
MatrixObject(ValueType.FP64, boundVarName, md);
+ ((MatrixObject)
boundValue).setRDDHandle(rdd);
}
else if (e.isScalarValue()) {
boundValue = e.getSOValue();
@@ -277,6 +289,32 @@ public class LineageCache
}
if (reuse) {
+ //Additional maintenance for GPU pointers and RDDs
+ for (LineageCacheEntry e : funcOutLIs) {
+ if (e.isGPUObject())
+ //Increment the live count for this
pointer
+
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+ else if (e.isRDDPersist()) {
+ //Reuse the cached RDD (local or
persisted at the executors)
+ RDDObject rdd = e.getRDDObject();
+ switch(e.getCacheStatus()) {
+ case TOPERSISTRDD:
+ //Mark for caching on
the second hit
+ long estimatedSize =
MatrixBlock.estimateSizeInMemory(rdd.getDataCharacteristics());
+ boolean persisted =
persistRDD(e, estimatedSize);
+ //Return if not already
persisted and not a shuffle operations
+ if (!persisted &&
!LineageCacheConfig.isShuffleOp(e._origItem.getOpcode()))
+ return false;
+ //Else, fall through to
reuse (local or distributed)
+ case PERSISTEDRDD:
+ //Reuse the persisted
intermediate at the executors
+ break;
+ default:
+ return false;
+ }
+ }
+ }
+
funcOutputs.forEach((var, val) -> {
//cleanup existing data bound to output
variable name
Data exdata = ec.removeVariable(var);
@@ -291,7 +329,7 @@ public class LineageCache
if (DMLScript.STATISTICS) //increment saved time
LineageCacheStatistics.incrementSavedComputeTime(savedComputeTime);
}
-
+
return reuse;
}
@@ -707,6 +745,7 @@ public class LineageCache
RDDObject rddObj = cd.getRDDHandle();
// Set the RDD object in the cache and set the status
to TOPERSISTRDD
rddObj.setLineageCached();
+
rddObj.setDataCharacteristics(cd.getDataCharacteristics());
centry.setRDDValue(rddObj, computetime);
}
}
@@ -1003,8 +1042,10 @@ public class LineageCache
// Add to missed compute time
LineageCacheStatistics.incrementMissedComputeTime(e._computeTime);
- //maintain order for eviction
- LineageCacheEviction.addEntry(e);
+ // Maintain order for eviction
+ if (!e.isRDDPersist() && !e.isGPUObject())
+ LineageCacheEviction.addEntry(e);
+ // TODO: Handling of func/SB cache entries for Spark
and GPU
}
else
removePlaceholder(item); //remove the placeholder
@@ -1040,39 +1081,58 @@ public class LineageCache
return true;
}
- private static void persistRDD(Instruction inst, LineageCacheEntry
centry, ExecutionContext ec) {
- boolean opToPersist =
LineageCacheConfig.isReusableRDDType(inst);
- // Return if the operation is not in the list of instructions
which benefit
- // from persisting and the local only RDD caching is disabled
- if (!opToPersist &&
!LineageCacheConfig.ENABLE_LOCAL_ONLY_RDD_CACHING)
- return;
-
- if (opToPersist && centry.getCacheStatus() ==
LineageCacheStatus.TOPERSISTRDD) {
- CacheableData<?> cd =
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
- // Estimate worst case dense size
- long estimatedSize =
MatrixBlock.estimateSizeInMemory(cd.getDataCharacteristics());
- // Skip if the entry is bigger than the total storage.
- if (estimatedSize >
LineageSparkCacheEviction.getSparkStorageLimit())
- return;
-
- // Mark the rdd for lazy checkpointing
- RDDObject rddObj = centry.getRDDObject();
- JavaPairRDD<?,?> rdd = rddObj.getRDD();
- rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
- rddObj.setRDD(rdd);
- rddObj.setCheckpointRDD(true);
-
- // Make space based on the estimated size
-
if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
- LineageSparkCacheEviction.makeSpace(_cache,
estimatedSize);
- LineageSparkCacheEviction.updateSize(estimatedSize,
true);
- // Maintain order for eviction
- LineageSparkCacheEviction.addEntry(centry,
estimatedSize);
+ private static boolean persistRDD(Instruction inst, LineageCacheEntry
centry, ExecutionContext ec) {
+ // If already persisted, change the status and return true.
+ // Else, persist, change cache status and return false.
+ if (probeRDDDistributed(centry._key)) {
+ // Update status to indicate persisted in the executors
+ centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+ return true;
+ }
+ CacheableData<?> cd =
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
+ // Estimate worst case dense size
+ long estimatedSize =
MatrixBlock.estimateSizeInMemory(cd.getDataCharacteristics());
+ // Skip if the entry is bigger than the total storage.
+ if (estimatedSize >
LineageSparkCacheEviction.getSparkStorageLimit())
+ return false;
+ // Mark for distributed caching and change status
+ persistRDDIntern(centry, estimatedSize);
+ centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+ return false;
+ }
- // Count number of RDDs marked for caching at the
executors
- if (DMLScript.STATISTICS)
- LineageCacheStatistics.incrementRDDPersists();
+ private static boolean persistRDD(LineageCacheEntry centry, long
estimatedSize) {
+ // If already persisted, change the status and return true.
+ // Else, persist, change cache status and return false.
+ if (probeRDDDistributed(centry._key)) {
+ // Update status to indicate persisted in the executors
+ centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+ return true;
}
+ // Mark for distributed caching and change status
+ persistRDDIntern(centry, estimatedSize);
+ centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+ return false;
+ }
+
+ private static void persistRDDIntern(LineageCacheEntry centry, long
estimatedSize) {
+ // Mark the rdd for lazy checkpointing
+ RDDObject rddObj = centry.getRDDObject();
+ JavaPairRDD<?,?> rdd = rddObj.getRDD();
+ rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
+ rddObj.setRDD(rdd);
+ rddObj.setCheckpointRDD(true);
+
+ // Make space based on the estimated size
+ if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
+ LineageSparkCacheEviction.makeSpace(_cache,
estimatedSize);
+ LineageSparkCacheEviction.updateSize(estimatedSize, true);
+ // Maintain order for eviction
+ LineageSparkCacheEviction.addEntry(centry, estimatedSize);
+
+ // Count number of RDDs marked for caching at the executors
+ if (DMLScript.STATISTICS)
+ LineageCacheStatistics.incrementRDDPersists();
}
@Deprecated
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 63863f7029..2971c36f16 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -70,7 +70,7 @@ public class LineageCacheConfig
// Relatively inexpensive instructions.
private static final String[] PERSIST_OPCODES2 = new String[] {
- "mapmm,"
+ "mapmm"
};
private static String[] REUSE_OPCODES = new String[] {};
@@ -300,8 +300,8 @@ public class LineageCacheConfig
return insttype && rightOp;
}
- protected static boolean isShuffleOp(Instruction inst) {
- return ArrayUtils.contains(PERSIST_OPCODES1, inst.getOpcode());
+ protected static boolean isShuffleOp(String opcode) {
+ return ArrayUtils.contains(PERSIST_OPCODES1, opcode);
}
protected static int getComputeGroup(String opcode) {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index 220226031c..4f3a3354ab 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -254,7 +254,7 @@ public class LineageCacheEntry {
_gpuPointer = src._gpuPointer;
_rddObject = src._rddObject;
_computeTime = src._computeTime;
- _status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
+ _status = src._status; //requires for multi-level reuse of RDDs
// resume all threads waiting for val
notifyAll();
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
index a639b69b63..6444df89d8 100644
---
a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
+++
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -117,8 +117,11 @@ public class LineageSparkCacheEviction
private static void setSparkStorageLimit() {
// Set the limit only during the first RDD caching to avoid
context creation
- if (SPARK_STORAGE_LIMIT == 0)
- SPARK_STORAGE_LIMIT = (long)
SparkExecutionContext.getDataMemoryBudget(false, true); //FIXME
+ // Cache size = 70% of unified Spark memory = 0.7 * 0.6 = 42%.
+ if (SPARK_STORAGE_LIMIT == 0) {
+ long unifiedSparkMem = (long)
SparkExecutionContext.getDataMemoryBudget(false, true);
+ SPARK_STORAGE_LIMIT = (long)(unifiedSparkMem * 0.7d);
+ }
}
protected static double getSparkStorageLimit() {
diff --git
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 68db0caaba..d9200689fb 100644
---
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -41,7 +41,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
protected static final String TEST_DIR = "functions/async/";
protected static final String TEST_NAME = "LineageReuseSpark";
- protected static final int TEST_VARIANTS = 3;
+ protected static final int TEST_VARIANTS = 4;
protected static String TEST_CLASS_DIR = TEST_DIR +
LineageReuseSparkTest.class.getSimpleName() + "/";
@Override
@@ -73,6 +73,12 @@ public class LineageReuseSparkTest extends AutomatedTestBase
{
runTest(TEST_NAME+"3", ExecMode.SPARK, 3);
}
+ @Test
+ public void testlmdsMultiLevel() {
+ // Cache RDD and matrix block function returns and reuse
+ runTest(TEST_NAME+"4", ExecMode.HYBRID, 4);
+ }
+
public void runTest(String testname, ExecMode execMode, int testId) {
boolean old_simplification =
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
boolean old_sum_product =
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
@@ -92,7 +98,6 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
//proArgs.add("-explain");
proArgs.add("-stats");
- proArgs.add("-explain");
proArgs.add("-args");
proArgs.add(output("R"));
programArgs = proArgs.toArray(new
String[proArgs.size()]);
@@ -109,7 +114,7 @@ public class LineageReuseSparkTest extends
AutomatedTestBase {
//proArgs.add("recompile_runtime");
proArgs.add("-stats");
proArgs.add("-lineage");
-
proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_FULL.name().toLowerCase());
+
proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_MULTILEVEL.name().toLowerCase());
proArgs.add("-args");
proArgs.add(output("R"));
programArgs = proArgs.toArray(new
String[proArgs.size()]);
diff --git a/src/test/scripts/functions/async/LineageReuseSpark4.dml
b/src/test/scripts/functions/async/LineageReuseSpark4.dml
new file mode 100644
index 0000000000..90f270c3ba
--- /dev/null
+++ b/src/test/scripts/functions/async/LineageReuseSpark4.dml
@@ -0,0 +1,57 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+SimlinRegDS = function(Matrix[Double] X, Matrix[Double] y, Double lamda,
Integer N)
+return (Matrix[double] A, Matrix[double] b)
+{
+ # Reuse sp_tsmm and sp_mapmm if not future-based
+ A = (t(X) %*% X) + diag(matrix(lamda, rows=N, cols=1));
+ while(FALSE){}
+ b = t(X) %*% y;
+}
+
+no_lamda = 2;
+
+stp = (0.1 - 0.0001)/no_lamda;
+lamda = 0.0001;
+lim = 0.1;
+
+X = rand(rows=1500, cols=1500, seed=42);
+y = rand(rows=1500, cols=1, seed=43);
+N = ncol(X);
+R = matrix(0, rows=N, cols=no_lamda+2);
+
+[A, b] = SimlinRegDS(X, y, lamda, N);
+beta = solve(A, b);
+R[,1] = beta;
+
+# Reuse function call
+[A, b] = SimlinRegDS(X, y, lamda, N);
+beta = solve(A, b);
+R[,2] = beta;
+
+[A, b] = SimlinRegDS(X, y, lamda, N);
+beta = solve(A, b);
+R[,3] = beta;
+
+R = sum(R);
+write(R, $1, format="text");
+