[ https://issues.apache.org/jira/browse/SPARK-12196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715712#comment-16715712 ]

ASF GitHub Bot commented on SPARK-12196:
----------------------------------------

vanzin closed pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from different-speed storage devices in a hierarchical way
URL: https://github.com/apache/spark/pull/10225

This is a pull request from a forked repository. As GitHub hides the original diff once such a PR is closed, it is reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 4a15559e55cbd..171017546d847 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
     if (!records.hasNext()) {
       partitionLengths = new long[numPartitions];
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null, null);
      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
       return;
     }
@@ -162,7 +162,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     File tmp = Utils.tempFileWith(output);
     try {
       partitionLengths = writePartitionedFile(tmp);
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp, output);
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 8a1771848dee6..131c261b46e32 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -221,7 +221,7 @@ void closeAndWriteOutput() throws IOException {
           }
         }
       }
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp, output);
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logger.error("Error while deleting temp file {}", 
tmp.getAbsolutePath());
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 91858f0912b65..3f75a7fa9b3c0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -137,7 +137,11 @@ private[spark] class IndexShuffleBlockResolver(
       shuffleId: Int,
       mapId: Int,
       lengths: Array[Long],
-      dataTmp: File): Unit = {
+      // SPARK-12196: when using the tiered store, multiple calls of
+      // getDataFile could return different values, so we pass both
+      // dataFileTmp and dataFile for renaming.
+      dataFileTmp: File,
+      dataFile: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
@@ -154,17 +158,17 @@ private[spark] class IndexShuffleBlockResolver(
         out.close()
       }
 
-      val dataFile = getDataFile(shuffleId, mapId)
      // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
      // the following check and rename are atomic.
      synchronized {
-        val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
+        val existingLengths = checkIndexAndDataFile(getIndexFile(shuffleId, mapId),
+          getDataFile(shuffleId, mapId), lengths.length)
         if (existingLengths != null) {
          // Another attempt for the same task has already written our map outputs successfully,
          // so just use the existing partition lengths and delete our temporary map outputs.
           System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
-          if (dataTmp != null && dataTmp.exists()) {
-            dataTmp.delete()
+          if (dataFileTmp != null && dataFileTmp.exists()) {
+            dataFileTmp.delete()
           }
           indexTmp.delete()
         } else {
@@ -173,14 +177,15 @@ private[spark] class IndexShuffleBlockResolver(
           if (indexFile.exists()) {
             indexFile.delete()
           }
-          if (dataFile.exists()) {
+          if (dataFile != null && dataFile.exists()) {
             dataFile.delete()
           }
           if (!indexTmp.renameTo(indexFile)) {
             throw new IOException("fail to rename file " + indexTmp + " to " + 
indexFile)
           }
-          if (dataTmp != null && dataTmp.exists() && 
!dataTmp.renameTo(dataFile)) {
-            throw new IOException("fail to rename file " + dataTmp + " to " + 
dataFile)
+          if (dataFile != null && dataFileTmp != null
+            && dataFileTmp.exists() && !dataFileTmp.renameTo(dataFile)) {
+            throw new IOException("fail to rename file " + dataFileTmp + " to 
" + dataFile)
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 636b88e792bf3..1acf8d5e840fe 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,7 +70,8 @@ private[spark] class SortShuffleWriter[K, V, C](
     try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
-      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
+      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId,
+        partitionLengths, tmp, output)
       mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
     } finally {
       if (tmp.exists() && !tmp.delete()) {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 3d43e3c367aac..ce7555cf6e68e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -44,38 +44,41 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
-  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
-  // of subDirs(i) is protected by the lock of subDirs(i)
-  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
   private val shutdownHook = addShutdownHook()
 
-  /** Looks up a file by hashing it into one of our local subdirectories. */
-  // This method should be kept in sync with
-  // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
-  def getFile(filename: String): File = {
-    // Figure out which local directory it hashes to, and which subdirectory in that
-    val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % localDirs.length
-    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
-
-    // Create the subdirectory if it doesn't already exist
-    val subDir = subDirs(dirId).synchronized {
-      val old = subDirs(dirId)(subDirId)
-      if (old != null) {
-        old
-      } else {
-        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && !newDir.mkdir()) {
-          throw new IOException(s"Failed to create local dir in $newDir.")
-        }
-        subDirs(dirId)(subDirId) = newDir
-        newDir
-      }
+  private val shortFileAllocatorNames = Map(
+    "hash" -> classOf[HashAllocator].getName,
+    "tiered" -> classOf[TieredAllocator].getName)
+  private def createFileAllocator(allocatorName: String): FileAllocator = {
+    val allocatorClass = shortFileAllocatorNames
+      .getOrElse(allocatorName.toLowerCase, allocatorName)
+    val allocator = try {
+      val clazz = Utils.classForName(allocatorClass)
+      val ctor = clazz.getConstructor(classOf[SparkConf],
+        classOf[Array[File]], java.lang.Integer.TYPE)
+      Some(ctor.newInstance(conf, localDirs, new Integer(subDirsPerLocalDir))
+        .asInstanceOf[FileAllocator])
+    } catch {
+      case _: ClassNotFoundException =>
+        logWarning(s"Could not find class for fileAllocator($allocatorClass).")
+        None
+      case _: Exception =>
+        logWarning(s"Fail to initialize fileAllocator($allocatorClass).")
+        None
+    }
+    if (allocator.isDefined && allocator.get.support) {
+      logInfo(s"fileAllocator($allocatorClass) is enabled.")
+      allocator.get
+    } else {
+      logWarning("Default fileAllocator(HashAllocator) is enabled instead.")
+      new HashAllocator(conf, localDirs, subDirsPerLocalDir)
     }
-
-    new File(subDir, filename)
   }
+  private val allocatorName = conf.get("spark.diskStore.allocator", "hash")
+  private val fileAllocator: FileAllocator = createFileAllocator(allocatorName)
+
+  def getFile(filename: String): File = fileAllocator(filename)
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
@@ -87,7 +90,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of array of directories
-    subDirs.flatMap { dir =>
+    fileAllocator.subDirs.flatMap { dir =>
       dir.synchronized {
         // Copy the content of dir because it may be modified in other threads
         dir.clone()
diff --git a/core/src/main/scala/org/apache/spark/storage/FileAllocator.scala b/core/src/main/scala/org/apache/spark/storage/FileAllocator.scala
new file mode 100644
index 0000000000000..124f5b9da5eff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/FileAllocator.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, IOException}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[storage] abstract class FileAllocator(
+    conf: SparkConf,
+    localDirs: Array[File],
+    subDirsPerLocalDir: Int)
+  extends Logging {
+  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
+  // of subDirs(i) is protected by the lock of subDirs(i)
+  val subDirs: Array[Array[File]] =
+      Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
+
+  def support: Boolean = true
+
+  def apply(filename: String): File
+
+  protected def getFile(filename: String, storageDirs: Array[File]): File = {
+    require(storageDirs.nonEmpty, "could not find file when the directories are empty")
+
+    // Figure out which local directory it hashes to, and which subdirectory in that
+    val hash = Utils.nonNegativeHash(filename)
+    val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
+    val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
+
+    // Create the subdirectory if it doesn't already exist
+    val subDir = subDirs(dirId).synchronized {
+      val old = subDirs(dirId)(subDirId)
+      if (old != null) {
+        old
+      } else {
+        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        if (!newDir.exists() && !newDir.mkdir()) {
+          throw new IOException(s"Failed to create local dir in $newDir.")
+        }
+        subDirs(dirId)(subDirId) = newDir
+        newDir
+      }
+    }
+
+    new File(subDir, filename)
+  }
+}
+
+/** Looks up a file by hashing it into one of our local subdirectories. */
+// This method should be kept in sync with
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
+private[storage] class HashAllocator(
+    conf: SparkConf,
+    localDirs: Array[File],
+    subDirsPerLocalDir: Int)
+  extends FileAllocator(
+    conf,
+    localDirs,
+    subDirsPerLocalDir) {
+  def apply(filename: String): File = getFile(filename, localDirs)
+}
+
+/** Looks up a file tier by tier across storage devices of different speeds. */
+private[storage] class TieredAllocator(
+    conf: SparkConf,
+    localDirs: Array[File],
+    subDirsPerLocalDir: Int)
+  extends FileAllocator(
+    conf,
+    localDirs,
+    subDirsPerLocalDir) {
+  private val tiersEnvConf = conf.getenv("SPARK_DIRS_TIERS")
+  private val threshold = conf.getDouble("spark.diskStore.tiered.threshold", 0.15)
+
+  if (tiersEnvConf == null) {
+    logAndThrowException("SPARK_DIRS_TIERS is not set.")
+  }
+  private val tiersIDs = tiersEnvConf.trim.split("")
+
+  if (localDirs.length != tiersIDs.length) {
+    logAndThrowException(s"Incorrect SPARK_DIRS_TIERS setting, " +
+      s"SPARK_DIRS_TIERS = '$tiersEnvConf'.")
+  }
+  private val tieredDirs: Seq[Array[File]] = (localDirs zip tiersIDs).
+    groupBy(_._2).mapValues(_.map(_._1)).toSeq.sortBy(_._1).map(_._2)
+  tieredDirs.zipWithIndex.foreach {
+    case (dirs, index) =>
+      logInfo(s"Tier $index:")
+      dirs.foreach(d => logInfo(s"    $d"))
+  }
+
+  override def support: Boolean = {
+    // TODO: tiered allocation for external shuffle service will be supported in another PR
+    !conf.getBoolean("spark.shuffle.service.enabled", defaultValue = false)
+  }
+
+  def hasEnoughSpace(file: File): Boolean = {
+    file.getParentFile.getFreeSpace * 1.0 / file.getParentFile.getTotalSpace >= threshold
+  }
+
+  def apply(filename: String): File = {
+    var file: File = null
+    var availableFile: File = null
+    for (tier <- tieredDirs) {
+      file = getFile(filename, tier)
+      if (file.exists()) return file
+
+      if (availableFile == null && hasEnoughSpace(file)) {
+        availableFile = file
+      }
+    }
+    Option(availableFile).getOrElse(file)
+  }
+
+  private def logAndThrowException(msg: String) = {
+    logWarning(msg)
+    throw new IllegalArgumentException(msg)
+  }
+}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 088b68132d905..9a7ac77d485c9 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -141,12 +141,14 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th
       public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
        partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
         File tmp = (File) invocationOnMock.getArguments()[3];
-        mergedOutputFile.delete();
-        tmp.renameTo(mergedOutputFile);
+        File dataFile = (File) invocationOnMock.getArguments()[4];
+        dataFile.delete();
+        tmp.renameTo(dataFile);
         return null;
       }
     }).when(shuffleBlockResolver)
-      .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class));
+      .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class),
+              any(File.class), any(File.class));
 
     when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
       new Answer<Tuple2<TempShuffleBlockId, File>>() {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 85ccb33471048..e2c5fdbca3eae 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -72,14 +72,16 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     doAnswer(new Answer[Void] {
       def answer(invocationOnMock: InvocationOnMock): Void = {
         val tmp: File = invocationOnMock.getArguments()(3).asInstanceOf[File]
-        if (tmp != null) {
-          outputFile.delete
-          tmp.renameTo(outputFile)
+        val dataFile: File = invocationOnMock.getArguments()(4).asInstanceOf[File]
+        if (tmp != null && dataFile != null) {
+          dataFile.delete
+          tmp.renameTo(dataFile)
         }
         null
       }
     }).when(blockResolver)
-      .writeIndexFileAndCommit(anyInt, anyInt, any(classOf[Array[Long]]), any(classOf[File]))
+      .writeIndexFileAndCommit(anyInt, anyInt, any(classOf[Array[Long]]),
+        any(classOf[File]), any(classOf[File]))
     when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
     when(blockManager.getDiskWriter(
       any[BlockId],
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index d21ce73f4021e..36660ffbeb13d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -65,15 +65,19 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
 
   test("commit shuffle files multiple times") {
     val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+    val shuffleId = 1
+    val mapId = 2
+
     val lengths = Array[Long](10, 0, 20)
-    val dataTmp = File.createTempFile("shuffle", null, tempDir)
+    val data = resolver.getDataFile(shuffleId, mapId)
+    val dataTmp = Utils.tempFileWith(data)
     val out = new FileOutputStream(dataTmp)
     Utils.tryWithSafeFinally {
       out.write(new Array[Byte](30))
     } {
       out.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
+    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp, data)
 
     val dataFile = resolver.getDataFile(1, 2)
     assert(dataFile.exists())
@@ -81,7 +85,8 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     assert(!dataTmp.exists())
 
     val lengths2 = new Array[Long](3)
-    val dataTmp2 = File.createTempFile("shuffle", null, tempDir)
+    val data2 = resolver.getDataFile(shuffleId, mapId)
+    val dataTmp2 = Utils.tempFileWith(data2)
     val out2 = new FileOutputStream(dataTmp2)
     Utils.tryWithSafeFinally {
       out2.write(Array[Byte](1))
@@ -89,7 +94,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out2.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2, data2)
     assert(lengths2.toSeq === lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
@@ -109,7 +114,8 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     dataFile.delete()
 
     val lengths3 = Array[Long](10, 10, 15)
-    val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
+    val data3 = resolver.getDataFile(shuffleId, mapId)
+    val dataTmp3 = Utils.tempFileWith(data3)
     val out3 = new FileOutputStream(dataTmp3)
     Utils.tryWithSafeFinally {
       out3.write(Array[Byte](2))
@@ -117,7 +123,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out3.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
+    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3, data3)
     assert(lengths3.toSeq != lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 35)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerTieredSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerTieredSuite.scala
new file mode 100644
index 0000000000000..535acc8cb11d4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerTieredSuite.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileWriter}
+
+import scala.collection.mutable
+import scala.language.reflectiveCalls
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
+
+object DiskBlockManagerTieredSuite {
+  val SPARK_DIRS_TIERS_CONF = "000011112222"
+  case class mockDiskAndTier(diskSpaceStatus: Array[Boolean], expectedTiers: Array[Int])
+  val mockTestsSeq = Array(
+    mockDiskAndTier(Array(true, true, true), Array(0, 0, 0)),
+    mockDiskAndTier(Array(true, true, false, true, false, false, true, false, false, false),
+      Array(0, 0, 1, 2, 2)),
+    mockDiskAndTier(Array(false, false, false, false, false, false, false, false, false),
+      Array(2, 2, 2)),
+    mockDiskAndTier(Array(true, false, true, false, false, true,
+      false, false, true, true, false, true, true, false, false, false),
+      Array(0, 1, 2, 2, 0, 1, 0, 2))
+  )
+
+  // This class overrides hasEnoughSpace in TieredAllocator to mock disk space changes.
+  class MockTieredAllocator(
+      conf: SparkConf,
+      localDirs: Array[File],
+      subDirsPerLocalDir: Int)
+    extends TieredAllocator(
+      conf,
+      localDirs,
+      subDirsPerLocalDir) {
+    private val caseNum = conf.getInt("spark.testTieredStorage.caseNum", 0)
+    private val diskStatus = mockTestsSeq(caseNum).diskSpaceStatus
+
+    private var index = -1
+    override def hasEnoughSpace(file: File): Boolean = {
+      index += 1
+      if (index < diskStatus.length) {
+        diskStatus(index)
+      } else {
+        true
+      }
+    }
+  }
+}
+
+class DiskBlockManagerTieredSuite
+  extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
+  import org.apache.spark.storage.DiskBlockManagerTieredSuite._
+
+  private val testConf = new SparkConfWithEnv(Map("SPARK_DIRS_TIERS" -> SPARK_DIRS_TIERS_CONF))
+  private var rootDirs: Seq[File] = _
+
+  var diskBlockManager: DiskBlockManager = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    rootDirs = (0 until SPARK_DIRS_TIERS_CONF.length).map(_ => Utils.createTempDir())
+    testConf.set("spark.local.dir", 
rootDirs.map(_.getAbsolutePath).mkString(","))
+  }
+
+  override def afterAll() {
+    try {
+      rootDirs.foreach(Utils.deleteRecursively)
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  override def afterEach() {
+    try {
+      diskBlockManager.stop()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  def testTieredStorage(caseNum: Int): Unit = {
+    testConf.set("spark.diskStore.allocator", 
classOf[MockTieredAllocator].getName)
+      .set("spark.testTieredStorage.caseNum", caseNum.toString)
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+
+    val createdBlocks = mutable.ArrayBuffer[TestBlockId]()
+    val rootDirsPath = rootDirs.map(_.getCanonicalPath)
+    val tierSeq = mockTestsSeq(caseNum).expectedTiers
+    val tiersConf = SPARK_DIRS_TIERS_CONF
+    assert(!tierSeq.exists(s => s < 0 || s > tiersConf(tiersConf.length - 1).asDigit))
+    tierSeq.zipWithIndex.foreach {
+      case (i, idx) =>
+        val blockId = TestBlockId("test" + idx)
+        createdBlocks += blockId
+        val file = diskBlockManager.getFile(blockId)
+        writeToFile(file, 10)
+        val path = rootDirsPath.find(file.getCanonicalPath.startsWith(_))
+        val j = rootDirsPath.indexOf(path.get) / 4
+        assert(i == j)
+    }
+
+    createdBlocks.foreach {
+      b => assert(diskBlockManager.containsBlock(b))
+    }
+  }
+
+  test("basic block creation with hash allocator") {
+    testConf.set("spark.diskStore.allocator", "hash")
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    val blockId = TestBlockId("test")
+    val newFile = diskBlockManager.getFile(blockId)
+    writeToFile(newFile, 10)
+    assert(diskBlockManager.containsBlock(blockId))
+    newFile.delete()
+    assert(!diskBlockManager.containsBlock(blockId))
+  }
+
+  test("basic block creation with tiered allocator") {
+    testConf.set("spark.diskStore.allocator", "tiered")
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    // Repeat 100 times and make sure the blocks are only allocated in tier 0.
+    (0 until 100).foreach {
+      i =>
+        val blockId = TestBlockId("test" + i)
+        val file = diskBlockManager.getFile(blockId)
+        val rootDirsPath = rootDirs.map(_.getCanonicalPath)
+        val path = rootDirsPath.find(file.getCanonicalPath.startsWith(_))
+        val j = rootDirsPath.indexOf(path.get) / 4
+        assert(j == 0)
+        writeToFile(file, 10)
+        assert(diskBlockManager.containsBlock(blockId))
+        file.delete()
+        assert(!diskBlockManager.containsBlock(blockId))
+    }
+  }
+
+  test("tiered storage: enough disk space, all in tier 0") {
+    testTieredStorage(0)
+  }
+
+  test("tiered storage: in all ties including 0, 1, 2...") {
+    testTieredStorage(1)
+  }
+
+  test("tiered storage: no disk space, all in the last tier") {
+    testTieredStorage(2)
+  }
+
+  test("tiered storage: disks space keep changing") {
+    testTieredStorage(3)
+  }
+
+  def writeToFile(file: File, numBytes: Int) {
+    val writer = new FileWriter(file, true)
+    for (i <- 0 until numBytes) writer.write(i)
+    writer.close()
+  }
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 868c2edc5a463..8e3bc417751ed 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -240,6 +240,10 @@ private[yarn] class ExecutorRunnable(
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
     }
 
+    // Tiered storage config
+    val tiersConf = conf.get("yarn.nodemanager.spark-dirs-tiers").trim
+    env("SPARK_DIRS_TIERS") = tiersConf
+
     // lookup appropriate http scheme for container log urls
     val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,


 



> Store/retrieve blocks on storage devices of different speeds in a hierarchical way
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-12196
>                 URL: https://issues.apache.org/jira/browse/SPARK-12196
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>            Reporter: yucai
>            Priority: Major
>
> *Motivation*
> Our customers want to use SSDs to speed up machine learning and SQL workloads,
> but SSDs are quite expensive and still offer less capacity than HDDs.
> *Proposal*
> Our solution is to build tiered storage: use SSDs as cache and HDDs as backup.
> When Spark core allocates blocks (either for shuffle or RDD cache), it stores
> blocks on the SSDs first; when an SSD's free space drops below a configurable
> threshold, it starts using the HDDs.
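> As a concrete illustration, here is a minimal, self-contained sketch of that
> allocation policy, modeled on the TieredAllocator.apply logic in the diff
> above. hashToSubDir is a simplified hypothetical stand-in for the real
> hash-based subdirectory lookup, and the 0.15 default mirrors
> spark.diskStore.tiered.threshold; this is a sketch, not the PR's exact code:
> {code}
> import java.io.File
>
> // Hypothetical, simplified stand-in for the hash-based directory lookup.
> def hashToSubDir(filename: String, dirs: Array[File]): File =
>   new File(dirs((filename.hashCode & Integer.MAX_VALUE) % dirs.length), filename)
>
> // Walk the tiers from fastest to slowest; reuse an existing file if found,
> // otherwise remember the first tier that still has enough free space.
> def allocate(filename: String, tiers: Seq[Array[File]], threshold: Double = 0.15): File = {
>   var candidate: File = null
>   var file: File = null
>   for (tier <- tiers) {
>     file = hashToSubDir(filename, tier)
>     if (file.exists()) return file  // block already lives on this tier
>     val parent = file.getParentFile
>     if (candidate == null &&
>         parent.getFreeSpace.toDouble / parent.getTotalSpace >= threshold) {
>       candidate = file
>     }
>   }
>   // No tier has enough space: fall back to the last (slowest) tier.
>   Option(candidate).getOrElse(file)
> }
> {code}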
> *Performance Evaluation*
> 1. In the best case, our solution performs the same as all-SSD storage.
> 2. In the worst case, e.g. when all data spill to HDDs, there is no performance
> regression.
> 3. Compared with all-HDD storage, tiered storage improves performance by 2x for
> machine learning workloads and 1.7x for Spark SQL workloads.
> *Usage*
> 1. Enable tiered storage in spark-defaults.conf:
> {code}
> spark.diskStore.allocator      tiered
> {code}
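> Judging from createFileAllocator in the diff above, the value may also be a
> fully qualified class name of a custom FileAllocator implementation; if the
> class cannot be found or initialized, the default hash allocator is used
> instead. For example (org.example.MyAllocator is a hypothetical placeholder):
> {code}
> spark.diskStore.allocator      org.example.MyAllocator
> {code}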
> 2. Configure the storage hierarchy. For YARN users, see the example below:
> {code}
>   <property>
>     <name>yarn.nodemanager.local-dirs</name>
>     <value>/mnt/DP_disk1/yucai/yarn/local,/mnt/DP_disk2/yucai/yarn/local,
>            /mnt/DP_disk3/yucai/yarn/local,/mnt/DP_disk4/yucai/yarn/local,
>            /mnt/DP_disk5/yucai/yarn/local,/mnt/DP_disk6/yucai/yarn/local,
>     </value>
>   </property>
>   <property>
>     <name>yarn.nodemanager.spark-dirs-tiers</name>
>     <value>001111</value>
>   </property>
> {code}
> It means DP_disk1-2 are in tier 1 and DP_disk3-6 make up tier 2.
>  
> *More tiers*
> In our implementation, we support building any number of tiers across various
> storage media (NVMe, SSD, HDD, etc.). For example:
> {code}
>   <property>
>     <name>yarn.nodemanager.local-dirs</name>
>     <value>/mnt/DP_disk1/yucai/yarn/local,/mnt/DP_disk2/yucai/yarn/local,
>            /mnt/DP_disk3/yucai/yarn/local,/mnt/DP_disk4/yucai/yarn/local,
>            /mnt/DP_disk5/yucai/yarn/local,/mnt/DP_disk6/yucai/yarn/local,
>     </value>
>   </property>
>   <property>
>     <name>yarn.nodemanager.spark-dirs-tiers</name>
>     <value>001122</value>
>   </property>
> {code}
> It means DP_disk1-2 are in tier 1, DP_disk3-4 are in tier 2, and DP_disk5-6
> make up tier 3.
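> To make the mapping explicit, here is a standalone sketch of how such a tier
> string is grouped into tiers, mirroring the groupBy logic in TieredAllocator
> (the directory paths are just example values):
> {code}
> import java.io.File
>
> val localDirs: Array[File] = (1 to 6).map(i => new File(s"/mnt/DP_disk$i")).toArray
> val tiersIDs = "001122".trim.split("")   // one tier digit per local dir
>
> // Group dirs by tier digit and sort so the fastest tier ("0") comes first.
> val tieredDirs: Seq[Array[File]] = (localDirs zip tiersIDs)
>   .groupBy(_._2).mapValues(_.map(_._1)).toSeq.sortBy(_._1).map(_._2)
>
> // tieredDirs(0) -> DP_disk1, DP_disk2 (tier 1)
> // tieredDirs(1) -> DP_disk3, DP_disk4 (tier 2)
> // tieredDirs(2) -> DP_disk5, DP_disk6 (tier 3)
> {code}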


