This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 14e2995409 [SYSTEMDS-3518] Periodically update actual Spark storage used
14e2995409 is described below

commit 14e2995409dfb65aca49cee9e56f096cd6b0fb7f
Author: Arnab Phani <[email protected]>
AuthorDate: Sat Mar 30 12:21:02 2024 +0100

    [SYSTEMDS-3518] Periodically update actual Spark storage used
    
    The estimated total Spark storage memory used for caching is
    often far from actual used due to compiler-placed checkpoints.
    This patch enables periodic update of the _sparkStorageSize
    metadata during the cleanup of child RDDs and broadcasts.
---
 .../java/org/apache/sysds/runtime/lineage/LineageCache.java   |  1 +
 .../sysds/runtime/lineage/LineageSparkCacheEviction.java      | 11 ++++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e5fc9be938..c084938d0f 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -1214,6 +1214,7 @@ public class LineageCache
                // Mark for distributed caching and change status
                persistRDDIntern(centry, estimatedSize);
                centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+               //centry.getRDDObject().getRDD().count(); //eager caching (experimental)
                return false;
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
index e114fee29e..cb778a67e4 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -138,11 +138,18 @@ public class LineageSparkCacheEviction
                return SPARK_STORAGE_LIMIT;
        }
 
+       // NOTE: _sparkStorageSize doesn't represent the true size as we maintain total size based on estimations
        protected static void updateSize(long space, boolean addspace) {
                _sparkStorageSize += addspace ? space : -space;
-               // NOTE: this doesn't represent the true size as we maintain total size based on estimations
+
+               // Debug print: actual vs estimated size
+               //System.out.println("Storage space used = "+(SparkExecutionContext.getStorageSpaceUsed()/1024)/1024+" MB, "
+               //      +"Estimated used = "+(_sparkStorageSize/1024)/1024 +" MB, "+"Limit = "+(getSparkStorageLimit()/1024)/1024+" MB");
        }
 
+       // FIXME: Actual memory usage is often way more than the estimated,
+       //   mostly due to not tracking the compiler-placed checkpoints.
+       //   Always use actual as getStorageSpaceUsed() is not expensive (verify).
        protected static boolean isBelowThreshold(long estimateSize) {
                boolean available = (estimateSize + _sparkStorageSize) <= getSparkStorageLimit();
                if (!available)
@@ -175,6 +182,8 @@ public class LineageSparkCacheEviction
                        }
                        // Also detach the child RDDs to be GCed
                        e.getRDDObject().removeAllChild();
+                       // Update actual storage used (including compiler-placed checkpoints)
+                       _sparkStorageSize = SparkExecutionContext.getStorageSpaceUsed();
                }
                // TODO: Cleanup the child RDDs of the persisted RDDs
                //  which are never reused after the second hit.

Reply via email to