This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new c0d1a3c05ab [SPARK-44534][K8S] Handle only shuffle files in
KubernetesLocalDiskShuffleExecutorComponents
c0d1a3c05ab is described below
commit c0d1a3c05abf967ad3a9d7d1448a0da4478db9ce
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Jul 25 00:12:31 2023 -0700
[SPARK-44534][K8S] Handle only shuffle files in
KubernetesLocalDiskShuffleExecutorComponents
### What changes were proposed in this pull request?
This PR aims to prevent future regressions caused by new block types by
explicitly checking `isShuffle` before invoking
`BlockManager.TempFileBasedBlockStoreUpdater` in
`KubernetesLocalDiskShuffleExecutorComponents`. `isShuffle` currently accepts
only four block types:
https://github.com/apache/spark/blob/58e5d86cc076d4546dac5e1f594977d615ec1e7a/core/src/main/scala/org/apache/spark/storage/BlockId.scala#L43-L46
### Why are the changes needed?
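We want to protect the recovery logic from non-shuffle blocks and from any
block types added in the future. As the log below shows, the recovery step
currently also tries to recover non-shuffle files such as `broadcast_0_piece0`
and `broadcast_0`.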
```
$ k logs group-by-test-exec-6 | grep 'Try to recover'
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover shuffle data.
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/broadcast_0_piece0
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3136_0.data
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3312_0.data
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3402_0.data
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3752_0.data
23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/0e/broadcast_0
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #42138 from dongjoon-hyun/SPARK-44534.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b7d10505e524e346420baecdf8a2f2dc7b884e12)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../KubernetesLocalDiskShuffleExecutorComponents.scala | 9 +++++++--
.../shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala | 14 ++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index 8f0729067b9..e553a56b7e1 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -98,8 +98,13 @@ object KubernetesLocalDiskShuffleExecutorComponents extends
Logging {
logInfo(s"Try to recover ${f.getAbsolutePath}")
try {
val id = BlockId(f.getName)
- val decryptedSize = f.length()
- bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f,
decryptedSize).save()
+ // To make it sure to handle only shuffle blocks
+ if (id.isShuffle) {
+ val decryptedSize = f.length()
+ bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f,
decryptedSize).save()
+ } else {
+ logInfo("Ignore a non-shuffle block file.")
+ }
} catch {
case _: UnrecognizedBlockId =>
logInfo("Skip due to UnrecognizedBlockId.")
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
index d105ac04171..663be35ce0c 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
@@ -238,4 +238,18 @@ class KubernetesLocalDiskShuffleDataIOSuite extends
SparkFunSuite with LocalRoot
when(bm.TempFileBasedBlockStoreUpdater).thenAnswer(_ => throw new
Exception())
KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf,
bm)
}
+
+ test("SPARK-44534: Handle only shuffle files") {
+ val sparkConf = conf.clone.set("spark.local.dir",
+ conf.get("spark.local.dir") + "/spark-x/executor-y")
+ val dir = sparkConf.get("spark.local.dir") + "/blockmgr-z/00"
+ Files.createDirectories(new File(dir).toPath())
+ Seq("broadcast_0", "broadcast_0_piece0", "temp_shuffle_uuid").foreach { f
=>
+ new File(dir, f).createNewFile()
+ }
+
+ val bm = mock(classOf[BlockManager])
+ when(bm.TempFileBasedBlockStoreUpdater).thenAnswer(_ => throw new
Exception())
+ KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf,
bm)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]