This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 13315eeec07e [SPARK-50481][CORE] Improve
`SortShuffleManager.unregisterShuffle` to skip checksum file logic if checksum
is disabled
13315eeec07e is described below
commit 13315eeec07e2aebcc05dfee762bbd060ae192ec
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Dec 4 10:32:12 2024 +0900
[SPARK-50481][CORE] Improve `SortShuffleManager.unregisterShuffle` to skip
checksum file logic if checksum is disabled
### What changes were proposed in this pull request?
This PR aims to improve `SortShuffleManager.unregisterShuffle` to skip
checksum file logic if checksum is disabled.
### Why are the changes needed?
`SortShuffleManager.unregisterShuffle` depends on
`IndexShuffleBlockResolver.removeDataByMap`.
https://github.com/apache/spark/blob/7b974ca758961668a26a1d0c60c91614dac38742/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L185-L192
It always tries to check and delete the checksum files even when they
don't exist.
https://github.com/apache/spark/blob/7b974ca758961668a26a1d0c60c91614dac38742/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala#L198-L201
This PR aims to improve Spark by removing these operations when `checksum`
is disabled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49046 from dongjoon-hyun/SPARK-50481.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/shuffle/IndexShuffleBlockResolver.scala | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 30bc1382fb02..bf3117a9a9b1 100644
---
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -85,6 +85,9 @@ private[spark] class IndexShuffleBlockResolver(
private val remoteShuffleMaxDisk: Option[Long] =
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)
+ private val checksumEnabled = conf.get(config.SHUFFLE_CHECKSUM_ENABLED)
+ private lazy val algorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
+
def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId,
mapId, None)
/**
@@ -195,9 +198,11 @@ private[spark] class IndexShuffleBlockResolver(
logWarning(log"Error deleting index ${MDC(PATH, file.getPath())}")
}
- file = getChecksumFile(shuffleId, mapId,
conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
- if (file.exists() && !file.delete()) {
- logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}")
+ if (checksumEnabled) {
+ file = getChecksumFile(shuffleId, mapId, algorithm)
+ if (file.exists() && !file.delete()) {
+ logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}")
+ }
}
}
@@ -396,8 +401,7 @@ private[spark] class IndexShuffleBlockResolver(
val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
assert(lengths.length == checksums.length,
"The size of partition lengths and checksums should be equal")
- val checksumFile =
- getChecksumFile(shuffleId, mapId,
conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
+ val checksumFile = getChecksumFile(shuffleId, mapId, algorithm)
(Some(checksumFile), Some(createTempFile(checksumFile)))
} else {
(None, None)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]