This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new c13b0ffc437 [SPARK-44501][K8S] Ignore checksum files in KubernetesLocalDiskShuffleExecutorComponents
c13b0ffc437 is described below

commit c13b0ffc4377a0d7ee9446deef3ed032532d3be3
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Jul 20 13:22:59 2023 -0700

    [SPARK-44501][K8S] Ignore checksum files in KubernetesLocalDiskShuffleExecutorComponents
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `KubernetesLocalDiskShuffleExecutorComponents` by ignoring checksum files.
    
    ### Why are the changes needed?
    
    To reduce the overhead of `BlockManager.TempFileBasedBlockStoreUpdater` API call.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #42094 from dongjoon-hyun/SPARK-44501.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit ca822d5f25d5e0bfa4594aea76a26d53bd01b109)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 ...KubernetesLocalDiskShuffleExecutorComponents.scala | 11 +++++++++--
 .../KubernetesLocalDiskShuffleDataIOSuite.scala       | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index 3d6379b8713..8f0729067b9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.commons.io.FileExistsException
 
 import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
 import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents
@@ -41,12 +42,15 @@ class KubernetesLocalDiskShuffleExecutorComponents(sparkConf: SparkConf)
      appId: String, execId: String, extraConfigs: java.util.Map[String, String]): Unit = {
     delegate.initializeExecutor(appId, execId, extraConfigs)
     blockManager = SparkEnv.get.blockManager
-    if (sparkConf.getBoolean("spark.kubernetes.driver.reusePersistentVolumeClaim", false)) {
+    if (sparkConf.getBoolean(KUBERNETES_DRIVER_REUSE_PVC.key, false)) {
+      logInfo("Try to recover shuffle data.")
       // Turn off the deletion of the shuffle data in order to reuse
       blockManager.diskBlockManager.deleteFilesOnStop = false
       Utils.tryLogNonFatalError {
         KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager)
       }
+    } else {
+      logInfo(s"Skip recovery because ${KUBERNETES_DRIVER_REUSE_PVC.key} is disabled.")
     }
   }
 
@@ -80,7 +84,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
           .flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx
           .flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx
           .flatMap(_.listFiles).filter(_.isDirectory) // 00
-          .flatMap(_.listFiles)
+          .flatMap(_.listFiles).filterNot(_.getName.contains(".checksum"))
         if (files != null) files.toSeq else Seq.empty
       }
 
@@ -91,14 +95,17 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
     val level = StorageLevel.DISK_ONLY
     val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
     (dataFiles ++ indexFiles).foreach { f =>
+      logInfo(s"Try to recover ${f.getAbsolutePath}")
       try {
         val id = BlockId(f.getName)
         val decryptedSize = f.length()
        bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
       } catch {
         case _: UnrecognizedBlockId =>
+          logInfo("Skip due to UnrecognizedBlockId.")
         case _: FileExistsException =>
          // This may happen due to recompute, but we continue to recover next files
+          logInfo("Ignore due to FileExistsException.")
       }
     }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
index f3d45ced1bb..d105ac04171 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.shuffle
 
+import java.io.File
+import java.nio.file.Files
+
 import scala.concurrent.duration._
 
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalRootDirsTest, MapOutputTrackerMaster, SparkContext, SparkFunSuite, TestUtils}
@@ -26,6 +30,7 @@ import org.apache.spark.LocalSparkContext.withSpark
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.storage.BlockManager
 
 class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalRootDirsTest {
 
@@ -219,4 +224,18 @@ class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalRoot
       }
     }
   }
+
+  test("SPARK-44501: Ignore checksum files") {
+    val sparkConf = conf.clone.set("spark.local.dir",
+      conf.get("spark.local.dir") + "/spark-x/executor-y")
+    val dir = sparkConf.get("spark.local.dir") + "/blockmgr-z/00"
+    Files.createDirectories(new File(dir).toPath())
+    Seq("ADLER32", "CRC32").foreach { algorithm =>
+      new File(dir, s"1.checksum.$algorithm").createNewFile()
+    }
+
+    val bm = mock(classOf[BlockManager])
+    when(bm.TempFileBasedBlockStoreUpdater).thenAnswer(_ => throw new Exception())
+    KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, bm)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to