This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 08bb5f0  [SPARK-31314][CORE] Revert SPARK-29285 to fix shuffle regression caused by creating temporary file eagerly
08bb5f0 is described below

commit 08bb5f0ffeb4f5e37417f15931717784db544730
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Tue Mar 31 19:01:08 2020 +0800

    [SPARK-31314][CORE] Revert SPARK-29285 to fix shuffle regression caused by creating temporary file eagerly
    
    ### What changes were proposed in this pull request?
    This reverts commit 8cf76f8d61b393bb3abd9780421b978e98db8cae (#25962).
    
    ### Why are the changes needed?
    In SPARK-29285, we changed to create shuffle temporary files eagerly. This helps avoid failing the entire task in the scenario of occasional disk failure. But for applications in which many tasks don't actually create shuffle files, it introduced overhead. See the benchmark below:
    Env: Spark local-cluster[2, 4, 19968]; each query was run for 5 rounds, 5 times per round.
    Data: TPC-DS scale=99, generated by spark-tpcds-datagen
    Results:
    | Query | Base                                                                                         | Revert                                                                                       |
    |-------|----------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------|
    | Q20   | Vector(4.096865667, 2.76231748, 2.722007606, 2.514433591, 2.400373579)  Median 2.722007606    | Vector(3.763185446, 2.586498463, 2.593472842, 2.320522846, 2.224627274)  Median 2.586498463   |
    | Q33   | Vector(5.872176321, 4.854397586, 4.568787136, 4.393378146, 4.423996818)  Median 4.568787136   | Vector(5.38746785, 4.361236877, 4.082311276, 3.867206824, 3.783188024)  Median 4.082311276    |
    | Q52   | Vector(3.978870321, 3.225437871, 3.282411608, 2.869674887, 2.644490664)  Median 3.225437871   | Vector(4.000381522, 3.196025108, 3.248787619, 2.767444508, 2.606163423)  Median 3.196025108   |
    | Q56   | Vector(6.238045133, 4.820535173, 4.609965579, 4.313509894, 4.221256227)  Median 4.609965579   | Vector(6.241611339, 4.225592467, 4.195202502, 3.757085755, 3.657525982)  Median 4.195202502   |
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #28072 from xuanyuanking/SPARK-29285-revert.
    
    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 07c50784d34e10bbfafac7498c0b70c4ec08048a)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/storage/DiskBlockManager.scala    | 36 ++++--------------
 .../main/scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../spark/storage/DiskBlockManagerSuite.scala      | 43 +---------------------
 3 files changed, 10 insertions(+), 71 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ee43b76..f211394 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.{File, IOException}
 import java.util.UUID
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
@@ -119,38 +117,20 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
-    var blockId = TempLocalBlockId(UUID.randomUUID())
-    var tempLocalFile = getFile(blockId)
-    var count = 0
-    while (!canCreateFile(tempLocalFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
-      blockId = TempLocalBlockId(UUID.randomUUID())
-      tempLocalFile = getFile(blockId)
-      count += 1
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempLocalBlockId(UUID.randomUUID())
     }
-    (blockId, tempLocalFile)
+    (blockId, getFile(blockId))
   }
 
   /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
   def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
-    var blockId = TempShuffleBlockId(UUID.randomUUID())
-    var tempShuffleFile = getFile(blockId)
-    var count = 0
-    while (!canCreateFile(tempShuffleFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
-      blockId = TempShuffleBlockId(UUID.randomUUID())
-      tempShuffleFile = getFile(blockId)
-      count += 1
-    }
-    (blockId, tempShuffleFile)
-  }
-
-  private def canCreateFile(file: File): Boolean = {
-    try {
-      file.createNewFile()
-    } catch {
-      case NonFatal(_) =>
-        logError("Failed to create temporary block file: " + file.getAbsoluteFile)
-        false
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
+    (blockId, getFile(blockId))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9f332ba..c7db212 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -95,7 +95,7 @@ private[spark] object Utils extends Logging {
    */
   val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt
 
-  val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index ccc525e..c757dee 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -51,7 +51,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   override def beforeEach(): Unit = {
     super.beforeEach()
     val conf = testConf.clone
-    conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
+    conf.set("spark.local.dir", rootDirs)
     diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
   }
 
@@ -90,45 +90,4 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     for (i <- 0 until numBytes) writer.write(i)
     writer.close()
   }
-
-  test("temporary shuffle/local file should be able to handle disk failures") {
-    try {
-      // the following two lines pre-create subdirectories under each root dir of block manager
-      diskBlockManager.getFile("1")
-      diskBlockManager.getFile("2")
-
-      val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
-      assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
-      assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")
-
-      // partial disks damaged
-      rootDir0.setExecutable(false)
-      val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
-      // It's possible that after 10 retries we still not able to find the healthy disk. we need to
-      // remove the flakiness of these two asserts
-      if (tempShuffleFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
-        assert(tempShuffleFile2.exists(),
-          "There is only one bad disk, so temp shuffle file should be created")
-      }
-      if (tempLocalFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
-        assert(tempLocalFile2.exists(),
-          "There is only one bad disk, so temp local file should be created")
-      }
-
-      // all disks damaged
-      rootDir1.setExecutable(false)
-      val tempShuffleFile3 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile3 = diskBlockManager.createTempLocalBlock()._2
-      assert(!tempShuffleFile3.exists(),
-        "All disks are broken, so there should be no temp shuffle file 
created")
-      assert(!tempLocalFile3.exists(),
-        "All disks are broken, so there should be no temp local file created")
-    } finally {
-      rootDir0.setExecutable(true)
-      rootDir1.setExecutable(true)
-    }
-
-  }
 }
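
For context, here is a rough sketch of the caller-side pattern that makes this revert matter. It is a simplified illustration with hypothetical names (`ShuffleSpillWriter`, `spill`), not Spark's actual shuffle write path, and since `DiskBlockManager` is `private[spark]` it is conceptual only: a temp shuffle block is requested only when a task really has data to spill, and after the revert the file itself is created only when the output stream is opened. Under the eager SPARK-29285 behavior that this commit removes, every `createTempShuffleBlock()` call performed a `createNewFile()` on disk up front.

```scala
import java.io.{File, FileOutputStream}

import org.apache.spark.storage.DiskBlockManager

// Hypothetical caller sketch (not Spark's real shuffle code). After the
// revert, createTempShuffleBlock() only reserves a unique (blockId, file)
// pair; the file comes into existence when the stream below is opened.
class ShuffleSpillWriter(diskBlockManager: DiskBlockManager) {
  def spill(records: Iterator[Array[Byte]]): Option[File] = {
    if (!records.hasNext) {
      None // nothing to spill: no temp block requested, no file created
    } else {
      val (_, file) = diskBlockManager.createTempShuffleBlock()
      val out = new FileOutputStream(file) // opening the stream creates the file
      try {
        records.foreach(bytes => out.write(bytes))
        Some(file)
      } finally {
        out.close()
      }
    }
  }
}
```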

