This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7b7a9541701a [SPARK-55097][SQL] Fix re-adding cached artifacts drops 
blocks silently issue
7b7a9541701a is described below

commit 7b7a9541701a305e1592238620892095e765d67e
Author: pranavdev022 <[email protected]>
AuthorDate: Tue Jan 20 18:20:14 2026 +0100

    [SPARK-55097][SQL] Fix re-adding cached artifacts drops blocks silently 
issue
    
    ### What changes were proposed in this pull request?
    After the introduction of the ref-counting logic for cloning sessions 
https://github.com/apache/spark/pull/52651, whenever an identical cached 
artifact (same session, same hash) is re-added, it incorrectly leds to deletion 
of the existing block.
    
    Verified this bug locally using:
    ```
    test("re-adding the same cache artifact should not remove the block") {
        val blockManager = spark.sparkContext.env.blockManager
        val remotePath = Paths.get("cache/duplicate_hash")
        val blockId = CacheId(spark.sessionUUID, "duplicate_hash")
        try {
          // First addition
          withTempPath { path =>
            Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
            artifactManager.addArtifact(remotePath, path.toPath, None)
          }
          assert(blockManager.getLocalBytes(blockId).isDefined)
          blockManager.releaseLock(blockId)
    
          // Second addition with same hash - block should still exist
          withTempPath { path =>
            Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
            artifactManager.addArtifact(remotePath, path.toPath, None)
          }
          assert(blockManager.getLocalBytes(blockId).isDefined,
            "Block should still exist after re-adding the same cache artifact")
        } finally {
          blockManager.releaseLock(blockId)
          blockManager.removeCache(spark.sessionUUID)
        }
      }
    ```
    which fails `assert(blockManager.getLocalBytes(blockId).isDefined` check 
after the second addition with the same hash.
    
    Proposed solution:
    1. Add early exit check: if `existingBlock.id == blockId`, skip the 
addition since the block already exists.
    2. The fix preserves the intended replacement behavior for cloned sessions 
(different session UUID, same hash)
    
    ### Why are the changes needed?
    When the same cached artifact is added twice to the same session:
    - BlockManager.save() detects the block exists and returns without re-adding
    - hashToCachedIdMap.put() returns the existing RefCountedCacheId
    - release() decrements ref count to 0 and deletes the block
    
    ### Does this PR introduce _any_ user-facing change?
    No. This is an internal bug fix. Previously, duplicate artifact additions 
could silently delete cached data causing runtime errors.
    
    ### How was this patch tested?
    Added unit test `cache artifact deduplication and replacement across 
sessions` in ArtifactManagerSuite that validates:
    - Duplicate addition in same session is skipped (block survives)
    - Cloned session can replace inherited artifacts (different CacheId)
    - Reference counting works correctly across session cleanup
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Co-authored using cursor.
    
    Closes #53852 from pranavdev022/fix-artifacts-duplicate-add-issue.
    
    Authored-by: pranavdev022 <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
    (cherry picked from commit dfea03626309d04b651fdfa92d58066f29a6b34d)
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../spark/sql/artifact/ArtifactManager.scala       | 41 +++++++++------
 .../spark/sql/artifact/ArtifactManagerSuite.scala  | 60 +++++++++++++++++-----
 2 files changed, 73 insertions(+), 28 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index e11f0f99bf2f..0055d220a676 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -183,25 +183,34 @@ class ArtifactManager(session: SparkSession) extends 
AutoCloseable with Logging
     if (normalizedRemoteRelativePath.startsWith(s"cache${File.separator}")) {
       val tmpFile = serverLocalStagingPath.toFile
       Utils.tryWithSafeFinallyAndFailureCallbacks {
+        val hash = 
normalizedRemoteRelativePath.toString.stripPrefix(s"cache${File.separator}")
         val blockManager = session.sparkContext.env.blockManager
         val blockId = CacheId(
           sessionUUID = session.sessionUUID,
-          hash = 
normalizedRemoteRelativePath.toString.stripPrefix(s"cache${File.separator}"))
-        val updater = blockManager.TempFileBasedBlockStoreUpdater(
-          blockId = blockId,
-          level = StorageLevel.MEMORY_AND_DISK_SER,
-          classTag = implicitly[ClassTag[Array[Byte]]],
-          tmpFile = tmpFile,
-          blockSize = tmpFile.length(),
-          tellMaster = false)
-        updater.save()
-        val oldBlock = hashToCachedIdMap.put(blockId.hash, new 
RefCountedCacheId(blockId))
-        if (oldBlock != null) {
-          logWarning(
-            log"Replacing existing cache artifact with hash 
${MDC(LogKeys.BLOCK_ID, blockId)} " +
-              log"in session ${MDC(LogKeys.SESSION_ID, session.sessionUUID)}. 
" +
-              log"This may indicate duplicate artifact addition.")
-          oldBlock.release(blockManager)
+          hash = hash)
+        // If the exact same block (same CacheId) already exists, skip 
re-adding.
+        // This prevents incorrectly removing the existing block from 
BlockManager.
+        // Note: We only skip if the CacheId matches - if it's a different 
session's block
+        // (e.g., after clone), we should replace it.
+        val existingBlock = hashToCachedIdMap.get(hash)
+        if (existingBlock == null || existingBlock.id != blockId) {
+          val updater = blockManager.TempFileBasedBlockStoreUpdater(
+            blockId = blockId,
+            level = StorageLevel.MEMORY_AND_DISK_SER,
+            classTag = implicitly[ClassTag[Array[Byte]]],
+            tmpFile = tmpFile,
+            blockSize = tmpFile.length(),
+            tellMaster = false)
+          updater.save()
+          hashToCachedIdMap.put(blockId.hash, new RefCountedCacheId(blockId))
+          if (existingBlock != null) {
+            // Release the old block - this is a legitimate replacement 
(different CacheId,
+            // e.g., after session clone). The old block will be removed when 
its ref count
+            // reaches zero.
+            existingBlock.release(blockManager)
+          }
+        } else {
+          logWarning(s"Cache artifact with hash $hash already exists in this 
session, skipping.")
         }
       }(finallyBlock = { tmpFile.delete() })
     } else if 
(normalizedRemoteRelativePath.startsWith(s"classes${File.separator}")) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
index ead2d52edff3..41ef4ac00c69 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
@@ -51,6 +51,18 @@ class ArtifactManagerSuite extends SharedSparkSession {
     super.afterEach()
   }
 
+  private def isBlockRegistered(id: CacheId): Boolean = {
+    sparkContext.env.blockManager.getStatus(id).isDefined
+  }
+
+  private def addCachedArtifact(session: SparkSession, name: String, data: 
String): CacheId = {
+    val bytes = new Artifact.InMemory(data.getBytes(StandardCharsets.UTF_8))
+    session.artifactManager.addLocalArtifacts(Artifact.newCacheArtifact(name, 
bytes) :: Nil)
+    val id = CacheId(session.sessionUUID, name)
+    assert(isBlockRegistered(id))
+    id
+  }
+
   test("Class artifacts are added to the correct directory.") {
     assume(artifactPath.resolve("smallClassFile.class").toFile.exists)
 
@@ -534,18 +546,6 @@ class ArtifactManagerSuite extends SharedSparkSession {
   }
 
   test("Share blocks between ArtifactManagers") {
-    def isBlockRegistered(id: CacheId): Boolean = {
-      sparkContext.env.blockManager.getStatus(id).isDefined
-    }
-
-    def addCachedArtifact(session: SparkSession, name: String, data: String): 
CacheId = {
-      val bytes = new Artifact.InMemory(data.getBytes(StandardCharsets.UTF_8))
-      
session.artifactManager.addLocalArtifacts(Artifact.newCacheArtifact(name, 
bytes) :: Nil)
-      val id = CacheId(session.sessionUUID, name)
-      assert(isBlockRegistered(id))
-      id
-    }
-
     // Create fresh session so there is no interference with other tests.
     val session1 = spark.newSession()
     val b1 = addCachedArtifact(session1, "b1", "b_one")
@@ -582,6 +582,42 @@ class ArtifactManagerSuite extends SharedSparkSession {
     assert(!isBlockRegistered(b2a))
   }
 
+  test("cache artifact deduplication and replacement across sessions") {
+    val session1 = spark.newSession()
+    val b1 = addCachedArtifact(session1, "b1", "data_one")
+
+    // Add the same block again to verify that it is still registered
+    addCachedArtifact(session1, "b1", "data_one")
+    assert(isBlockRegistered(b1))
+
+    val session2 = session1.cloneSession()
+    assert(session2.artifactManager.getCachedBlockId("b1").get == b1)
+
+    /*
+     * Replace the block with different data in the cloned session
+     * If we try to add the same hash in the cloned session, that is allowed
+     * and the old reference from the cloned session is removed.
+     */
+    val b1a = addCachedArtifact(session2, "b1", "data_one_replaced")
+    assert(session2.artifactManager.getCachedBlockId("b1").get == b1a)
+
+    // Verify that the original block is still registered
+    assert(isBlockRegistered(b1))
+    assert(session1.artifactManager.getCachedBlockId("b1").get == b1)
+
+    // Add the same block again to verify that it is still registered
+    addCachedArtifact(session2, "b1", "data_one_replaced")
+    assert(isBlockRegistered(b1a))
+
+    // Clean up the sessions
+    session1.artifactManager.cleanUpResourcesForTesting()
+    assert(!isBlockRegistered(b1))
+    assert(isBlockRegistered(b1a))
+
+    session2.artifactManager.cleanUpResourcesForTesting()
+    assert(!isBlockRegistered(b1a))
+  }
+
   test("Codegen cache should be invalid when artifacts are added - class 
artifact") {
     withTempDir { dir =>
       runCodegenTest("class artifact") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to