Repository: spark
Updated Branches:
  refs/heads/master 07296a61c -> fed2139f0


[SPARK-20664][CORE] Delete stale application data from SHS.

Detect the deletion of event log files from storage, and remove
data about the related application attempt in the SHS.
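
The core idea: every scan stamps the listing entry of each log file it still
sees with the scan time, so any entry left with an older stamp must have lost
its backing file. A minimal sketch of that idea in plain Scala (illustrative
names, not the actual SHS classes):

    case class LogEntry(logPath: String, lastProcessed: Long, appId: Option[String])

    def scan(
        listing: scala.collection.mutable.Map[String, LogEntry],
        filesOnDisk: Set[String],
        scanTime: Long): Unit = {
      // Refresh the timestamp of every entry whose file is still present.
      filesOnDisk.foreach { path =>
        val entry = listing.getOrElse(path, LogEntry(path, scanTime, None))
        listing(path) = entry.copy(lastProcessed = scanTime)
      }
      // Entries this scan did not touch are stale: their files were deleted,
      // so drop the application data they point at.
      listing.values.filter(_.lastProcessed < scanTime).toList.foreach { entry =>
        entry.appId.foreach(appId => println(s"removing data for app $appId"))
        listing.remove(entry.logPath)
      }
    }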

Also contains a fix for SPARK-21571, based on code by ericvandenbergfb.
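
The SPARK-21571 part handles logs that never yield a valid application ID
(empty or unparseable files): they are now tracked too, and deleted once they
exceed the configured max age. A sketch, reusing the illustrative LogEntry
shape from above:

    def cleanInvalid(
        listing: scala.collection.mutable.Map[String, LogEntry],
        now: Long,
        maxAgeMs: Long): Unit = {
      val cutoff = now - maxAgeMs
      listing.values
        .filter(e => e.appId.isEmpty && e.lastProcessed < cutoff)
        .toList
        .foreach { e =>
          println(s"deleting invalid / corrupt event log ${e.logPath}")
          listing.remove(e.logPath)
        }
    }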

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20138 from vanzin/SPARK-20664.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fed2139f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fed2139f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fed2139f

Branch: refs/heads/master
Commit: fed2139f053fac4a9a6952ff0ab1cc2a5f657bd0
Parents: 07296a6
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Jan 19 13:26:37 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Fri Jan 19 13:26:37 2018 -0600

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 297 ++++++++++++-------
 .../deploy/history/FsHistoryProviderSuite.scala | 117 +++++++-
 .../deploy/history/HistoryServerSuite.scala     |   4 +-
 3 files changed, 306 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fed2139f/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 94c80eb..f9d0b5e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
 import java.util.{Date, ServiceLoader, UUID}
-import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
+import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
@@ -29,7 +29,7 @@ import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
@@ -116,8 +116,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
   // and applications between check task and clean task.
-  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
-    .setNameFormat("spark-history-task-%d").setDaemon(true).build())
+  private val pool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-history-task-%d")
 
   // The modification time of the newest log detected during the last scan.   Currently only
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
@@ -174,7 +173,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Fixed size thread pool to fetch and parse log files.
    */
   private val replayExecutor: ExecutorService = {
-    if (!conf.contains("spark.testing")) {
+    if (!Utils.isTesting) {
       ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
@@ -275,7 +274,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       Some(load(appId).toApplicationInfo())
     } catch {
-      case e: NoSuchElementException =>
+      case _: NoSuchElementException =>
         None
     }
   }
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       val newLastScanTime = getNewLastScanTime()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-      // scan for modified applications, replay and merge them
-      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+      val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
            // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
            // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
-            recordedFileSize(entry.getPath()) < entry.getLen()
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+        }
+        .filter { entry =>
+          try {
+            val info = listing.read(classOf[LogInfo], entry.getPath().toString())
+            if (info.fileSize < entry.getLen()) {
+              // Log size has changed, it should be parsed.
+              true
+            } else {
+              // If the SHS view has a valid application, update the time the file was last seen so
+              // that the entry is not deleted from the SHS listing.
+              if (info.appId.isDefined) {
+                listing.write(info.copy(lastProcessed = newLastScanTime))
+              }
+              false
+            }
+          } catch {
+            case _: NoSuchElementException =>
+              // If the file is currently not being tracked by the SHS, add an entry for it and try
+              // to parse it. This will allow the cleaner code to detect the file as stale later on
+              // if it was not possible to parse it.
+              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
+                entry.getLen()))
+              entry.getLen() > 0
+          }
         }
         .sortWith { case (entry1, entry2) =>
           entry1.getModificationTime() > entry2.getModificationTime()
         }
 
-      if (logInfos.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
+      if (updated.nonEmpty) {
+        logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
       }
 
-      var tasks = mutable.ListBuffer[Future[_]]()
-
-      try {
-        for (file <- logInfos) {
-          tasks += replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(file)
+      val tasks = updated.map { entry =>
+        try {
+          replayExecutor.submit(new Runnable {
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
           })
+        } catch {
+          // let the iteration over the updated entries break, since an exception on
+          // replayExecutor.submit (..) indicates the ExecutorService is unable
+          // to take any more submissions at this time
+          case e: Exception =>
+            logError(s"Exception while submitting event log for replay", e)
+            null
         }
-      } catch {
-        // let the iteration over logInfos break, since an exception on
-        // replayExecutor.submit (..) indicates the ExecutorService is unable
-        // to take any more submissions at this time
-
-        case e: Exception =>
-          logError(s"Exception while submitting event log for replay", e)
-      }
+      }.filter(_ != null)
 
       pendingReplayTasksCount.addAndGet(tasks.size)
 
+      // Wait for all tasks to finish. This makes sure that checkForLogs
+      // is not scheduled again while some tasks are already running in
+      // the replayExecutor.
       tasks.foreach { task =>
         try {
-          // Wait for all tasks to finish. This makes sure that checkForLogs
-          // is not scheduled again while some tasks are already running in
-          // the replayExecutor.
           task.get()
         } catch {
           case e: InterruptedException =>
@@ -459,13 +479,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
       }
 
+      // Delete all information about applications whose log files disappeared from storage.
+      // This is done by identifying the event logs which were not touched by the current
+      // directory scan.
+      //
+      // Only entries with valid applications are cleaned up here. Cleaning up invalid log
+      // files is done by the periodic cleaner task.
+      val stale = listing.view(classOf[LogInfo])
+        .index("lastProcessed")
+        .last(newLastScanTime - 1)
+        .asScala
+        .toList
+      stale.foreach { log =>
+        log.appId.foreach { appId =>
+          cleanAppData(appId, log.attemptId, log.logPath)
+          listing.delete(classOf[LogInfo], log.logPath)
+        }
+      }
+
       lastScanTime.set(newLastScanTime)
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
-  private def getNewLastScanTime(): Long = {
+  private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
+    try {
+      val app = load(appId)
+      val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
+
+      assert(attempt.isEmpty || attempt.size == 1)
+      val isStale = attempt.headOption.exists { a =>
+        if (a.logPath != new Path(logPath).getName()) {
+          // If the log file name does not match, then probably the old log file was from an
+          // in progress application. Just return that the app should be left alone.
+          false
+        } else {
+          val maybeUI = synchronized {
+            activeUIs.remove(appId -> attemptId)
+          }
+
+          maybeUI.foreach { ui =>
+            ui.invalidate()
+            ui.ui.store.close()
+          }
+
+          diskManager.foreach(_.release(appId, attemptId, delete = true))
+          true
+        }
+      }
+
+      if (isStale) {
+        if (others.nonEmpty) {
+          val newAppInfo = new ApplicationInfoWrapper(app.info, others)
+          listing.write(newAppInfo)
+        } else {
+          listing.delete(classOf[ApplicationInfoWrapper], appId)
+        }
+      }
+    } catch {
+      case _: NoSuchElementException =>
+    }
+  }
+
+  private[history] def getNewLastScanTime(): Long = {
     val fileName = "." + UUID.randomUUID().toString
     val path = new Path(logDir, fileName)
     val fos = fs.create(path)
@@ -530,7 +607,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the given log file, saving the application in the listing db.
    */
-  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
         eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -544,73 +621,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     bus.addListener(listener)
     replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-    listener.applicationInfo.foreach { app =>
-      // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
-      // discussion on the UI lifecycle.
-      synchronized {
-        activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
-          ui.invalidate()
-          ui.ui.store.close()
+    val (appId, attemptId) = listener.applicationInfo match {
+      case Some(app) =>
+        // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
+        // discussion on the UI lifecycle.
+        synchronized {
+          activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
+            ui.invalidate()
+            ui.ui.store.close()
+          }
         }
-      }
 
-      addListing(app)
+        addListing(app)
+        (Some(app.info.id), app.attempts.head.info.attemptId)
+
+      case _ =>
+        // If the app hasn't written down its app ID to the logs, still record the entry in the
+        // listing db, with an empty ID. This will make the log eligible for deletion if the app
+        // does not make progress after the configured max log age.
+        (None, None)
     }
-    listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
+    listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
   }
 
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private[history] def cleanLogs(): Unit = {
-    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
-    try {
-      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
-
-      // Iterate descending over all applications whose oldest attempt happened before maxTime.
-      iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
-        .index("oldestAttempt")
-        .reverse()
-        .first(maxTime)
-        .closeableIterator())
-
-      iterator.get.asScala.foreach { app =>
-        // Applications may have multiple attempts, some of which may not need to be deleted yet.
-        val (remaining, toDelete) = app.attempts.partition { attempt =>
-          attempt.info.lastUpdated.getTime() >= maxTime
-        }
+  private[history] def cleanLogs(): Unit = Utils.tryLog {
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
 
-        if (remaining.nonEmpty) {
-          val newApp = new ApplicationInfoWrapper(app.info, remaining)
-          listing.write(newApp)
-        }
+    val expired = listing.view(classOf[ApplicationInfoWrapper])
+      .index("oldestAttempt")
+      .reverse()
+      .first(maxTime)
+      .asScala
+      .toList
+    expired.foreach { app =>
+      // Applications may have multiple attempts, some of which may not need to be deleted yet.
+      val (remaining, toDelete) = app.attempts.partition { attempt =>
+        attempt.info.lastUpdated.getTime() >= maxTime
+      }
 
-        toDelete.foreach { attempt =>
-          val logPath = new Path(logDir, attempt.logPath)
-          try {
-            listing.delete(classOf[LogInfo], logPath.toString())
-          } catch {
-            case _: NoSuchElementException =>
-              logDebug(s"Log info entry for $logPath not found.")
-          }
-          try {
-            fs.delete(logPath, true)
-          } catch {
-            case e: AccessControlException =>
-              logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-            case t: IOException =>
-              logError(s"IOException in cleaning ${attempt.logPath}", t)
-          }
-        }
+      if (remaining.nonEmpty) {
+        val newApp = new ApplicationInfoWrapper(app.info, remaining)
+        listing.write(newApp)
+      }
 
-        if (remaining.isEmpty) {
-          listing.delete(app.getClass(), app.id)
-        }
+      toDelete.foreach { attempt =>
+        logInfo(s"Deleting expired event log for ${attempt.logPath}")
+        val logPath = new Path(logDir, attempt.logPath)
+        listing.delete(classOf[LogInfo], logPath.toString())
+        cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
+        deleteLog(logPath)
+      }
+
+      if (remaining.isEmpty) {
+        listing.delete(app.getClass(), app.id)
+      }
+    }
+
+    // Delete log files that don't have a valid application and exceed the configured max age.
+    val stale = listing.view(classOf[LogInfo])
+      .index("lastProcessed")
+      .reverse()
+      .first(maxTime)
+      .asScala
+      .toList
+    stale.foreach { log =>
+      if (log.appId.isEmpty) {
+        logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
+        deleteLog(new Path(log.logPath))
+        listing.delete(classOf[LogInfo], log.logPath)
       }
-    } catch {
-      case t: Exception => logError("Exception while cleaning logs", t)
-    } finally {
-      iterator.foreach(_.close())
     }
   }
 
@@ -631,12 +713,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // an error the other way -- if we report a size bigger (ie later) than the file that is
     // actually read, we may never refresh the app.  FileStatus is guaranteed to be static
     // after it's created, so we get a file size that is no bigger than what is actually read.
-    val logInput = EventLoggingListener.openEventLog(logPath, fs)
-    try {
-      bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
+    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+      bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
       logInfo(s"Finished parsing $logPath")
-    } finally {
-      logInput.close()
     }
   }
 
@@ -703,18 +782,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         |  application count=$count}""".stripMargin
   }
 
-  /**
-   * Return the last known size of the given event log, recorded the last time the file
-   * system scanner detected a change in the file.
-   */
-  private def recordedFileSize(log: Path): Long = {
-    try {
-      listing.read(classOf[LogInfo], log.toString()).fileSize
-    } catch {
-      case _: NoSuchElementException => 0L
-    }
-  }
-
   private def load(appId: String): ApplicationInfoWrapper = {
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
@@ -773,11 +840,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
     val lease = dm.lease(status.getLen(), isCompressed)
     val newStorePath = try {
-      val store = KVUtils.open(lease.tmpPath, metadata)
-      try {
+      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
         rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
-      } finally {
-        store.close()
       }
       lease.commit(appId, attempt.info.attemptId)
     } catch {
@@ -806,6 +870,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
   }
 
+  private def deleteLog(log: Path): Unit = {
+    try {
+      fs.delete(log, true)
+    } catch {
+      case _: AccessControlException =>
+        logInfo(s"No permission to delete $log, ignoring.")
+      case ioe: IOException =>
+        logError(s"IOException in cleaning $log", ioe)
+    }
+  }
+
 }
 
 private[history] object FsHistoryProvider {
@@ -832,8 +907,16 @@ private[history] case class FsHistoryProviderMetadata(
     uiVersion: Long,
     logDir: String)
 
+/**
+ * Tracking info for event logs detected in the configured log directory. Tracks both valid and
+ * invalid logs (e.g. unparseable logs, recorded as logs with no app ID) so that the cleaner
+ * can know what log files are safe to delete.
+ */
 private[history] case class LogInfo(
     @KVIndexParam logPath: String,
+    @KVIndexParam("lastProcessed") lastProcessed: Long,
+    appId: Option[String],
+    attemptId: Option[String],
     fileSize: Long)
 
 private[history] class AttemptInfoWrapper(
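
For reference, the per-file decision added to checkForLogs() above reduces to
roughly the following (illustrative names; the real code reads and writes
LogInfo records through the KVStore-backed listing):

    case class Tracked(fileSize: Long, appId: Option[String])

    // Returns true when the replay executor should (re)parse the file.
    def shouldParse(tracked: Option[Tracked], lenOnDisk: Long): Boolean =
      tracked match {
        case Some(t) if t.fileSize < lenOnDisk =>
          true                 // the log grew since the last scan
        case Some(_) =>
          false                // unchanged; only the lastProcessed stamp is refreshed
        case None =>
          lenOnDisk > 0        // untracked file: record it, and parse it if non-empty
      }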

http://git-wip-us.apache.org/repos/asf/spark/blob/fed2139f/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 84ee01c..787de59 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.Mockito.{doReturn, mock, spy, verify}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -149,8 +149,10 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
     class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
       var mergeApplicationListingCall = 0
-      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-        super.mergeApplicationListing(fileStatus)
+      override protected def mergeApplicationListing(
+          fileStatus: FileStatus,
+          lastSeen: Long): Unit = {
+        super.mergeApplicationListing(fileStatus, lastSeen)
         mergeApplicationListingCall += 1
       }
     }
@@ -663,6 +665,115 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+    val storeDir = Utils.createTempDir()
+    val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
+    val provider = spy(new FsHistoryProvider(conf))
+    val appId = "new1"
+
+    // Write logs for two app attempts.
+    doReturn(1L).when(provider).getNewLastScanTime()
+    val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+    writeFile(attempt1, true, None,
+      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
+      SparkListenerJobStart(0, 1L, Nil, null),
+      SparkListenerApplicationEnd(5L)
+      )
+    val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+    writeFile(attempt2, true, None,
+      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
+      SparkListenerJobStart(0, 1L, Nil, null),
+      SparkListenerApplicationEnd(5L)
+      )
+    updateAndCheck(provider) { list =>
+      assert(list.size === 1)
+      assert(list(0).id === appId)
+      assert(list(0).attempts.size === 2)
+    }
+
+    // Load the app's UI.
+    val ui = provider.getAppUI(appId, Some("1"))
+    assert(ui.isDefined)
+
+    // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
+    // attempt 2 still exists, listing data should be there.
+    doReturn(2L).when(provider).getNewLastScanTime()
+    attempt1.delete()
+    updateAndCheck(provider) { list =>
+      assert(list.size === 1)
+      assert(list(0).id === appId)
+      assert(list(0).attempts.size === 1)
+    }
+    assert(!ui.get.valid)
+    assert(provider.getAppUI(appId, None) === None)
+
+    // Delete the second attempt's log file. Now everything should go away.
+    doReturn(3L).when(provider).getNewLastScanTime()
+    attempt2.delete()
+    updateAndCheck(provider) { list =>
+      assert(list.isEmpty)
+    }
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+    // TODO: "maxTime" becoming negative in cleanLogs() causes this test to 
fail, so avoid that
+    // until we figure out what's causing the problem.
+    val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"2d")
+    val provider = new FsHistoryProvider(conf, clock) {
+      override def getNewLastScanTime(): Long = clock.getTimeMillis()
+    }
+
+    // Create 0-byte size inprogress and complete files
+    var logCount = 0
+    var validLogCount = 0
+
+    val emptyInProgress = newLogFile("emptyInprogressLogFile", None, inProgress = true)
+    emptyInProgress.createNewFile()
+    emptyInProgress.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    val slowApp = newLogFile("slowApp", None, inProgress = true)
+    slowApp.createNewFile()
+    slowApp.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    val emptyFinished = newLogFile("emptyFinishedLogFile", None, inProgress = false)
+    emptyFinished.createNewFile()
+    emptyFinished.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    // Create an incomplete log file, has an end record but no start record.
+    val corrupt = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
+    writeFile(corrupt, true, None, SparkListenerApplicationEnd(0))
+    corrupt.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    provider.checkForLogs()
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === logCount)
+
+    // Move the clock forward 1 day and scan the files again. They should still be there.
+    clock.advance(TimeUnit.DAYS.toMillis(1))
+    provider.checkForLogs()
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === logCount)
+
+    // Update the slow app to contain valid info. Code should detect the change and not clean
+    // it up.
+    writeFile(slowApp, true, None,
+      SparkListenerApplicationStart(slowApp.getName(), Some(slowApp.getName()), 1L, "test", None))
+    slowApp.setLastModified(clock.getTimeMillis())
+    validLogCount += 1
+
+    // Move the clock forward another 2 days and scan the files again. This time the cleaner should
+    // pick up the invalid files and get rid of them.
+    clock.advance(TimeUnit.DAYS.toMillis(2))
+    provider.checkForLogs()
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === validLogCount)
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:

http://git-wip-us.apache.org/repos/asf/spark/blob/fed2139f/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 87778dd..7aa60f2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -48,7 +48,7 @@ import org.apache.spark.deploy.history.config._
 import org.apache.spark.status.api.v1.ApplicationInfo
 import org.apache.spark.status.api.v1.JobData
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ResetSystemProperties, Utils}
+import org.apache.spark.util.{ResetSystemProperties, ShutdownHookManager, Utils}
 
 /**
  * A collection of tests against the historyserver, including comparing responses from the json
@@ -564,7 +564,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     assert(jobcount === getNumJobs("/jobs"))
 
     // no need to retain the test dir now the tests complete
-    logDir.deleteOnExit()
+    ShutdownHookManager.registerShutdownDeleteDir(logDir)
   }
 
   test("ui and api authorization checks") {

