This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 7b7a9541701a [SPARK-55097][SQL] Fix re-adding cached artifacts drops
blocks silently issue
7b7a9541701a is described below
commit 7b7a9541701a305e1592238620892095e765d67e
Author: pranavdev022 <[email protected]>
AuthorDate: Tue Jan 20 18:20:14 2026 +0100
[SPARK-55097][SQL] Fix re-adding cached artifacts drops blocks silently
issue
### What changes were proposed in this pull request?
After the introduction of the ref-counting logic for cloned sessions in
https://github.com/apache/spark/pull/52651, re-adding an identical cached
artifact (same session, same hash) incorrectly leads to deletion of the
existing block.
Verified this bug locally using:
```
test("re-adding the same cache artifact should not remove the block") {
val blockManager = spark.sparkContext.env.blockManager
val remotePath = Paths.get("cache/duplicate_hash")
val blockId = CacheId(spark.sessionUUID, "duplicate_hash")
try {
// First addition
withTempPath { path =>
Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
artifactManager.addArtifact(remotePath, path.toPath, None)
}
assert(blockManager.getLocalBytes(blockId).isDefined)
blockManager.releaseLock(blockId)
// Second addition with same hash - block should still exist
withTempPath { path =>
Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
artifactManager.addArtifact(remotePath, path.toPath, None)
}
assert(blockManager.getLocalBytes(blockId).isDefined,
"Block should still exist after re-adding the same cache artifact")
} finally {
blockManager.releaseLock(blockId)
blockManager.removeCache(spark.sessionUUID)
}
}
```
which fails the `assert(blockManager.getLocalBytes(blockId).isDefined)` check
after the second addition with the same hash.
Proposed solution:
1. Add an early-exit check: if `existingBlock.id == blockId`, skip the
addition since the block already exists (sketched below).
2. The fix preserves the intended replacement behavior for cloned sessions
(different session UUID, same hash).
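In condensed form, the guard from step 1 looks like this (the complete change
is in the `ArtifactManager.scala` diff below):
```
// Condensed sketch of the guard; see the ArtifactManager diff below for the complete change.
val existingBlock = hashToCachedIdMap.get(hash)
if (existingBlock == null || existingBlock.id != blockId) {
  // New hash, or the same hash owned by a different session (e.g. after clone):
  // save the block, register the new RefCountedCacheId, and release the old entry if present.
} else {
  // Identical CacheId already cached in this session: skip and keep the existing block.
}
```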
### Why are the changes needed?
When the same cached artifact is added twice to the same session:
- `BlockManager.save()` detects that the block already exists and returns without re-adding it
- `hashToCachedIdMap.put()` returns the existing `RefCountedCacheId`
- `release()` on that existing entry drops the ref count to 0 and deletes the block, even though the session still needs it
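The following minimal, self-contained sketch reproduces that sequence.
`FakeCacheId`, `FakeRefCountedCacheId`, and `FakeBlockManager` are simplified
stand-ins for illustration only, not the actual Spark classes:
```
// Minimal sketch of the buggy sequence; the Fake* names are illustrative stand-ins.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object RefCountBugSketch {
  final case class FakeCacheId(sessionUUID: String, hash: String)

  final class FakeBlockManager {
    private val blocks = scala.collection.mutable.Set.empty[FakeCacheId]
    def save(id: FakeCacheId): Unit = blocks += id       // no-op if the block already exists
    def remove(id: FakeCacheId): Unit = blocks -= id
    def contains(id: FakeCacheId): Boolean = blocks(id)
  }

  final class FakeRefCountedCacheId(val id: FakeCacheId) {
    private val refCount = new AtomicInteger(1)
    def release(bm: FakeBlockManager): Unit =
      if (refCount.decrementAndGet() == 0) bm.remove(id) // last reference removes the block
  }

  def main(args: Array[String]): Unit = {
    val bm = new FakeBlockManager
    val hashToCachedIdMap = new ConcurrentHashMap[String, FakeRefCountedCacheId]()
    val blockId = FakeCacheId("session-1", "duplicate_hash")

    // First addition: block saved, ref count starts at 1.
    bm.save(blockId)
    hashToCachedIdMap.put(blockId.hash, new FakeRefCountedCacheId(blockId))

    // Second addition of the identical artifact (old behaviour):
    bm.save(blockId)                                     // block already exists, nothing new saved
    val oldBlock = hashToCachedIdMap.put(blockId.hash, new FakeRefCountedCacheId(blockId))
    if (oldBlock != null) oldBlock.release(bm)           // ref count hits 0 -> block removed

    println(s"block still present? ${bm.contains(blockId)}") // prints: block still present? false
  }
}
```
With the fix, this second put/release pair never runs when the `CacheId` is
identical, so the ref count and the block are left untouched.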
### Does this PR introduce _any_ user-facing change?
No. This is an internal bug fix. Previously, duplicate artifact additions
could silently delete cached data, causing runtime errors.
### How was this patch tested?
Added unit test `cache artifact deduplication and replacement across sessions` in ArtifactManagerSuite that validates:
- Duplicate addition in same session is skipped (block survives)
- Cloned session can replace inherited artifacts (different CacheId)
- Reference counting works correctly across session cleanup
### Was this patch authored or co-authored using generative AI tooling?
Co-authored using Cursor.
Closes #53852 from pranavdev022/fix-artifacts-duplicate-add-issue.
Authored-by: pranavdev022 <[email protected]>
Signed-off-by: Herman van Hövell <[email protected]>
(cherry picked from commit dfea03626309d04b651fdfa92d58066f29a6b34d)
Signed-off-by: Herman van Hövell <[email protected]>
---
.../spark/sql/artifact/ArtifactManager.scala | 41 +++++++++------
.../spark/sql/artifact/ArtifactManagerSuite.scala | 60 +++++++++++++++++-----
2 files changed, 73 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index e11f0f99bf2f..0055d220a676 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -183,25 +183,34 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
    if (normalizedRemoteRelativePath.startsWith(s"cache${File.separator}")) {
      val tmpFile = serverLocalStagingPath.toFile
      Utils.tryWithSafeFinallyAndFailureCallbacks {
+       val hash = normalizedRemoteRelativePath.toString.stripPrefix(s"cache${File.separator}")
        val blockManager = session.sparkContext.env.blockManager
        val blockId = CacheId(
          sessionUUID = session.sessionUUID,
-         hash = normalizedRemoteRelativePath.toString.stripPrefix(s"cache${File.separator}"))
-       val updater = blockManager.TempFileBasedBlockStoreUpdater(
-         blockId = blockId,
-         level = StorageLevel.MEMORY_AND_DISK_SER,
-         classTag = implicitly[ClassTag[Array[Byte]]],
-         tmpFile = tmpFile,
-         blockSize = tmpFile.length(),
-         tellMaster = false)
-       updater.save()
-       val oldBlock = hashToCachedIdMap.put(blockId.hash, new RefCountedCacheId(blockId))
-       if (oldBlock != null) {
-         logWarning(
-           log"Replacing existing cache artifact with hash ${MDC(LogKeys.BLOCK_ID, blockId)} " +
-             log"in session ${MDC(LogKeys.SESSION_ID, session.sessionUUID)}. " +
-             log"This may indicate duplicate artifact addition.")
-         oldBlock.release(blockManager)
+         hash = hash)
+       // If the exact same block (same CacheId) already exists, skip re-adding.
+       // This prevents incorrectly removing the existing block from BlockManager.
+       // Note: We only skip if the CacheId matches - if it's a different session's block
+       // (e.g., after clone), we should replace it.
+       val existingBlock = hashToCachedIdMap.get(hash)
+       if (existingBlock == null || existingBlock.id != blockId) {
+         val updater = blockManager.TempFileBasedBlockStoreUpdater(
+           blockId = blockId,
+           level = StorageLevel.MEMORY_AND_DISK_SER,
+           classTag = implicitly[ClassTag[Array[Byte]]],
+           tmpFile = tmpFile,
+           blockSize = tmpFile.length(),
+           tellMaster = false)
+         updater.save()
+         hashToCachedIdMap.put(blockId.hash, new RefCountedCacheId(blockId))
+         if (existingBlock != null) {
+           // Release the old block - this is a legitimate replacement (different CacheId,
+           // e.g., after session clone). The old block will be removed when its ref count
+           // reaches zero.
+           existingBlock.release(blockManager)
+         }
+       } else {
+         logWarning(s"Cache artifact with hash $hash already exists in this session, skipping.")
        }
      }(finallyBlock = { tmpFile.delete() })
    } else if (normalizedRemoteRelativePath.startsWith(s"classes${File.separator}")) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
index ead2d52edff3..41ef4ac00c69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
@@ -51,6 +51,18 @@ class ArtifactManagerSuite extends SharedSparkSession {
    super.afterEach()
  }
+  private def isBlockRegistered(id: CacheId): Boolean = {
+    sparkContext.env.blockManager.getStatus(id).isDefined
+  }
+
+  private def addCachedArtifact(session: SparkSession, name: String, data: String): CacheId = {
+    val bytes = new Artifact.InMemory(data.getBytes(StandardCharsets.UTF_8))
+    session.artifactManager.addLocalArtifacts(Artifact.newCacheArtifact(name, bytes) :: Nil)
+    val id = CacheId(session.sessionUUID, name)
+    assert(isBlockRegistered(id))
+    id
+  }
+
  test("Class artifacts are added to the correct directory.") {
    assume(artifactPath.resolve("smallClassFile.class").toFile.exists)
@@ -534,18 +546,6 @@ class ArtifactManagerSuite extends SharedSparkSession {
}
test("Share blocks between ArtifactManagers") {
-    def isBlockRegistered(id: CacheId): Boolean = {
-      sparkContext.env.blockManager.getStatus(id).isDefined
-    }
-
-    def addCachedArtifact(session: SparkSession, name: String, data: String): CacheId = {
-      val bytes = new Artifact.InMemory(data.getBytes(StandardCharsets.UTF_8))
-      session.artifactManager.addLocalArtifacts(Artifact.newCacheArtifact(name, bytes) :: Nil)
-      val id = CacheId(session.sessionUUID, name)
-      assert(isBlockRegistered(id))
-      id
-    }
-
// Create fresh session so there is no interference with other tests.
val session1 = spark.newSession()
val b1 = addCachedArtifact(session1, "b1", "b_one")
@@ -582,6 +582,42 @@ class ArtifactManagerSuite extends SharedSparkSession {
assert(!isBlockRegistered(b2a))
}
+ test("cache artifact deduplication and replacement across sessions") {
+ val session1 = spark.newSession()
+ val b1 = addCachedArtifact(session1, "b1", "data_one")
+
+ // Add the same block again to verify that it is still registered
+ addCachedArtifact(session1, "b1", "data_one")
+ assert(isBlockRegistered(b1))
+
+ val session2 = session1.cloneSession()
+ assert(session2.artifactManager.getCachedBlockId("b1").get == b1)
+
+ /*
+ * Replace the block with different data in the cloned session
+ * If we try to add the same hash in the cloned session, that is allowed
+ * and the old reference from the cloned session is removed.
+ */
+ val b1a = addCachedArtifact(session2, "b1", "data_one_replaced")
+ assert(session2.artifactManager.getCachedBlockId("b1").get == b1a)
+
+ // Verify that the original block is still registered
+ assert(isBlockRegistered(b1))
+ assert(session1.artifactManager.getCachedBlockId("b1").get == b1)
+
+ // Add the same block again to verify that it is still registered
+ addCachedArtifact(session2, "b1", "data_one_replaced")
+ assert(isBlockRegistered(b1a))
+
+ // Clean up the sessions
+ session1.artifactManager.cleanUpResourcesForTesting()
+ assert(!isBlockRegistered(b1))
+ assert(isBlockRegistered(b1a))
+
+ session2.artifactManager.cleanUpResourcesForTesting()
+ assert(!isBlockRegistered(b1a))
+ }
+
test("Codegen cache should be invalid when artifacts are added - class
artifact") {
withTempDir { dir =>
runCodegenTest("class artifact") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]