This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d4e32c8 [SPARK-35654][CORE] Allow ShuffleDataIO control DiskBlockManager.deleteFilesOnStop
d4e32c8 is described below
commit d4e32c896a6d3c71f61f96f0b4c5d98fc8730a21
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jun 6 09:20:42 2021 -0700
[SPARK-35654][CORE] Allow ShuffleDataIO control DiskBlockManager.deleteFilesOnStop
### What changes were proposed in this pull request?
This PR changes `DiskBlockManager` as follows to allow `ShuffleDataIO` to decide the behavior of shuffle file deletion.
```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```
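For illustration only (the class and names below are hypothetical stand-ins, not the real Spark classes): turning the constructor parameter into a `var` makes it a mutable public field, so code that runs after construction can revise the decision.

```scala
// Hypothetical sketch of the pattern this patch enables:
// a `var` constructor parameter becomes a mutable public field.
class Manager(var deleteFilesOnStop: Boolean) {
  // Report what stop() would do, based on the current flag value.
  def stop(): String =
    if (deleteFilesOnStop) "deleted" else "kept"
}

object Demo {
  def main(args: Array[String]): Unit = {
    val m = new Manager(deleteFilesOnStop = true)
    m.deleteFilesOnStop = false // revised later, e.g. by a shuffle plugin
    println(m.stop()) // prints "kept"
  }
}
```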
### Why are the changes needed?
`SparkContext`
1. creates `SparkEnv` (with `BlockManager` and its `DiskBlockManager`),
2. loads `ShuffleDataIO`, and
3. initializes the block manager.
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)
...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
_shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
  _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
}
...
_env.blockManager.initialize(_applicationId)
...
```
`DiskBlockManager` is created first, in the `BlockManager` constructor, so `deleteFilesOnStop` cannot be changed later by `ShuffleDataIO`. By switching to a `var`, we can implement an enhanced shuffle data management feature via `ShuffleDataIO`, as in https://github.com/apache/spark/pull/32730 .
```scala
val diskBlockManager = {
  // Only perform cleanup if an external service is not serving our shuffle files.
  val deleteFilesOnStop =
    !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
  new DiskBlockManager(conf, deleteFilesOnStop)
}
```
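The ordering problem above can be sketched in a few lines (simplified stand-ins, not Spark's real classes): the manager is built with the flag already decided, and only afterwards does plugin loading get a chance to override it, which the `var` now permits.

```scala
// Illustrative sketch of the initialization order described above.
class DiskBlockManagerStub(var deleteFilesOnStop: Boolean)

class BlockManagerStub {
  // The flag is fixed at construction time, before any plugin is loaded.
  val diskBlockManager = new DiskBlockManagerStub(deleteFilesOnStop = true)
}

object InitOrder {
  def main(args: Array[String]): Unit = {
    // 1. SparkEnv (and with it the BlockManager) is created first...
    val blockManager = new BlockManagerStub
    // 2. ...only afterwards is ShuffleDataIO loaded; a plugin that manages
    //    shuffle data itself can now ask the manager to keep the files:
    blockManager.diskBlockManager.deleteFilesOnStop = false
    println(blockManager.diskBlockManager.deleteFilesOnStop) // prints "false"
  }
}
```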
### Does this PR introduce _any_ user-facing change?
No. This is a private class.
### How was this patch tested?
N/A
Closes #32784 from dongjoon-hyun/SPARK-35654.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5db4965..f5ad4f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -32,8 +32,11 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
*
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
+ *
+ * ShuffleDataIO also can change the behavior of deleteFilesOnStop.
*/
-private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
+  extends Logging {
private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]