Repository: spark
Updated Branches:
  refs/heads/branch-1.4 bc355e243 -> 1b5439f6e


[SPARK-8372] Do not show applications that haven't recorded their app ID yet.

Showing these applications may lead to weird behavior in the History Server.
For old logs, if the app ID is recorded later, you may end up with a duplicate
entry. For new logs, the app might be listed with a ".inprogress" suffix.

So ignore those, but still allow old applications that don't record app IDs
at all (1.0 and 1.1) to be shown.
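
In essence, the listing rule this change implements is: keep an attempt if it
recorded an app ID, or if it came from a Spark version that never wrote app
IDs to its event logs. A minimal sketch of that rule, using illustrative names
rather than the actual Spark types:

  // Sketch only: `loggedByPre12Spark` stands in for the real
  // !sparkVersionHasAppId(eventLog) check against the log's version marker.
  def shouldList(appId: Option[String], loggedByPre12Spark: Boolean): Boolean =
    appId.isDefined || loggedByPre12Spark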

Author: Marcelo Vanzin <van...@cloudera.com>
Author: Carson Wang <carson.w...@intel.com>

Closes #7097 from vanzin/SPARK-8372 and squashes the following commits:

a24eab2 [Marcelo Vanzin] Feedback.
112ae8f [Marcelo Vanzin] Merge branch 'master' into SPARK-8372
7b91b74 [Marcelo Vanzin] Handle logs generated by 1.0 and 1.1.
1eca3fe [Carson Wang] [SPARK-8372] History server shows incorrect information for application not started

Conflicts:
        core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b5439f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b5439f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b5439f6

Branch: refs/heads/branch-1.4
Commit: 1b5439f6e809da7389993244f484692fb9ffb43f
Parents: bc355e2
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Jun 30 14:01:52 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Jun 30 14:05:53 2015 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      |  98 +++++++++++------
 .../deploy/history/FsHistoryProviderSuite.scala | 109 ++++++++++++++-----
 2 files changed, 147 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b5439f6/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 45c2be3..3daa653 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -80,12 +80,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
-  // Constants used to parse Spark 1.0.0 log directories.
-  private[history] val LOG_PREFIX = "EVENT_LOG_"
-  private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
-  private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
-  private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -143,7 +137,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
     try {
       applications.get(appId).flatMap { appInfo =>
-        appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+        appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
           val replayBus = new ReplayListenerBus()
           val ui = {
             val conf = this.conf.clone()
@@ -152,20 +146,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
-
           val appListener = new ApplicationEventListener()
           replayBus.addListener(appListener)
           val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-
-          ui.setAppName(s"${appInfo.name} ($appId)")
-
-          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-          ui.getSecurityManager.setAcls(uiAclsEnabled)
-          // make sure to set admin acls before view acls so they are properly picked up
-          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
-            appListener.viewAcls.getOrElse(""))
-          ui
+          appInfo.map { info =>
+            ui.setAppName(s"${info.name} ($appId)")
+
+            val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+            ui.getSecurityManager.setAcls(uiAclsEnabled)
+            // make sure to set admin acls before view acls so they are properly picked up
+            ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+            ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+              appListener.viewAcls.getOrElse(""))
+            ui
+          }
         }
       }
     } catch {
@@ -227,8 +221,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        logInfo(s"Application log ${res.logPath} loaded successfully.")
-        Some(res)
+        res match {
+          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+            "The application may have not started.")
+        }
+        res
       } catch {
         case e: Exception =>
           logError(
@@ -374,9 +372,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application.
+   * application. Return `None` if the application ID cannot be located.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+  private def replay(
+      eventLog: FileStatus,
+      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -390,16 +390,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      new FsApplicationAttemptInfo(
-        logPath.getName(),
-        appListener.appName.getOrElse(NOT_STARTED),
-        appListener.appId.getOrElse(logPath.getName()),
-        appListener.appAttemptId,
-        appListener.startTime.getOrElse(-1L),
-        appListener.endTime.getOrElse(-1L),
-        getModificationTime(eventLog).get,
-        appListener.sparkUser.getOrElse(NOT_STARTED),
-        appCompleted)
+
+      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+      // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
+      // logs generated by those versions go through.
+      if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+        Some(new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.appId.getOrElse(logPath.getName()),
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          getModificationTime(eventLog).get,
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted))
+      } else {
+        None
+      }
     } finally {
       logInput.close()
     }
@@ -474,10 +482,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
+   * in Spark 1.2.
+   */
+  private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
+    if (isLegacyLogDirectory(entry)) {
+      fs.listStatus(entry.getPath())
+        .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
+        .map { status =>
+          val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
+          version != "1.0" && version != "1.1"
+        }
+        .getOrElse(true)
+    } else {
+      true
+    }
+  }
+
 }
 
-private object FsHistoryProvider {
+private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // Constants used to parse Spark 1.0.0 log directories.
+  val LOG_PREFIX = "EVENT_LOG_"
+  val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+  val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
+  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 }
 
 private class FsApplicationAttemptInfo(
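
A note on the map-to-flatMap switch in getAppUI above: replay() now returns an
Option, so mapping over the attempt would yield a nested Option[Option[SparkUI]],
while flatMap collapses the missing-app-ID case to None. A stand-alone sketch of
the pattern, with hypothetical types rather than Spark's:

  case class Ui(appId: String)

  // Analogue of replay(): yields an ID only when the log recorded one.
  def replayLog(log: String): Option[String] =
    if (log.contains("appId")) Some("app-20150630") else None

  def getUi(log: String): Option[Ui] =
    Some(log).flatMap { l =>     // with .map this would nest the Options
      replayLog(l).map(id => Ui(id))
    }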

http://git-wip-us.apache.org/repos/asf/spark/blob/1b5439f6/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0f6933d..afa4958 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -35,6 +35,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
 
+  import FsHistoryProvider._
+
   private var testDir: File = null
 
   before {
@@ -63,7 +65,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart("new-app-complete", None, 1L, "test", 
None),
+      SparkListenerApplicationStart(newAppComplete.getName(), 
Some("new-app-complete"), 1L, "test",
+        None),
       SparkListenerApplicationEnd(5L)
       )
 
@@ -71,35 +74,30 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
       Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, 
"test", None),
+      SparkListenerApplicationStart(newAppCompressedComplete.getName(), 
Some("new-complete-lzf"),
+        1L, "test", None),
       SparkListenerApplicationEnd(4L))
 
     // Write an unfinished app, new-style.
     val newAppIncomplete = newLogFile("new2", None, inProgress = true)
     writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", 
None)
+      SparkListenerApplicationStart(newAppIncomplete.getName(), 
Some("new-incomplete"), 1L, "test",
+        None)
       )
 
     // Write an old-style application log.
-    val oldAppComplete = new File(testDir, "old1")
-    oldAppComplete.mkdir()
-    createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
+    val oldAppComplete = writeOldLog("old1", "1.0", None, true,
+      SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, 
"test", None),
       SparkListenerApplicationEnd(3L)
       )
-    createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
 
     // Check for logs so that we force the older unfinished app to be loaded, to make
     // sure unfinished apps are also sorted correctly.
     provider.checkForLogs()
 
     // Write an unfinished app, old-style.
-    val oldAppIncomplete = new File(testDir, "old2")
-    oldAppIncomplete.mkdir()
-    createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
+    val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
+      SparkListenerApplicationStart("old2", None, 2L, "test", None)
       )
 
     // Force a reload of data from the log directory, and check that both logs are loaded.
@@ -120,16 +118,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
           List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
       }
 
-      list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+      list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
         newAppComplete.lastModified(), "test", true))
-      list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
-        "new-app-compressed-complete", 1L, 4L, 
newAppCompressedComplete.lastModified(), "test",
-        true))
-      list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+      list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
+        1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+      list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
         oldAppComplete.lastModified(), "test", true))
-      list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
-        oldAppIncomplete.lastModified(), "test", false))
-      list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+      list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
+        -1L, oldAppIncomplete.lastModified(), "test", false))
+      list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
         newAppIncomplete.lastModified(), "test", false))
 
       // Make sure the UI can be rendered.
@@ -151,12 +148,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
       val logDir = new File(testDir, codecName)
       logDir.mkdir()
-      createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
-      writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
+      createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
+      writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
         SparkListenerApplicationStart("app2", None, 2L, "test", None),
         SparkListenerApplicationEnd(3L)
         )
-      createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
+      createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
 
       val logPath = new Path(logDir.getAbsolutePath())
       try {
@@ -176,12 +173,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("SPARK-3697: ignore directories that cannot be read.") {
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
+      SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", 
None),
       SparkListenerApplicationEnd(2L)
       )
     val logFile2 = newLogFile("new2", None, inProgress = false)
     writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
+      SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", 
None),
       SparkListenerApplicationEnd(2L)
       )
     logFile2.setReadable(false, false)
@@ -214,6 +211,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("Parse logs that application is not started") {
+    val provider = new FsHistoryProvider(createTestConf())
+
+    val logFile1 = newLogFile("app1", None, inProgress = true)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("1.4")
+    )
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+  }
+
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 
@@ -335,6 +344,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     assert(!log2.exists())
   }
 
+  test("SPARK-8372: new logs with no app ID are ignored") {
+    val provider = new FsHistoryProvider(createTestConf())
+
+    // Write a new log file without an app id, to make sure it's ignored.
+    val logFile1 = newLogFile("app1", None, inProgress = true)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("1.4")
+    )
+
+    // Write a 1.2 log file with no start event (= no app id), it should be ignored.
+    writeOldLog("v12Log", "1.2", None, false)
+
+    // Write 1.0 and 1.1 logs, which don't have app ids.
+    writeOldLog("v11Log", "1.1", None, true,
+      SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
+      SparkListenerApplicationEnd(3L))
+    writeOldLog("v10Log", "1.0", None, true,
+      SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
+      SparkListenerApplicationEnd(4L))
+
+    updateAndCheck(provider) { list =>
+      list.size should be (2)
+      list(0).id should be ("v10Log")
+      list(1).id should be ("v11Log")
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
@@ -374,4 +410,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
   }
 
+  private def writeOldLog(
+      fname: String,
+      sparkVersion: String,
+      codec: Option[CompressionCodec],
+      completed: Boolean,
+      events: SparkListenerEvent*): File = {
+    val log = new File(testDir, fname)
+    log.mkdir()
+
+    val oldEventLog = new File(log, LOG_PREFIX + "1")
+    createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
+    writeFile(oldEventLog, false, codec, events: _*)
+    if (completed) {
+      createEmptyFile(new File(log, APPLICATION_COMPLETE))
+    }
+
+    log
+  }
+
 }
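
Putting the pieces together, the gate applied by sparkVersionHasAppId boils
down to inspecting the SPARK_VERSION_ marker file in a legacy log directory.
A simplified, self-contained sketch (plain strings stand in for Hadoop
FileStatus entries; the prefix value follows the constants shown above):

  val SPARK_VERSION_PREFIX = "SPARK_VERSION_"

  def versionRecordsAppId(legacyDirEntries: Seq[String]): Boolean =
    legacyDirEntries
      .find(_.startsWith(SPARK_VERSION_PREFIX))
      .map { name =>
        val version = name.stripPrefix(SPARK_VERSION_PREFIX)
        version != "1.0" && version != "1.1"
      }
      .getOrElse(true)  // no marker: assume a version that records app IDs

  // versionRecordsAppId(Seq("EVENT_LOG_1", "SPARK_VERSION_1.0")) == false,
  //   so that log is listed even without an app ID (grandfathered in).
  // versionRecordsAppId(Seq("EVENT_LOG_1", "SPARK_VERSION_1.2")) == true,
  //   so that log is listed only if it recorded an app ID.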

