This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new c13b0ffc437 [SPARK-44501][K8S] Ignore checksum files in KubernetesLocalDiskShuffleExecutorComponents
c13b0ffc437 is described below
commit c13b0ffc4377a0d7ee9446deef3ed032532d3be3
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Jul 20 13:22:59 2023 -0700
[SPARK-44501][K8S] Ignore checksum files in
KubernetesLocalDiskShuffleExecutorComponents
### What changes were proposed in this pull request?
This PR aims to improve `KubernetesLocalDiskShuffleExecutorComponents` by
ignoring checksum files.
### Why are the changes needed?
To reduce the overhead of `BlockManager.TempFileBasedBlockStoreUpdater` API
call.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #42094 from dongjoon-hyun/SPARK-44501.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit ca822d5f25d5e0bfa4594aea76a26d53bd01b109)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
...KubernetesLocalDiskShuffleExecutorComponents.scala | 11 +++++++++--
.../KubernetesLocalDiskShuffleDataIOSuite.scala | 19 +++++++++++++++++++
2 files changed, 28 insertions(+), 2 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index 3d6379b8713..8f0729067b9 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
import org.apache.commons.io.FileExistsException
import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.api.{ShuffleExecutorComponents,
ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents
@@ -41,12 +42,15 @@ class
KubernetesLocalDiskShuffleExecutorComponents(sparkConf: SparkConf)
appId: String, execId: String, extraConfigs: java.util.Map[String,
String]): Unit = {
delegate.initializeExecutor(appId, execId, extraConfigs)
blockManager = SparkEnv.get.blockManager
- if (sparkConf.getBoolean("spark.kubernetes.driver.reusePersistentVolumeClaim", false)) {
+ if (sparkConf.getBoolean(KUBERNETES_DRIVER_REUSE_PVC.key, false)) {
+ logInfo("Try to recover shuffle data.")
// Turn off the deletion of the shuffle data in order to reuse
blockManager.diskBlockManager.deleteFilesOnStop = false
Utils.tryLogNonFatalError {
KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf,
blockManager)
}
+ } else {
+ logInfo(s"Skip recovery because ${KUBERNETES_DRIVER_REUSE_PVC.key} is disabled.")
}
}
@@ -80,7 +84,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends
Logging {
.flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx
.flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx
.flatMap(_.listFiles).filter(_.isDirectory) // 00
- .flatMap(_.listFiles)
+ .flatMap(_.listFiles).filterNot(_.getName.contains(".checksum"))
if (files != null) files.toSeq else Seq.empty
}
@@ -91,14 +95,17 @@ object KubernetesLocalDiskShuffleExecutorComponents extends
Logging {
val level = StorageLevel.DISK_ONLY
val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
(dataFiles ++ indexFiles).foreach { f =>
+ logInfo(s"Try to recover ${f.getAbsolutePath}")
try {
val id = BlockId(f.getName)
val decryptedSize = f.length()
bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
} catch {
case _: UnrecognizedBlockId =>
+ logInfo("Skip due to UnrecognizedBlockId.")
case _: FileExistsException =>
// This may happen due to recompute, but we continue to recover next files
+ logInfo("Ignore due to FileExistsException.")
}
}
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
index f3d45ced1bb..d105ac04171 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
@@ -17,8 +17,12 @@
package org.apache.spark.shuffle
+import java.io.File
+import java.nio.file.Files
+
import scala.concurrent.duration._
+import org.mockito.Mockito.{mock, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{LocalRootDirsTest, MapOutputTrackerMaster,
SparkContext, SparkFunSuite, TestUtils}
@@ -26,6 +30,7 @@ import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.storage.BlockManager
class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with
LocalRootDirsTest {
@@ -219,4 +224,18 @@ class KubernetesLocalDiskShuffleDataIOSuite extends
SparkFunSuite with LocalRoot
}
}
}
+
+ test("SPARK-44501: Ignore checksum files") {
+ val sparkConf = conf.clone.set("spark.local.dir",
+ conf.get("spark.local.dir") + "/spark-x/executor-y")
+ val dir = sparkConf.get("spark.local.dir") + "/blockmgr-z/00"
+ Files.createDirectories(new File(dir).toPath())
+ Seq("ADLER32", "CRC32").foreach { algorithm =>
+ new File(dir, s"1.checksum.$algorithm").createNewFile()
+ }
+
+ val bm = mock(classOf[BlockManager])
+ when(bm.TempFileBasedBlockStoreUpdater).thenAnswer(_ => throw new Exception())
+ KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, bm)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]