This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b7d10505e52 [SPARK-44534][K8S] Handle only shuffle files in 
KubernetesLocalDiskShuffleExecutorComponents
b7d10505e52 is described below

commit b7d10505e524e346420baecdf8a2f2dc7b884e12
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Jul 25 00:12:31 2023 -0700

    [SPARK-44534][K8S] Handle only shuffle files in 
KubernetesLocalDiskShuffleExecutorComponents
    
    ### What changes were proposed in this pull request?
    
    This PR aims to prevent future regressions caused by new block types by
    checking `isShuffle` explicitly before invoking
    `BlockManager.TempFileBasedBlockStoreUpdater` in
    `KubernetesLocalDiskShuffleExecutorComponents`. Currently, `isShuffle`
    accepts only 4 block types.
    
    
https://github.com/apache/spark/blob/58e5d86cc076d4546dac5e1f594977d615ec1e7a/core/src/main/scala/org/apache/spark/storage/BlockId.scala#L43-L46
    
    ### Why are the changes needed?
    
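    We want to protect the recovery logic from non-shuffle blocks and from
    block types that may be added in the future. As the log below shows,
    recovery is currently attempted for non-shuffle files such as broadcast
    blocks as well.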
    
    ```
    $ k logs group-by-test-exec-6 | grep 'Try to recover'
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover shuffle data.
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/broadcast_0_piece0
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3136_0.data
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3312_0.data
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3402_0.data
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/11/shuffle_0_3752_0.data
    23/07/24 22:59:45 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-1/executor-x/blockmgr-716520a6-2a94-4418-8a23-d56fd9f4191b/0e/broadcast_0
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #42138 from dongjoon-hyun/SPARK-44534.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../KubernetesLocalDiskShuffleExecutorComponents.scala     |  9 +++++++--
 .../shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala    | 14 ++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index 8f0729067b9..e553a56b7e1 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -98,8 +98,13 @@ object KubernetesLocalDiskShuffleExecutorComponents extends 
Logging {
       logInfo(s"Try to recover ${f.getAbsolutePath}")
       try {
         val id = BlockId(f.getName)
-        val decryptedSize = f.length()
-        bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, 
decryptedSize).save()
+        // To make it sure to handle only shuffle blocks
+        if (id.isShuffle) {
+          val decryptedSize = f.length()
+          bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, 
decryptedSize).save()
+        } else {
+          logInfo("Ignore a non-shuffle block file.")
+        }
       } catch {
         case _: UnrecognizedBlockId =>
           logInfo("Skip due to UnrecognizedBlockId.")
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
index d105ac04171..663be35ce0c 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
@@ -238,4 +238,18 @@ class KubernetesLocalDiskShuffleDataIOSuite extends 
SparkFunSuite with LocalRoot
     when(bm.TempFileBasedBlockStoreUpdater).thenAnswer(_ => throw new 
Exception())
     KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, 
bm)
   }
+
+  test("SPARK-44534: Handle only shuffle files") {
+    val sparkConf = conf.clone.set("spark.local.dir",
+      conf.get("spark.local.dir") + "/spark-x/executor-y")
+    val dir = sparkConf.get("spark.local.dir") + "/blockmgr-z/00"
+    Files.createDirectories(new File(dir).toPath())
+    Seq("broadcast_0", "broadcast_0_piece0", "temp_shuffle_uuid").foreach { f 
=>
+      new File(dir, f).createNewFile()
+    }
+
+    val bm = mock(classOf[BlockManager])
+    when(bm.TempFileBasedBlockStoreUpdater).thenAnswer(_ => throw new 
Exception())
+    KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, 
bm)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to