This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e32c8  [SPARK-35654][CORE] Allow ShuffleDataIO to control DiskBlockManager.deleteFilesOnStop
d4e32c8 is described below

commit d4e32c896a6d3c71f61f96f0b4c5d98fc8730a21
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jun 6 09:20:42 2021 -0700

    [SPARK-35654][CORE] Allow ShuffleDataIO to control DiskBlockManager.deleteFilesOnStop
    
    ### What changes were proposed in this pull request?
    
    This PR changes `DiskBlockManager` as follows so that `ShuffleDataIO` can decide the shuffle file deletion behavior.
    ```scala
    - private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
    + private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
    ```
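
    With the parameter now a `var`, code that can reach the `DiskBlockManager` after `SparkEnv` is created may flip the flag. A minimal illustrative sketch, not taken from this commit (and, since `DiskBlockManager` is `private[spark]`, assuming code that lives under the `org.apache.spark` package):
    ```scala
    import org.apache.spark.SparkEnv

    // Hypothetical: keep the local files on stop so that another component
    // (e.g. a ShuffleDataIO plugin) can manage them instead.
    SparkEnv.get.blockManager.diskBlockManager.deleteFilesOnStop = false
    ```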
    
    ### Why are the changes needed?
    
    `SparkContext`
    1. creates `SparkEnv` (with `BlockManager` and its `DiskBlockManager`),
    2. loads `ShuffleDataIO`, and
    3. initializes the block manager:
    ```scala
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    
    ...
    _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
    }
    ...
    
    _env.blockManager.initialize(_applicationId)
    ...
    ```
    
    `DiskBlockManager` is created first, in the `BlockManager` constructor, so `ShuffleDataIO` gets no chance to change `deleteFilesOnStop` afterwards. By switching it to a `var`, we can implement enhanced shuffle data management features via `ShuffleDataIO`, such as https://github.com/apache/spark/pull/32730 (a hypothetical plugin sketch follows the snippet below).
    ```scala
      val diskBlockManager = {
        // Only perform cleanup if an external service is not serving our shuffle files.
        val deleteFilesOnStop =
          !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
        new DiskBlockManager(conf, deleteFilesOnStop)
      }
    ```
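
    For illustration, a hypothetical plugin could flip the flag from `initializeApplication()`, which `SparkContext` invokes after `SparkEnv` exists but before `blockManager.initialize`. The following is only a sketch under assumptions: the class name, the package, and the delegation to `LocalDiskShuffleDataIO` are invented here; only the `ShuffleDataIO` interfaces come from Spark.
    ```scala
    // Placed under org.apache.spark because DiskBlockManager is private[spark].
    package org.apache.spark.shuffle.custom

    import java.util.{Collections, Map => JMap}

    import org.apache.spark.{SparkConf, SparkEnv}
    import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents}
    import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO

    // Hypothetical plugin that keeps the driver-side shuffle files on stop.
    class KeepShuffleFilesDataIO(conf: SparkConf) extends ShuffleDataIO {
      private val delegate = new LocalDiskShuffleDataIO(conf)

      override def driver(): ShuffleDriverComponents = new ShuffleDriverComponents {
        override def initializeApplication(): JMap[String, String] = {
          // SparkEnv (and its DiskBlockManager) already exists at this point,
          // and blockManager.initialize has not run yet, so the flag takes effect.
          SparkEnv.get.blockManager.diskBlockManager.deleteFilesOnStop = false
          Collections.emptyMap[String, String]()
        }
        override def cleanupApplication(): Unit = {}
      }

      // Reuse Spark's default local-disk executor-side components unchanged.
      override def executor(): ShuffleExecutorComponents = delegate.executor()
    }
    ```
    Such a plugin would be selected with `spark.shuffle.sort.io.plugin.class`, the existing config that `ShuffleDataIOUtils.loadShuffleDataIO` reads.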
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a private class.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #32784 from dongjoon-hyun/SPARK-35654.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5db4965..f5ad4f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -32,8 +32,11 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  *
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
+ *
+ * ShuffleDataIO also can change the behavior of deleteFilesOnStop.
  */
-private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
+  extends Logging {
 
   private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
