Repository: spark
Updated Branches:
  refs/heads/master 43b2a6390 -> ae1f54aa0


[SPARK-12500][CORE] Fix Tachyon deprecations; pull Tachyon dependency into one class

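Fix Tachyon deprecations by moving from the deprecated `TachyonFS` / `tachyon.client.TachyonFile`
client to the `TachyonFileSystem` API. Also pull the Tachyon dependency into `TachyonBlockManager`
only, moving the Tachyon shutdown-delete bookkeeping and recursive delete there from
`ShutdownHookManager` and `Utils`.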

CC calvinjia, as I would like a double-check that the new Tachyon API is used correctly.

Author: Sean Owen <[email protected]>

Closes #10449 from srowen/SPARK-12500.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae1f54aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae1f54aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae1f54aa

Branch: refs/heads/master
Commit: ae1f54aa0ed69f9daa1f32766ca234bda9320452
Parents: 43b2a63
Author: Sean Owen <[email protected]>
Authored: Wed Dec 23 13:24:06 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Wed Dec 23 13:24:06 2015 -0800

----------------------------------------------------------------------
 .../spark/storage/TachyonBlockManager.scala     | 135 ++++++++++++++-----
 .../apache/spark/util/ShutdownHookManager.scala |  42 ------
 .../scala/org/apache/spark/util/Utils.scala     |  11 --
 3 files changed, 104 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ae1f54aa/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index d14fe46..7f88f2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -26,13 +26,17 @@ import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
 
-import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
+import tachyon.{Constants, TachyonURI}
+import tachyon.client.ClientContext
+import tachyon.client.file.{TachyonFile, TachyonFileSystem}
+import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
+import tachyon.client.file.options.DeleteOptions
 import tachyon.conf.TachyonConf
-import tachyon.TachyonURI
+import tachyon.exception.{FileAlreadyExistsException, 
FileDoesNotExistException}
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -44,15 +48,15 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
   var rootDirs: String = _
   var master: String = _
-  var client: tachyon.client.TachyonFS = _
+  var client: TachyonFileSystem = _
   private var subDirsPerTachyonDir: Int = _
 
   // Create one Tachyon directory for each path mentioned in 
spark.tachyonStore.folderName;
   // then, inside this directory, create multiple subdirectories that we will 
hash files into,
   // in order to avoid having really large inodes at the top level in Tachyon.
   private var tachyonDirs: Array[TachyonFile] = _
-  private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _
-
+  private var subDirs: Array[Array[TachyonFile]] = _
+  private val shutdownDeleteTachyonPaths = new 
scala.collection.mutable.HashSet[String]()
 
   override def init(blockManager: BlockManager, executorId: String): Unit = {
     super.init(blockManager, executorId)
@@ -62,7 +66,10 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     rootDirs = s"$storeDir/$appFolderName/$executorId"
     master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, 
"tachyon://localhost:19998")
     client = if (master != null && master != "") {
-      TachyonFS.get(new TachyonURI(master), new TachyonConf())
+      val tachyonConf = new TachyonConf()
+      tachyonConf.set(Constants.MASTER_ADDRESS, master)
+      ClientContext.reset(tachyonConf)
+      TachyonFileSystemFactory.get
     } else {
       null
     }
@@ -80,7 +87,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     // in order to avoid having really large inodes at the top level in 
Tachyon.
     tachyonDirs = createTachyonDirs()
     subDirs = Array.fill(tachyonDirs.length)(new 
Array[TachyonFile](subDirsPerTachyonDir))
-    tachyonDirs.foreach(tachyonDir => 
ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
+    tachyonDirs.foreach(registerShutdownDeleteDir)
   }
 
   override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -89,6 +96,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     val file = getFile(blockId)
     if (fileExists(file)) {
       removeFile(file)
+      true
     } else {
       false
     }
@@ -101,7 +109,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
   override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
     val file = getFile(blockId)
-    val os = file.getOutStream(WriteType.TRY_CACHE)
+    val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
     try {
       Utils.writeByteBuffer(bytes, os)
     } catch {
@@ -115,7 +123,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
   override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
     val file = getFile(blockId)
-    val os = file.getOutStream(WriteType.TRY_CACHE)
+    val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
     try {
       blockManager.dataSerializeStream(blockId, os, values)
     } catch {
@@ -129,12 +137,17 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val file = getFile(blockId)
-    if (file == null || file.getLocationHosts.size == 0) {
+    if (file == null) {
       return None
     }
-    val is = file.getInStream(ReadType.CACHE)
+    val is = try {
+      client.getInStream(file)
+    } catch {
+      case _: FileDoesNotExistException =>
+        return None
+    }
     try {
-      val size = file.length
+      val size = client.getInfo(file).length
       val bs = new Array[Byte](size.asInstanceOf[Int])
       ByteStreams.readFully(is, bs)
       Some(ByteBuffer.wrap(bs))
@@ -149,25 +162,37 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
   override def getValues(blockId: BlockId): Option[Iterator[_]] = {
     val file = getFile(blockId)
-    if (file == null || file.getLocationHosts().size() == 0) {
+    if (file == null) {
       return None
     }
-    val is = file.getInStream(ReadType.CACHE)
-    Option(is).map { is =>
-      blockManager.dataDeserializeStream(blockId, is)
+    val is = try {
+      client.getInStream(file)
+    } catch {
+      case _: FileDoesNotExistException =>
+        return None
+    }
+    try {
+      Some(blockManager.dataDeserializeStream(blockId, is))
+    } finally {
+      is.close()
     }
   }
 
   override def getSize(blockId: BlockId): Long = {
-    getFile(blockId.name).length
+    client.getInfo(getFile(blockId.name)).length
   }
 
-  def removeFile(file: TachyonFile): Boolean = {
-    client.delete(new TachyonURI(file.getPath()), false)
+  def removeFile(file: TachyonFile): Unit = {
+    client.delete(file)
   }
 
   def fileExists(file: TachyonFile): Boolean = {
-    client.exist(new TachyonURI(file.getPath()))
+    try {
+      client.getInfo(file)
+      true
+    } catch {
+      case _: FileDoesNotExistException => false
+    }
   }
 
   def getFile(filename: String): TachyonFile = {
@@ -186,18 +211,18 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
         } else {
           val path = new 
TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
           client.mkdir(path)
-          val newDir = client.getFile(path)
+          val newDir = client.loadMetadata(path)
           subDirs(dirId)(subDirId) = newDir
           newDir
         }
       }
     }
     val filePath = new TachyonURI(s"$subDir/$filename")
-    if(!client.exist(filePath)) {
-      client.createFile(filePath)
+    try {
+      client.create(filePath)
+    } catch {
+      case _: FileAlreadyExistsException => client.loadMetadata(filePath)
     }
-    val file = client.getFile(filePath)
-    file
   }
 
   def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
@@ -217,9 +242,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
         try {
           tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), 
rand.nextInt(65536))
           val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
-          if (!client.exist(path)) {
+          try {
             foundLocalDir = client.mkdir(path)
-            tachyonDir = client.getFile(path)
+            tachyonDir = client.loadMetadata(path)
+          } catch {
+            case _: FileAlreadyExistsException => // continue
           }
         } catch {
           case NonFatal(e) =>
@@ -240,14 +267,60 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     logDebug("Shutdown hook called")
     tachyonDirs.foreach { tachyonDir =>
       try {
-        if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
-          Utils.deleteRecursively(tachyonDir, client)
+        if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
+          deleteRecursively(tachyonDir, client)
         }
       } catch {
         case NonFatal(e) =>
           logError("Exception while deleting tachyon spark dir: " + 
tachyonDir, e)
       }
     }
-    client.close()
   }
+
+  /**
+    * Delete a file or directory and its contents recursively.
+    */
+  private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
+    client.delete(dir, new 
DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
+  }
+
+  // Register the tachyon path to be deleted via shutdown hook
+  private def registerShutdownDeleteDir(file: TachyonFile) {
+    val absolutePath = client.getInfo(file).getPath
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths += absolutePath
+    }
+  }
+
+  // Remove the tachyon path to be deleted via shutdown hook
+  private def removeShutdownDeleteDir(file: TachyonFile) {
+    val absolutePath = client.getInfo(file).getPath
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths -= absolutePath
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+    val absolutePath = client.getInfo(file).getPath
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.contains(absolutePath)
+    }
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, 
then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
+  // paths - resulting in Exception and incomplete cleanup.
+  private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+    val absolutePath = client.getInfo(file).getPath
+    val hasRoot = shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.exists(
+        path => !absolutePath.equals(path) && absolutePath.startsWith(path))
+    }
+    if (hasRoot) {
+      logInfo(s"path = $absolutePath, already present as root for deletion.")
+    }
+    hasRoot
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1f54aa/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 1a0f3b4..0065b1f 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.util.PriorityQueue
 
 import scala.util.{Failure, Success, Try}
-import tachyon.client.TachyonFile
 
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
@@ -52,7 +51,6 @@ private[spark] object ShutdownHookManager extends Logging {
   }
 
   private val shutdownDeletePaths = new 
scala.collection.mutable.HashSet[String]()
-  private val shutdownDeleteTachyonPaths = new 
scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
@@ -77,14 +75,6 @@ private[spark] object ShutdownHookManager extends Logging {
     }
   }
 
-  // Register the tachyon path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths += absolutePath
-    }
-  }
-
   // Remove the path to be deleted via shutdown hook
   def removeShutdownDeleteDir(file: File) {
     val absolutePath = file.getAbsolutePath()
@@ -93,14 +83,6 @@ private[spark] object ShutdownHookManager extends Logging {
     }
   }
 
-  // Remove the tachyon path to be deleted via shutdown hook
-  def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.remove(absolutePath)
-    }
-  }
-
   // Is the path already registered to be deleted via a shutdown hook ?
   def hasShutdownDeleteDir(file: File): Boolean = {
     val absolutePath = file.getAbsolutePath()
@@ -109,14 +91,6 @@ private[spark] object ShutdownHookManager extends Logging {
     }
   }
 
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.contains(absolutePath)
-    }
-  }
-
   // Note: if file is child of some registered path, while not equal to it, 
then return true;
   // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
   // paths - resulting in IOException and incomplete cleanup.
@@ -133,22 +107,6 @@ private[spark] object ShutdownHookManager extends Logging {
     retval
   }
 
-  // Note: if file is child of some registered path, while not equal to it, 
then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
-  // paths - resulting in Exception and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    val retval = shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
   /**
    * Detect whether this thread might be executing a shutdown hook. Will 
always return true if
    * the current thread is a running a shutdown hook but may spuriously return 
true otherwise (e.g.

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1f54aa/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a07f7c..b8ca6b0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,8 +44,6 @@ import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
 import org.slf4j.Logger
-import tachyon.TachyonURI
-import tachyon.client.{TachyonFS, TachyonFile}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -947,15 +945,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Delete a file or directory and its contents recursively.
-   */
-  def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
-    if (!client.delete(new TachyonURI(dir.getPath()), true)) {
-      throw new IOException("Failed to delete the tachyon dir: " + dir)
-    }
-  }
-
-  /**
    * Check to see if file is a symbolic link.
    */
   def isSymlink(file: File): Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to