Repository: spark
Updated Branches:
  refs/heads/master b44d1b8fc -> ed8869ebb


[SPARK-8617][WEBUI] HistoryServer: Include in-progress files during cleanup

## What changes were proposed in this pull request?
- Removed the `attempt.completed` filter so the cleaner also removes orphaned in-progress files.
- Use the loading time as `lastUpdated` for in-progress files, and keep using the modification time for completed files. The former prevents deletion of in-progress job files; the latter ensures that `lastUpdated` does not change for completed jobs when the HistoryServer restarts and reloads the file (see the sketch below).
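
A minimal sketch of the selection logic (identifiers match the diff below: `fileStatus` is the Hadoop `FileStatus` of the event log and `clock` is the provider's injected `Clock`):

```scala
// Completed logs: keep the on-disk modification time, so lastUpdated stays
// stable across HistoryServer restarts.
// In-progress logs: use the current scan time, so a live attempt is never
// older than the most recent scan and is not cleaned prematurely.
val lastUpdated =
  if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
```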

## How was this patch tested?
Added new unit tests and verified via existing tests.
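
The new cleanup test drives time with a `ManualClock` so the age check is deterministic; roughly (a sketch condensed from the suite below, with `maxAge`, `firstFileModifiedTime`, and `log1` as defined there):

```scala
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
  createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)

// Advance past maxAge relative to the first scan; the orphaned in-progress
// log should now be cleaned even though it never completed.
clock.setTime(firstFileModifiedTime + maxAge + 1)
updateAndCheck(provider)(list => list.size should be(1))
assert(!log1.exists())
```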

Author: Ergin Seyfe <[email protected]>

Closes #16165 from seyfe/clear_old_inprogress_files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed8869eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed8869eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed8869eb

Branch: refs/heads/master
Commit: ed8869ebbf39783b16daba2e2498a2bc1889306f
Parents: b44d1b8
Author: Ergin Seyfe <[email protected]>
Authored: Thu Dec 8 10:21:09 2016 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Dec 8 10:21:09 2016 -0800

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 10 ++--
 .../deploy/history/FsHistoryProviderSuite.scala | 50 +++++++++++++++++++-
 2 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed8869eb/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 8ef69b1..3011ed0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -446,9 +446,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
 
       val logPath = fileStatus.getPath()
-
       val appCompleted = isApplicationCompleted(fileStatus)
 
+      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
+      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
+      // won't change whenever HistoryServer restarts and reloads the file.
+      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
+
       val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
 
       // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
@@ -461,7 +465,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appListener.appAttemptId,
           appListener.startTime.getOrElse(-1L),
           appListener.endTime.getOrElse(-1L),
-          fileStatus.getModificationTime(),
+          lastUpdated,
           appListener.sparkUser.getOrElse(NOT_STARTED),
           appCompleted,
           fileStatus.getLen()
@@ -546,7 +550,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
       def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
-        now - attempt.lastUpdated > maxAge && attempt.completed
+        now - attempt.lastUpdated > maxAge
       }
 
       // Scan all logs from the log directory.

http://git-wip-us.apache.org/repos/asf/spark/blob/ed8869eb/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 2c41c43..027f412 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -66,7 +66,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   }
 
   test("Parse application logs") {
-    val provider = new FsHistoryProvider(createTestConf())
+    val clock = new ManualClock(12345678)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
 
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
@@ -109,12 +110,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
           List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
       }
 
+      // For completed files, lastUpdated would be lastModified time.
       list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
         newAppComplete.lastModified(), "test", true))
       list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
         1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+
+      // For Inprogress files, lastUpdated would be current loading time.
       list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
-        newAppIncomplete.lastModified(), "test", false))
+        clock.getTimeMillis(), "test", false))
 
       // Make sure the UI can be rendered.
       list.foreach { case info =>
@@ -299,6 +303,48 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     assert(!log2.exists())
   }
 
+  test("log cleaner for inProgress files") {
+    val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
+    val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
+    val maxAge = TimeUnit.SECONDS.toMillis(40)
+    val clock = new ManualClock(0)
+    val provider = new FsHistoryProvider(
+      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+    val log1 = newLogFile("inProgressApp1", None, inProgress = true)
+    writeFile(log1, true, None,
+      SparkListenerApplicationStart(
+        "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
+    )
+
+    clock.setTime(firstFileModifiedTime)
+    provider.checkForLogs()
+
+    val log2 = newLogFile("inProgressApp2", None, inProgress = true)
+    writeFile(log2, true, None,
+      SparkListenerApplicationStart(
+        "inProgressApp2", Some("inProgressApp2"), 23L, "test2", 
Some("attempt2"))
+    )
+
+    clock.setTime(secondFileModifiedTime)
+    provider.checkForLogs()
+
+    // This should not trigger any cleanup
+    updateAndCheck(provider)(list => list.size should be(2))
+
+    // Should trigger cleanup for first file but not second one
+    clock.setTime(firstFileModifiedTime + maxAge + 1)
+    updateAndCheck(provider)(list => list.size should be(1))
+    assert(!log1.exists())
+    assert(log2.exists())
+
+    // Should cleanup the second file as well.
+    clock.setTime(secondFileModifiedTime + maxAge + 1)
+    updateAndCheck(provider)(list => list.size should be(0))
+    assert(!log1.exists())
+    assert(!log2.exists())
+  }
+
   test("Event log copy") {
     val provider = new FsHistoryProvider(createTestConf())
     val logs = (1 to 2).map { i =>

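For context, the cleaner code path exercised above only runs when it is enabled; a hedged configuration sketch (standard History Server settings, values illustrative):

```scala
// With this change, logs older than maxAge are reaped even if they are
// still named *.inprogress (e.g. orphans from crashed applications).
val conf = new SparkConf()
  .set("spark.history.fs.cleaner.enabled", "true")
  .set("spark.history.fs.cleaner.interval", "1d")
  .set("spark.history.fs.cleaner.maxAge", "7d")
```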
