This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a93d554b5f6 [SPARK-55341][SQL] Add storage level flag for cached local relations
1a93d554b5f6 is described below

commit 1a93d554b5f6e44a7b25d20db70032fb6095d7f4
Author: pranavdev022 <[email protected]>
AuthorDate: Thu Feb 5 09:39:25 2026 -0400

    [SPARK-55341][SQL] Add storage level flag for cached local relations
    
    ### What changes were proposed in this pull request?
    This PR adds a configuration `spark.sql.artifact.cacheStorageLevel` to control the storage level used for cached blocks:
    - When set to `DISK_ONLY`: cached blocks are kept on disk only, reducing memory pressure
    - When left at the default (`MEMORY_AND_DISK_SER`): the current behavior is preserved
    
    This allows users to opt into disk-only storage for cached artifacts when memory is constrained, while maintaining backward compatibility with the default behavior.
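
    For illustration, opting in could look like the following. This is a minimal sketch, not part of the patch; it assumes an active `SparkSession` named `spark` and that the session allows setting this internal conf at runtime:

    ```scala
    // Hedged sketch: route cached artifact blocks (e.g., cached local
    // relations) to disk only. Any valid StorageLevel name is accepted.
    spark.conf.set("spark.sql.artifact.cacheStorageLevel", "DISK_ONLY")
    ```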
    
    ### Why are the changes needed?
    Cached artifact blocks in ArtifactManager currently use the `MEMORY_AND_DISK_SER` storage level. In some scenarios with large artifacts, especially large local relations, this can cause memory pressure.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests verify that the default `MEMORY_AND_DISK_SER` behavior continues to work correctly. The flag is internal and defaults to `MEMORY_AND_DISK_SER`, so current behavior is maintained.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #54118 from pranavdev022/add-flag-for-cached-disk-storage.
    
    Authored-by: pranavdev022 <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++
 .../scala/org/apache/spark/sql/artifact/ArtifactManager.scala  |  4 +++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1bed0b3bebc9..7973c652a695 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -5139,6 +5139,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ARTIFACT_MANAGER_CACHE_STORAGE_LEVEL =
+    buildConf("spark.sql.artifact.cacheStorageLevel")
+      .internal()
+      .doc("Storage level for cached blocks in artifact manager. Valid values 
are any " +
+        "StorageLevel name (e.g., MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY, 
etc.).")
+      .version("4.2.0")
+      .stringConf
+      .checkValue(v => Try(StorageLevel.fromString(v)).isSuccess, "Invalid StorageLevel")
+      .createWithDefault("MEMORY_AND_DISK_SER")
+
   val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT =
     buildConf("spark.sql.codegen.aggregate.fastHashMap.capacityBit")
       .internal()
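
As context for the `checkValue` predicate above: `StorageLevel.fromString` throws an `IllegalArgumentException` for unknown names, so wrapping it in `Try` turns parsing into a boolean validity check. A standalone sketch (the `isValidLevel` helper is hypothetical, for illustration only):

```scala
import scala.util.Try
import org.apache.spark.storage.StorageLevel

// Hypothetical helper mirroring the checkValue predicate in the conf above.
def isValidLevel(name: String): Boolean =
  Try(StorageLevel.fromString(name)).isSuccess

isValidLevel("DISK_ONLY")           // true
isValidLevel("MEMORY_AND_DISK_SER") // true
isValidLevel("NOT_A_LEVEL")         // false: fromString throws for unknown names
```
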
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 0055d220a676..804b5269c929 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -194,9 +194,11 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
         // (e.g., after clone), we should replace it.
         val existingBlock = hashToCachedIdMap.get(hash)
         if (existingBlock == null || existingBlock.id != blockId) {
+          val storageLevel = StorageLevel.fromString(
+            session.conf.get(SQLConf.ARTIFACT_MANAGER_CACHE_STORAGE_LEVEL))
           val updater = blockManager.TempFileBasedBlockStoreUpdater(
             blockId = blockId,
-            level = StorageLevel.MEMORY_AND_DISK_SER,
+            level = storageLevel,
             classTag = implicitly[ClassTag[Array[Byte]]],
             tmpFile = tmpFile,
             blockSize = tmpFile.length(),

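For context on the two levels involved, a quick illustrative sketch (the flag accessors are standard `StorageLevel` API; the `println`s are only for demonstration):

```scala
import org.apache.spark.storage.StorageLevel

val diskOnly = StorageLevel.fromString("DISK_ONLY")
// DISK_ONLY keeps blocks on disk and out of memory entirely.
println((diskOnly.useDisk, diskOnly.useMemory, diskOnly.deserialized))
// (true, false, false)

val default = StorageLevel.fromString("MEMORY_AND_DISK_SER")
// MEMORY_AND_DISK_SER caches serialized bytes in memory, spilling to disk.
println((default.useDisk, default.useMemory, default.deserialized))
// (true, true, false)
```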
