This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new c10160b4163 [SPARK-38640][CORE] Fix NPE with memory-only cache blocks
and RDD fetching
c10160b4163 is described below
commit c10160b4163be00b8009cb462b1e33704b0ff3d6
Author: Adam Binford <[email protected]>
AuthorDate: Sun Apr 17 08:39:27 2022 -0500
[SPARK-38640][CORE] Fix NPE with memory-only cache blocks and RDD fetching
### What changes were proposed in this pull request?
Fixes a bug where if `spark.shuffle.service.fetch.rdd.enabled=true`,
memory-only cached blocks will fail to unpersist.
### Why are the changes needed?
In https://github.com/apache/spark/pull/33020, when all RDD blocks are
removed from `externalShuffleServiceBlockStatus`, the underlying Map is nulled
to reduce memory. When persisting blocks we check if it's using disk before
adding it to `externalShuffleServiceBlockStatus`, but when removing them there
is no check, so a memory-only cache block will keep
`externalShuffleServiceBlockStatus` null, and when unpersisting it throws an NPE
because it tries to remove from the null Map. This is a [...]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New and updated UT
Closes #35959 from Kimahriman/fetch-rdd-memory-only-unpersist.
Authored-by: Adam Binford <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit e0939f0f7c3d3bd4baa89e720038dbd3c7363a72)
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/storage/BlockManagerMasterEndpoint.scala | 8 +++++---
.../apache/spark/ExternalShuffleServiceSuite.scala | 22 ++++++++++++++++++++++
.../spark/storage/BlockManagerInfoSuite.scala | 2 ++
3 files changed, 29 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4d8ba9b3e4e..adeb507941c 100644
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -838,9 +838,11 @@ private[spark] class BlockStatusPerBlockId {
}
def remove(blockId: BlockId): Unit = {
- blocks.remove(blockId)
- if (blocks.isEmpty) {
- blocks = null
+ if (blocks != null) {
+ blocks.remove(blockId)
+ if (blocks.isEmpty) {
+ blocks = null
+ }
}
}
diff --git
a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index dd3d90f3124..1ca78d572c7 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -255,4 +255,26 @@ class ExternalShuffleServiceSuite extends ShuffleSuite
with BeforeAndAfterAll wi
}
}
}
+
+ test("SPARK-38640: memory only blocks can unpersist using shuffle service
cache fetching") {
+ for (enabled <- Seq(true, false)) {
+ val confWithRddFetch =
+ conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, enabled)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test",
confWithRddFetch)
+ sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+ sc.env.blockManager.blockStoreClient.getClass should
equal(classOf[ExternalBlockStoreClient])
+ try {
+ val rdd = sc.parallelize(0 until 100, 2)
+ .map { i => (i, 1) }
+ .persist(StorageLevel.MEMORY_ONLY)
+
+ rdd.count()
+ rdd.unpersist(true)
+ assert(sc.persistentRdds.isEmpty)
+ } finally {
+ rpcHandler.applicationRemoved(sc.conf.getAppId, true)
+ sc.stop()
+ }
+ }
+ }
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
index f0c19c5ccce..85f012aece3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
@@ -63,6 +63,8 @@ class BlockManagerInfoSuite extends SparkFunSuite {
if (svcEnabled) {
assert(getEssBlockStatus(bmInfo, rddId).isEmpty)
}
+ bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 0)
+ assert(bmInfo.remainingMem === 30000)
}
testWithShuffleServiceOnOff("RDD block with MEMORY_AND_DISK") { (svcEnabled,
bmInfo) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]