Repository: spark
Updated Branches:
  refs/heads/master 11a849b3a -> 8b497046c


[SPARK-20654][CORE] Add config to limit disk usage of the history server.

This change adds a new configuration option and support code that limits
how much disk space the SHS (Spark History Server) will use. The default
value is pretty generous so that applications will, hopefully, only rarely
need to be replayed because their disk stores have been evicted.

This works by keeping track of how much data each application is using.
Also, because it's not possible to know, before replaying, how much space
will be needed, it's possible that usage will exceed the configured limit
temporarily. The code uses the concept of a "lease" to try to limit how
much the SHS will exceed the limit in those cases.

Active UIs are also tracked, so they're never deleted. This works in
tandem with the existing option of how many active UIs are loaded; because
unused UIs will be unloaded, their disk stores will also become candidates
for deletion. If the data is not deleted, though, re-loading the UI is
pretty quick.

Author: Marcelo Vanzin <[email protected]>

Closes #20011 from vanzin/SPARK-20654.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b497046
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b497046
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b497046

Branch: refs/heads/master
Commit: 8b497046c647a21bbed1bdfbdcb176745a1d5cd5
Parents: 11a849b
Author: Marcelo Vanzin <[email protected]>
Authored: Fri Dec 29 10:40:09 2017 -0600
Committer: Imran Rashid <[email protected]>
Committed: Fri Dec 29 10:40:09 2017 -0600

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 173 ++++++----
 .../history/HistoryServerDiskManager.scala      | 327 +++++++++++++++++++
 .../apache/spark/deploy/history/config.scala    |   5 +
 .../spark/scheduler/EventLoggingListener.scala  |  15 +-
 .../history/HistoryServerDiskManagerSuite.scala | 160 +++++++++
 5 files changed, 606 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a299b79..94c80eb 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
@@ -39,11 +40,13 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
+import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -149,6 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
   }.getOrElse(new InMemoryStore())
 
+  private val diskManager = storePath.map { path =>
+    new HistoryServerDiskManager(conf, path, listing, clock)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), 
LoadedAppUI]()
 
   /**
@@ -219,6 +226,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   }
 
   private def startPolling(): Unit = {
+    diskManager.foreach(_.initialize())
+
     // Validate the log directory.
     val path = new Path(logDir)
     try {
@@ -299,63 +308,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
       attempt.adminAclsGroups.getOrElse(""))
     secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-    val uiStorePath = storePath.map { path => getStorePath(path, appId, 
attemptId) }
+    val kvstore = try {
+      diskManager match {
+        case Some(sm) =>
+          loadDiskStore(sm, appId, attempt)
 
-    val (kvstore, needReplay) = uiStorePath match {
-      case Some(path) =>
-        try {
-          // The store path is not guaranteed to exist - maybe it hasn't been 
created, or was
-          // invalidated because changes to the event log were detected. Need 
to replay in that
-          // case.
-          val _replay = !path.isDirectory()
-          (createDiskStore(path, conf), _replay)
-        } catch {
-          case e: Exception =>
-            // Get rid of the old data and re-create it. The store is either 
old or corrupted.
-            logWarning(s"Failed to load disk store $uiStorePath for $appId.", 
e)
-            Utils.deleteRecursively(path)
-            (createDiskStore(path, conf), true)
-        }
-
-      case _ =>
-        (new InMemoryStore(), true)
-    }
-
-    val plugins = ServiceLoader.load(
-      classOf[AppHistoryServerPlugin], 
Utils.getContextOrSparkClassLoader).asScala
-    val trackingStore = new ElementTrackingStore(kvstore, conf)
-    if (needReplay) {
-      val replayBus = new ReplayListenerBus()
-      val listener = new AppStatusListener(trackingStore, conf, false,
-        lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
-      replayBus.addListener(listener)
-      for {
-        plugin <- plugins
-        listener <- plugin.createListeners(conf, trackingStore)
-      } replayBus.addListener(listener)
-      try {
-        val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
-        replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
-        trackingStore.close(false)
-      } catch {
-        case e: Exception =>
-          Utils.tryLogNonFatalError {
-            trackingStore.close()
-          }
-          uiStorePath.foreach(Utils.deleteRecursively)
-          if (e.isInstanceOf[FileNotFoundException]) {
-            return None
-          } else {
-            throw e
-          }
+        case _ =>
+          createInMemoryStore(attempt)
       }
+    } catch {
+      case _: FileNotFoundException =>
+        return None
     }
 
     val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, 
secManager, app.info.name,
       HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(),
       attempt.info.appSparkVersion)
-    plugins.foreach(_.setupUI(ui))
+    loadPlugins().foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
 
@@ -417,11 +387,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         loadedUI.lock.writeLock().unlock()
       }
 
-      // If the UI is not valid, delete its files from disk, if any. This 
relies on the fact that
-      // ApplicationCache will never call this method concurrently with 
getAppUI() for the same
-      // appId / attemptId.
-      if (!loadedUI.valid && storePath.isDefined) {
-        Utils.deleteRecursively(getStorePath(storePath.get, appId, attemptId))
+      diskManager.foreach { dm =>
+        // If the UI is not valid, delete its files from disk, if any. This 
relies on the fact that
+        // ApplicationCache will never call this method concurrently with 
getAppUI() for the same
+        // appId / attemptId.
+        dm.release(appId, attemptId, delete = !loadedUI.valid)
       }
     }
   }
@@ -569,12 +539,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
 
     val logPath = fileStatus.getPath()
-
     val bus = new ReplayListenerBus()
     val listener = new AppListingListener(fileStatus, clock)
     bus.addListener(listener)
+    replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-    replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
     listener.applicationInfo.foreach { app =>
       // Invalidate the existing UI for the reloaded app attempt, if any. See 
LoadedAppUI for a
       // discussion on the UI lifecycle.
@@ -651,10 +620,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
    */
   private def replay(
       eventLog: FileStatus,
-      appCompleted: Boolean,
       bus: ReplayListenerBus,
       eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
     val logPath = eventLog.getPath()
+    val isCompleted = 
!logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we 
grabbed the filestatus,
     // and when we read the file here.  That is OK -- it may result in an 
unnecessary refresh
@@ -664,18 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     // after it's created, so we get a file size that is no bigger than what 
is actually read.
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
-      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
-      logInfo(s"Finished replaying $logPath")
+      bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
+      logInfo(s"Finished parsing $logPath")
     } finally {
       logInput.close()
     }
   }
 
   /**
-   * Return true when the application has completed.
+   * Rebuilds the application state store from its event log.
    */
-  private def isApplicationCompleted(entry: FileStatus): Boolean = {
-    !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+  private def rebuildAppStore(
+      store: KVStore,
+      eventLog: FileStatus,
+      lastUpdated: Long): Unit = {
+    // Disable async updates, since they cause higher memory usage, and it's 
ok to take longer
+    // to parse the event logs in the SHS.
+    val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
+    val trackingStore = new ElementTrackingStore(store, replayConf)
+    val replayBus = new ReplayListenerBus()
+    val listener = new AppStatusListener(trackingStore, replayConf, false,
+      lastUpdateTime = Some(lastUpdated))
+    replayBus.addListener(listener)
+
+    for {
+      plugin <- loadPlugins()
+      listener <- plugin.createListeners(conf, trackingStore)
+    } replayBus.addListener(listener)
+
+    try {
+      replay(eventLog, replayBus)
+      trackingStore.close(false)
+    } catch {
+      case e: Exception =>
+        Utils.tryLogNonFatalError {
+          trackingStore.close()
+        }
+        throw e
+    }
   }
 
   /**
@@ -751,14 +746,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     listing.write(newAppInfo)
   }
 
-  private def createDiskStore(path: File, conf: SparkConf): KVStore = {
+  private def loadDiskStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper): KVStore = {
     val metadata = new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)
-    KVUtils.open(path, metadata)
+
+    // First check if the store already exists and try to open it. If that 
fails, then get rid of
+    // the existing data.
+    dm.openStore(appId, attempt.info.attemptId).foreach { path =>
+      try {
+        return KVUtils.open(path, metadata)
+      } catch {
+        case e: Exception =>
+          logInfo(s"Failed to open existing store for 
$appId/${attempt.info.attemptId}.", e)
+          dm.release(appId, attempt.info.attemptId, delete = true)
+      }
+    }
+
+    // At this point the disk data either does not exist or was deleted 
because it failed to
+    // load, so the event log needs to be replayed.
+    val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
+    val isCompressed = 
EventLoggingListener.codecName(status.getPath()).flatMap { name =>
+      Try(CompressionCodec.getShortName(name)).toOption
+    }.isDefined
+    logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+    val lease = dm.lease(status.getLen(), isCompressed)
+    val newStorePath = try {
+      val store = KVUtils.open(lease.tmpPath, metadata)
+      try {
+        rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
+      } finally {
+        store.close()
+      }
+      lease.commit(appId, attempt.info.attemptId)
+    } catch {
+      case e: Exception =>
+        lease.rollback()
+        throw e
+    }
+
+    KVUtils.open(newStorePath, metadata)
+  }
+
+  private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
+    val store = new InMemoryStore()
+    val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
+    rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
+    store
   }
 
-  private def getStorePath(path: File, appId: String, attemptId: 
Option[String]): File = {
-    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
-    new File(path, fileName)
+  private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
+    ServiceLoader.load(classOf[AppHistoryServerPlugin], 
Utils.getContextOrSparkClassLoader).asScala
   }
 
   /** For testing. Returns internal data about a single attempt. */

http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
new file mode 100644
index 0000000..c03a360
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A class used to keep track of disk usage by the SHS, allowing application data to be deleted
+ * from disk when usage exceeds a configurable threshold.
+ *
+ * The goal of the class is not to guarantee that usage will never exceed the threshold; because of
+ * how application data is written, disk usage may temporarily go higher. But, eventually, it
+ * should fall back under the threshold.
+ *
+ * Usage is tracked with two counters: committed usage (stores living under the "apps" directory)
+ * and current usage (committed usage plus space reserved by outstanding leases).
+ *
+ * @param conf Spark configuration.
+ * @param path Path where to store application data.
+ * @param listing The listing store, used to persist usage data.
+ * @param clock Clock instance to use.
+ */
+private class HistoryServerDiskManager(
+    conf: SparkConf,
+    path: File,
+    listing: KVStore,
+    clock: Clock) extends Logging {
+
+  import config._
+
+  private val appStoreDir = new File(path, "apps")
+  if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) {
+    throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
+  }
+
+  private val tmpStoreDir = new File(path, "temp")
+  if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
+    throw new IllegalArgumentException(s"Failed to create temp directory ($tmpStoreDir).")
+  }
+
+  private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE)
+  // Committed usage plus space reserved by leases that were not yet committed or rolled back.
+  private val currentUsage = new AtomicLong(0L)
+  // Space used by committed stores under `appStoreDir`.
+  private val committedUsage = new AtomicLong(0L)
+  // Stores currently in use, mapped to the size they were accounted at. Only accessed while
+  // holding its own monitor.
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  /**
+   * Initializes the usage accounting from the data currently on disk. Deletes leftover
+   * temporary stores, drops listing entries whose directories no longer exist, and refreshes the
+   * recorded size of the remaining entries so they agree with the usage counters.
+   */
+  def initialize(): Unit = {
+    updateUsage(sizeOf(appStoreDir), committed = true)
+
+    // Clean up any temporary stores during start up. This assumes that they're leftover from other
+    // instances and are not useful. listFiles() returns null on I/O errors, hence the Option.
+    Option(tmpStoreDir.listFiles()).foreach(_.foreach(FileUtils.deleteQuietly))
+
+    // Go through the recorded store directories and remove any that may have been removed by
+    // external code. The view is fully materialized before the listing is modified.
+    val (existing, orphans) = listing.view(classOf[ApplicationStoreInfo]).asScala.toList
+      .partition { info => new File(info.path).exists() }
+
+    orphans.foreach { info =>
+      listing.delete(info.getClass(), info.path)
+    }
+
+    // LevelDB may rewrite its files even when a store is only read, so the size recorded for a
+    // store can differ from its real size on disk. The counters above were computed from the real
+    // directory sizes; make the recorded sizes match, otherwise evicting a store later would
+    // subtract a stale amount and the tracker could drift or go negative.
+    existing.foreach { info =>
+      val actualSize = sizeOf(new File(info.path))
+      if (actualSize != info.size) {
+        listing.write(info.copy(size = actualSize))
+      }
+    }
+
+    logInfo("Initialized disk manager: " +
+      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+      s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  /**
+   * Lease some space from the store. The leased space is calculated as a fraction of the given
+   * event log size; this is an approximation, and doesn't mean the application store cannot
+   * outgrow the lease.
+   *
+   * If there's not enough space for the lease, other applications might be evicted to make room.
+   * This method always returns a lease, meaning that it's possible for local disk usage to grow
+   * past the configured threshold if there aren't enough idle applications to evict.
+   *
+   * While the lease is active, the data is written to a temporary location, so `openStore()`
+   * will still return `None` for the application.
+   */
+  def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = {
+    val needed = approximateSize(eventLogSize, isCompressed)
+    makeRoom(needed)
+
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
+    updateUsage(needed)
+    val current = currentUsage.get()
+    if (current > maxUsage) {
+      logInfo(s"Lease of ${Utils.bytesToString(needed)} may cause usage to exceed max " +
+        s"(${Utils.bytesToString(current)} > ${Utils.bytesToString(maxUsage)})")
+    }
+
+    new Lease(tmp, needed)
+  }
+
+  /**
+   * Returns the location of an application store if it's still available. Marks the store as
+   * being used so that it's not evicted when running out of designated space.
+   */
+  def openStore(appId: String, attemptId: Option[String]): Option[File] = {
+    val storePath = active.synchronized {
+      val path = appStorePath(appId, attemptId)
+      if (path.isDirectory()) {
+        active(appId -> attemptId) = sizeOf(path)
+        Some(path)
+      } else {
+        None
+      }
+    }
+
+    storePath.foreach { _ =>
+      updateAccessTime(appId, attemptId)
+    }
+
+    storePath
+  }
+
+  /**
+   * Tell the disk manager that the store for the given application is not being used anymore.
+   *
+   * @param delete Whether to delete the store from disk.
+   */
+  def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = {
+    // Because LevelDB may modify the structure of the store files even when just reading, update
+    // the accounting for this application when it's closed.
+    val oldSizeOpt = active.synchronized {
+      active.remove(appId -> attemptId)
+    }
+
+    oldSizeOpt.foreach { oldSize =>
+      val path = appStorePath(appId, attemptId)
+      updateUsage(-oldSize, committed = true)
+      if (path.isDirectory()) {
+        if (delete) {
+          deleteStore(path)
+        } else {
+          val newSize = sizeOf(path)
+          val newInfo = listing.read(classOf[ApplicationStoreInfo], path.getAbsolutePath())
+            .copy(size = newSize)
+          listing.write(newInfo)
+          updateUsage(newSize, committed = true)
+        }
+      }
+    }
+  }
+
+  /**
+   * A non-scientific approximation of how large an app state store will be given the size of the
+   * event log.
+   */
+  def approximateSize(eventLogSize: Long, isCompressed: Boolean): Long = {
+    if (isCompressed) {
+      // For compressed logs, assume that compression reduces the log size a lot, and the disk
+      // store will actually grow compared to the log size.
+      eventLogSize * 2
+    } else {
+      // For non-compressed logs, assume the disk store will end up at approximately 50% of the
+      // size of the logs. This is loosely based on empirical evidence.
+      eventLogSize / 2
+    }
+  }
+
+  /** Current free space. Considers space currently leased out too. */
+  def free(): Long = {
+    math.max(maxUsage - currentUsage.get(), 0L)
+  }
+
+  /** Current committed space. */
+  def committed(): Long = committedUsage.get()
+
+  /** Deletes a store directory and its listing entry. Does not touch the usage counters. */
+  private def deleteStore(path: File): Unit = {
+    FileUtils.deleteDirectory(path)
+    listing.delete(classOf[ApplicationStoreInfo], path.getAbsolutePath())
+  }
+
+  /**
+   * Evicts inactive stores, iterating the "lastAccess" index, until `size` bytes are free or no
+   * more candidates exist. Logs a warning if nothing could be evicted.
+   */
+  private def makeRoom(size: Long): Unit = {
+    if (free() < size) {
+      logDebug(s"Not enough free space, looking at candidates for deletion...")
+      val evicted = new ListBuffer[ApplicationStoreInfo]()
+      Utils.tryWithResource(
+        listing.view(classOf[ApplicationStoreInfo]).index("lastAccess").closeableIterator()
+      ) { iter =>
+        var needed = size
+        while (needed > 0 && iter.hasNext()) {
+          val info = iter.next()
+          val isActive = active.synchronized {
+            active.contains(info.appId -> info.attemptId)
+          }
+          if (!isActive) {
+            evicted += info
+            needed -= info.size
+          }
+        }
+      }
+
+      if (evicted.nonEmpty) {
+        val freed = evicted.map { info =>
+          logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.")
+          deleteStore(new File(info.path))
+          updateUsage(-info.size, committed = true)
+          info.size
+        }.sum
+
+        logInfo(s"Deleted ${evicted.size} store(s) to free ${Utils.bytesToString(freed)} " +
+          s"(target = ${Utils.bytesToString(size)}).")
+      } else {
+        logWarning(s"Unable to free any space to make room for ${Utils.bytesToString(size)}.")
+      }
+    }
+  }
+
+  private def appStorePath(appId: String, attemptId: Option[String]): File = {
+    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
+    new File(appStoreDir, fileName)
+  }
+
+  /** Records the current clock time and on-disk size of the store in the listing. */
+  private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
+    val path = appStorePath(appId, attemptId)
+    val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
+      sizeOf(path))
+    listing.write(info)
+  }
+
+  /**
+   * Adds `delta` to the current usage and, when `committed` is true, to the committed usage.
+   * Throws IllegalStateException if either counter becomes negative.
+   */
+  private def updateUsage(delta: Long, committed: Boolean = false): Unit = {
+    val updated = currentUsage.addAndGet(delta)
+    if (updated < 0) {
+      throw new IllegalStateException(
+        s"Disk usage tracker went negative (now = $updated, delta = $delta)")
+    }
+    if (committed) {
+      val updatedCommitted = committedUsage.addAndGet(delta)
+      if (updatedCommitted < 0) {
+        throw new IllegalStateException(
+          s"Disk usage tracker went negative (now = $updatedCommitted, delta = $delta)")
+      }
+    }
+  }
+
+  /** Visible for testing. Return the size of a directory. */
+  private[history] def sizeOf(path: File): Long = FileUtils.sizeOf(path)
+
+  private[history] class Lease(val tmpPath: File, private val leased: Long) {
+
+    /**
+     * Commits a lease to its final location, and update accounting information. This method
+     * marks the application as active, so its store is not available for eviction.
+     *
+     * @throws IOException if the store could not be moved to its final location. In that case
+     *                     the lease's reservation is kept, so the caller must still call
+     *                     `rollback()`.
+     */
+    def commit(appId: String, attemptId: Option[String]): File = {
+      val dst = appStorePath(appId, attemptId)
+
+      active.synchronized {
+        require(!active.contains(appId -> attemptId),
+          s"Cannot commit lease for active application $appId / $attemptId")
+
+        if (dst.isDirectory()) {
+          val size = sizeOf(dst)
+          deleteStore(dst)
+          updateUsage(-size, committed = true)
+        }
+      }
+
+      // Give back the reservation first, so the space it held counts as free when making room
+      // for the store's real size.
+      updateUsage(-leased)
+
+      val newSize = try {
+        val size = sizeOf(tmpPath)
+        makeRoom(size)
+        // File.renameTo reports failure through its return value rather than an exception.
+        if (!tmpPath.renameTo(dst)) {
+          throw new IOException(s"Failed to move store from $tmpPath to $dst.")
+        }
+        size
+      } catch {
+        case e: Exception =>
+          // Re-take the reservation so that the caller's rollback(), which releases `leased`
+          // again, leaves the usage counters balanced.
+          updateUsage(leased)
+          throw e
+      }
+
+      updateUsage(newSize, committed = true)
+      if (committedUsage.get() > maxUsage) {
+        val current = Utils.bytesToString(committedUsage.get())
+        val max = Utils.bytesToString(maxUsage)
+        logWarning(s"Commit of application $appId / $attemptId causes maximum disk usage to be " +
+          s"exceeded ($current > $max)")
+      }
+
+      updateAccessTime(appId, attemptId)
+
+      active.synchronized {
+        active(appId -> attemptId) = newSize
+      }
+      dst
+    }
+
+    /** Deletes the temporary directory created for the lease. */
+    def rollback(): Unit = {
+      updateUsage(-leased)
+      FileUtils.deleteDirectory(tmpPath)
+    }
+
+  }
+
+}
+
+/**
+ * Listing entry describing one application store on disk.
+ *
+ * @param path Absolute path of the store directory; used as the entry's key.
+ * @param lastAccess Clock time of the last open/commit, indexed as "lastAccess" so that eviction
+ *                   candidates can be iterated through that index.
+ * @param appId Application ID.
+ * @param attemptId Optional attempt ID.
+ * @param size Size, in bytes, the store was last accounted at.
+ */
+private case class ApplicationStoreInfo(
+    @KVIndexParam path: String,
+    @KVIndexParam("lastAccess") lastAccess: Long,
+    appId: String,
+    attemptId: Option[String],
+    size: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/main/scala/org/apache/spark/deploy/history/config.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
index 52dedc1..22b6d49 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.history
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 private[spark] object config {
 
@@ -39,4 +40,8 @@ private[spark] object config {
     .stringConf
     .createOptional
 
+  // Target maximum amount of local disk, in bytes, used by the history server's application
+  // stores (default 10g). This is a soft limit: usage may temporarily exceed it while event logs
+  // are being replayed.
+  val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefaultString("10g")
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a77adc5..b3a5b1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -352,14 +352,8 @@ private[spark] object EventLoggingListener extends Logging 
{
    */
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
     val in = new BufferedInputStream(fs.open(log))
-
-    // Compression codec is encoded as an extension, e.g. app_123.lzf
-    // Since we sanitize the app ID to not include periods, it is safe to 
split on it
-    val logName = log.getName.stripSuffix(IN_PROGRESS)
-    val codecName: Option[String] = logName.split("\\.").tail.lastOption
-
     try {
-      val codec = codecName.map { c =>
+      val codec = codecName(log).map { c =>
         codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new 
SparkConf, c))
       }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
@@ -370,4 +364,11 @@ private[spark] object EventLoggingListener extends Logging 
{
     }
   }
 
+  /**
+   * Returns the compression codec name encoded in an event log's file name, if any. The codec is
+   * the last "."-separated extension once the in-progress suffix is stripped, e.g. `app_123.lzf`
+   * yields `Some("lzf")` while `app_123` yields `None`.
+   */
+  def codecName(log: Path): Option[String] = {
+    // App IDs are sanitized to contain no periods, so every "." in the name starts an extension.
+    val baseName = log.getName.stripSuffix(IN_PROGRESS)
+    val extensions = baseName.split("\\.").tail
+    extensions.lastOption
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
new file mode 100644
index 0000000..4b1b921
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+
+import org.mockito.AdditionalAnswers
+import org.mockito.Matchers.{any, anyBoolean, anyLong, eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.status.KVUtils
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Tests for HistoryServerDiskManager's space accounting: leasing, committing, releasing and
+ * evicting application stores. Most tests use a Mockito spy whose size estimate and directory
+ * sizes are stubbed, so the accounting can be checked with byte counts of a few bytes.
+ */
+class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  import config._
+
+  // Disk usage limit, in bytes, for managers created by mockManager().
+  private val MAX_USAGE = 3L
+
+  private var testDir: File = _
+  private var store: KVStore = _
+
+  // Each test gets a fresh temporary directory and listing store.
+  before {
+    testDir = Utils.createTempDir()
+    store = KVUtils.open(new File(testDir, "listing"), "test")
+  }
+
+  after {
+    store.close()
+    if (testDir != null) {
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+  /**
+   * Creates a spied manager limited to MAX_USAGE bytes whose approximateSize() returns the event
+   * log size unchanged, so a lease of N reserves exactly N bytes.
+   */
+  private def mockManager(): HistoryServerDiskManager = {
+    val conf = new SparkConf().set(MAX_LOCAL_DISK_USAGE, MAX_USAGE)
+    val manager = spy(new HistoryServerDiskManager(conf, testDir, store, new ManualClock()))
+    doAnswer(AdditionalAnswers.returnsFirstArg[Long]()).when(manager)
+      .approximateSize(anyLong(), anyBoolean())
+    manager
+  }
+
+  test("leasing space") {
+    val manager = mockManager()
+
+    // Lease all available space.
+    val leaseA = manager.lease(1)
+    val leaseB = manager.lease(1)
+    val leaseC = manager.lease(1)
+    assert(manager.free() === 0)
+
+    // Revert one lease, get another one.
+    leaseA.rollback()
+    assert(manager.free() > 0)
+    assert(!leaseA.tmpPath.exists())
+
+    val leaseD = manager.lease(1)
+    assert(manager.free() === 0)
+
+    // Committing B should bring the "used" space up to 4, so there shouldn't be space left yet.
+    doReturn(2L).when(manager).sizeOf(meq(leaseB.tmpPath))
+    val dstB = leaseB.commit("app2", None)
+    assert(manager.free() === 0)
+    assert(manager.committed() === 2)
+
+    // Rollback C and D, now there should be 1 left.
+    leaseC.rollback()
+    leaseD.rollback()
+    assert(manager.free() === 1)
+
+    // Release app 2 to make it available for eviction.
+    doReturn(2L).when(manager).sizeOf(meq(dstB))
+    manager.release("app2", None)
+    assert(manager.committed() === 2)
+
+    // Emulate an updated event log by replacing the store for lease B. Lease 1, and commit with
+    // size 3.
+    val leaseE = manager.lease(1)
+    doReturn(3L).when(manager).sizeOf(meq(leaseE.tmpPath))
+    val dstE = leaseE.commit("app2", None)
+    assert(dstE === dstB)
+    assert(dstE.exists())
+    doReturn(3L).when(manager).sizeOf(meq(dstE))
+    assert(!leaseE.tmpPath.exists())
+    assert(manager.free() === 0)
+    manager.release("app2", None)
+    assert(manager.committed() === 3)
+
+    // Try a big lease that should cause the released app to be evicted.
+    val leaseF = manager.lease(6)
+    assert(!dstB.exists())
+    assert(manager.free() === 0)
+    assert(manager.committed() === 0)
+
+    // Leasing when no free space is available should still be allowed.
+    manager.lease(1)
+    assert(manager.free() === 0)
+  }
+
+  test("tracking active stores") {
+    val manager = mockManager()
+
+    // Lease and commit space for appA, making it active.
+    val leaseA = manager.lease(2)
+    assert(manager.free() === 1)
+    doReturn(2L).when(manager).sizeOf(leaseA.tmpPath)
+    assert(manager.openStore("appA", None).isEmpty)
+    val dstA = leaseA.commit("appA", None)
+
+    // Create a new lease. Leases are always granted, but this shouldn't cause appA's store
+    // to be deleted.
+    val leaseB = manager.lease(2)
+    assert(dstA.exists())
+
+    // Trying to commit on top of an active application should fail (commit() uses require(),
+    // which throws IllegalArgumentException).
+    intercept[IllegalArgumentException] {
+      leaseB.commit("appA", None)
+    }
+
+    leaseB.rollback()
+
+    // Close appA with an updated size, then create a new lease. Now the app's directory should be
+    // deleted.
+    doReturn(3L).when(manager).sizeOf(dstA)
+    manager.release("appA", None)
+    assert(manager.free() === 0)
+
+    val leaseC = manager.lease(1)
+    assert(!dstA.exists())
+    leaseC.rollback()
+
+    assert(manager.openStore("appA", None).isEmpty)
+  }
+
+  // Uses a real (non-spied) manager to check the direction of the size estimate only.
+  test("approximate size heuristic") {
+    val manager = new HistoryServerDiskManager(new SparkConf(false), testDir, store,
+      new ManualClock())
+    assert(manager.approximateSize(50L, false) < 50L)
+    assert(manager.approximateSize(50L, true) > 50L)
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to