Repository: spark
Updated Branches:
  refs/heads/master 271c891b9 -> 653fe0241


[SPARK-6951][CORE] Speed up parsing of event logs during listing.

This change introduces two optimizations to help speed up generation
of listing data when parsing event logs.

The first one allows the parser to be stopped once enough data to
create the listing entry has been read: currently the application
start event plus the environment update, which is needed to capture
the UI ACLs. If the end event is also needed, the code skips to near
the end of the log and tries to find it there, instead of parsing the
whole log file.
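
The halt is implemented by having the listing listener throw a dedicated
exception once it has seen everything it needs, which the replay bus treats
as a signal to stop rather than as a listener error. A condensed sketch of
that mechanism, based on the AppListingListener and HaltReplayException
changes in the diff below (the listener class name here is illustrative,
and the real listener also records attempt and ACL details):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
  SparkListenerEnvironmentUpdate}

// Stand-in for the HaltReplayException added to ReplayListenerBus.scala; the
// replay bus catches it and simply stops feeding events to listeners.
class HaltReplayException extends RuntimeException

// Illustrative name; the real class is AppListingListener.
class ListingOnlyListener(haltEnabled: Boolean) extends SparkListener {
  private var appId: String = null
  private var gotEnvUpdate = false

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    appId = event.appId.orNull
    checkProgress()
  }

  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
    // The first environment update carries the UI ACL settings.
    gotEnvUpdate = true
    checkProgress()
  }

  // Stop the replay as soon as the start event and environment info are known.
  private def checkProgress(): Unit = {
    if (haltEnabled && appId != null && gotEnvUpdate) {
      throw new HaltReplayException()
    }
  }
}
```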

Unfortunately the end-event skip works much better with uncompressed
logs: the skipped bytes of a compressed log still have to be
decompressed, so skipping only saves the work of parsing lines and
filtering events, and the gains are modest.
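
For illustration, a minimal sketch of the end-event tail scan, assuming a
plain local file and a hypothetical helper name; the real code re-opens the
log via EventLoggingListener.openEventLog, replays the tail through the
ReplayListenerBus, and falls back to re-parsing the whole log if the end
event still isn't found:

```scala
import java.io.FileInputStream
import scala.io.Source

object EndEventTailScan {
  // Hypothetical helper: skip to within `chunkSize` bytes of the end of the
  // log and look for the application end event only in that tail.
  def tailHasEndEvent(path: String, fileLen: Long, chunkSize: Long): Boolean = {
    val in = new FileInputStream(path)
    try {
      val target = fileLen - chunkSize
      if (target > 0) {
        // InputStream.skip may skip fewer bytes than requested, so loop.
        var skipped = 0L
        while (skipped < target) {
          skipped += in.skip(target - skipped)
        }
      }
      val lines = Source.fromInputStream(in).getLines()
      // Skipping may land in the middle of a line; drop the partial line first.
      if (target > 0 && lines.hasNext) lines.next()
      lines.exists(_.startsWith("{\"Event\":\"SparkListenerApplicationEnd\""))
    } finally {
      in.close()
    }
  }
}
```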

The second optimization deals with in-progress logs, and works in two
ways. First, it completely avoids parsing the rest of the log for
these apps once enough listing data has been read. Unlike the
optimization above, this also speeds things up for compressed logs,
since only the very beginning of the log has to be read.

Second, the code that decides whether to re-parse logs to get updated
listing data ignores in-progress applications until they've completed.

Both optimizations can be disabled but are enabled by default.
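
For reference, these are the two configuration keys added by this change
(names and defaults are taken from the config.scala diff below); a sketch of
turning both optimizations off programmatically:

```scala
import org.apache.spark.SparkConf

// Defaults are "true" and "1m"; the values below disable both optimizations.
val conf = new SparkConf()
  .set("spark.history.fs.inProgressOptimization.enabled", "false")
  .set("spark.history.fs.endEventReparseChunkSize", "0")
```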

I tested this on some fake event logs to see the effect. I created
500 logs of about 60M each (so ~30G uncompressed; each log was 1.7M
when compressed with zstd). In the table below, C = completed, IP =
in-progress, and the size in the row label is the end-event reparse
chunk size, i.e. the amount of data re-parsed at the end of a log
when necessary.

```
            none/C   none/IP   zstd/C   zstd/IP
On / 16k      2s       2s       22s       2s
On / 1m       3s       2s       24s       2s
Off          1.1m     1.1m      26s      24s
```

This was with 4 threads on a single local SSD. As expected from the
explanations above, there are considerable gains for in-progress logs
and for uncompressed logs, but much smaller ones when the full
compressed log still has to be read.

As a side note, I removed the custom code that obtained the scan time
by creating a file on HDFS; since file modification times are no longer
used to detect changed logs, the local clock is enough for the current
use of the SHS.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20952 from vanzin/SPARK-6951.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/653fe024
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/653fe024
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/653fe024

Branch: refs/heads/master
Commit: 653fe02415a537299e15f92b56045569864b6183
Parents: 271c891
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Apr 11 09:49:25 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed Apr 11 09:49:25 2018 -0500

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 251 +++++++++++++------
 .../apache/spark/deploy/history/config.scala    |  15 ++
 .../spark/scheduler/ReplayListenerBus.scala     |  11 +
 .../org/apache/spark/util/ListenerBus.scala     |   5 +-
 .../deploy/history/FsHistoryProviderSuite.scala |  78 ++++--
 5 files changed, 264 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/653fe024/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index ace6d9e..56db935 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
-import java.util.{Date, ServiceLoader, UUID}
+import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
 
@@ -58,10 +59,10 @@ import org.apache.spark.util.kvstore._
  *
  * == How new and updated attempts are detected ==
  *
- * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
- * entries in the log dir whose modification time is greater than the last scan time
- * are considered new or updated. These are replayed to create a new attempt info entry
- * and update or create a matching application info element in the list of applications.
+ * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any entries in the
+ * log dir whose size changed since the last scan time are considered new or updated. These are
+ * replayed to create a new attempt info entry and update or create a matching application info
+ * element in the list of applications.
  * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
  * attempt is replaced by another one with a larger log size.
  *
@@ -125,6 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
+  private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
@@ -402,13 +404,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
-      val newLastScanTime = getNewLastScanTime()
+      val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
       val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
-            // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
+            // FsHistoryProvider used to generate a hidden file which can't be read.  Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
@@ -417,15 +419,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .filter { entry =>
           try {
             val info = listing.read(classOf[LogInfo], entry.getPath().toString())
-            if (info.fileSize < entry.getLen()) {
-              // Log size has changed, it should be parsed.
-              true
-            } else {
+
+            if (info.appId.isDefined) {
               // If the SHS view has a valid application, update the time the file was last seen so
-              // that the entry is not deleted from the SHS listing.
-              if (info.appId.isDefined) {
-                listing.write(info.copy(lastProcessed = newLastScanTime))
+              // that the entry is not deleted from the SHS listing. Also update the file size, in
+              // case the code below decides we don't need to parse the log.
+              listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
+            }
+
+            if (info.fileSize < entry.getLen()) {
+              if (info.appId.isDefined && fastInProgressParsing) {
+                // When fast in-progress parsing is on, we don't need to re-parse when the
+                // size changes, but we do need to invalidate any existing UIs.
+                invalidateUI(info.appId.get, info.attemptId)
+                false
+              } else {
+                true
               }
+            } else {
               false
             }
           } catch {
@@ -449,7 +460,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val tasks = updated.map { entry =>
         try {
           replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
           })
         } catch {
           // let the iteration over the updated entries break, since an exception on
@@ -542,25 +553,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  private[history] def getNewLastScanTime(): Long = {
-    val fileName = "." + UUID.randomUUID().toString
-    val path = new Path(logDir, fileName)
-    val fos = fs.create(path)
-
-    try {
-      fos.close()
-      fs.getFileStatus(path).getModificationTime
-    } catch {
-      case e: Exception =>
-        logError("Exception encountered when attempting to update last scan time", e)
-        lastScanTime.get()
-    } finally {
-      if (!fs.delete(path, true)) {
-        logWarning(s"Error deleting ${path}")
-      }
-    }
-  }
-
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
@@ -607,7 +599,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the given log file, saving the application in the listing db.
    */
-  protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
+  protected def mergeApplicationListing(
+      fileStatus: FileStatus,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
         eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -616,32 +611,118 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     val logPath = fileStatus.getPath()
+    val appCompleted = isCompleted(logPath.getName())
+    val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)
+
+    // Enable halt support in listener if:
+    // - app in progress && fast parsing enabled
+    // - skipping to end event is enabled (regardless of in-progress state)
+    val shouldHalt = enableOptimizations &&
+      ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)
+
     val bus = new ReplayListenerBus()
-    val listener = new AppListingListener(fileStatus, clock)
+    val listener = new AppListingListener(fileStatus, clock, shouldHalt)
     bus.addListener(listener)
-    replay(fileStatus, bus, eventsFilter = eventsFilter)
-
-    val (appId, attemptId) = listener.applicationInfo match {
-      case Some(app) =>
-        // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
-        // discussion on the UI lifecycle.
-        synchronized {
-          activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
-            ui.invalidate()
-            ui.ui.store.close()
+
+    logInfo(s"Parsing $logPath for listing data...")
+    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+      bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
+    }
+
+    // If enabled above, the listing listener will halt parsing when there's enough information to
+    // create a listing entry. When the app is completed, or fast parsing is disabled, we still need
+    // to replay until the end of the log file to try to find the app end event. Instead of reading
+    // and parsing line by line, this code skips bytes from the underlying stream so that it is
+    // positioned somewhere close to the end of the log file.
+    //
+    // Because the application end event is written while some Spark subsystems such as the
+    // scheduler are still active, there is no guarantee that the end event will be the last
+    // in the log. So, to be safe, the code uses a configurable chunk to be re-parsed at
+    // the end of the file, and retries parsing the whole log later if the needed data is
+    // still not found.
+    //
+    // Note that skipping bytes in compressed files is still not cheap, but there are still some
+    // minor gains over the normal log parsing done by the replay bus.
+    //
+    // This code re-opens the file so that it knows where it's skipping to. This isn't as cheap as
+    // just skipping from the current position, but there isn't a good way to detect what the
+    // current position is, since the replay listener bus buffers data internally.
+    val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing)
+    if (lookForEndEvent && listener.applicationInfo.isDefined) {
+      Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+        val target = fileStatus.getLen() - reparseChunkSize
+        if (target > 0) {
+          logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
+          var skipped = 0L
+          while (skipped < target) {
+            skipped += in.skip(target - skipped)
           }
         }
 
+        val source = Source.fromInputStream(in).getLines()
+
+        // Because skipping may leave the stream in the middle of a line, read the next line
+        // before replaying.
+        if (target > 0) {
+          source.next()
+        }
+
+        bus.replay(source, logPath.toString, !appCompleted, eventsFilter)
+      }
+    }
+
+    logInfo(s"Finished parsing $logPath")
+
+    listener.applicationInfo match {
+      case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
+        // In this case, we either didn't care about the end event, or we found it. So the
+        // listing data is good.
+        invalidateUI(app.info.id, app.attempts.head.info.attemptId)
         addListing(app)
-        (Some(app.info.id), app.attempts.head.info.attemptId)
+        listing.write(LogInfo(logPath.toString(), scanTime, Some(app.info.id),
+          app.attempts.head.info.attemptId, fileStatus.getLen()))
+
+        // For a finished log, remove the corresponding "in progress" entry from the listing DB if
+        // the file is really gone.
+        if (appCompleted) {
+          val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS
+          try {
+            // Fetch the entry first to avoid an RPC when it's already removed.
+            listing.read(classOf[LogInfo], inProgressLog)
+            if (!fs.isFile(new Path(inProgressLog))) {
+              listing.delete(classOf[LogInfo], inProgressLog)
+            }
+          } catch {
+            case _: NoSuchElementException =>
+          }
+        }
+
+      case Some(_) =>
+        // In this case, the attempt is still not marked as finished but was expected to. This can
+        // mean the end event is before the configured threshold, so call the method again to
+        // re-parse the whole log.
+        logInfo(s"Reparsing $logPath since end event was not found.")
+        mergeApplicationListing(fileStatus, scanTime, false)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
         // listing db, with an empty ID. This will make the log eligible for deletion if the app
         // does not make progress after the configured max log age.
-        (None, None)
+        listing.write(LogInfo(logPath.toString(), scanTime, None, None, fileStatus.getLen()))
+    }
+  }
+
+  /**
+   * Invalidate an existing UI for a given app attempt. See LoadedAppUI for a discussion on the
+   * UI lifecycle.
+   */
+  private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
+    synchronized {
+      activeUIs.get((appId, attemptId)).foreach { ui =>
+        ui.invalidate()
+        ui.ui.store.close()
+      }
     }
-    listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
   }
 
   /**
@@ -697,29 +778,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * Replays the events in the specified log file on the supplied `ReplayListenerBus`.
-   * `ReplayEventsFilter` determines what events are replayed.
-   */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus,
-      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
-    val logPath = eventLog.getPath()
-    val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
-    logInfo(s"Replaying log path: $logPath")
-    // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
-    // and when we read the file here.  That is OK -- it may result in an unnecessary refresh
-    // when there is no update, but will not result in missing an update.  We *must* prevent
-    // an error the other way -- if we report a size bigger (ie later) than the file that is
-    // actually read, we may never refresh the app.  FileStatus is guaranteed to be static
-    // after it's created, so we get a file size that is no bigger than what is actually read.
-    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
-      bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
-      logInfo(s"Finished parsing $logPath")
-    }
-  }
-
-  /**
    * Rebuilds the application state store from its event log.
    */
   private def rebuildAppStore(
@@ -741,8 +799,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } replayBus.addListener(listener)
 
     try {
-      replay(eventLog, replayBus)
+      val path = eventLog.getPath()
+      logInfo(s"Parsing $path to re-build UI...")
+      Utils.tryWithResource(EventLoggingListener.openEventLog(path, fs)) { in =>
+        replayBus.replay(in, path.toString(), maybeTruncated = !isCompleted(path.toString()))
+      }
       trackingStore.close(false)
+      logInfo(s"Finished parsing $path")
     } catch {
       case e: Exception =>
         Utils.tryLogNonFatalError {
@@ -881,6 +944,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def isCompleted(name: String): Boolean = {
+    !name.endsWith(EventLoggingListener.IN_PROGRESS)
+  }
+
 }
 
 private[history] object FsHistoryProvider {
@@ -945,11 +1012,17 @@ private[history] class ApplicationInfoWrapper(
 
 }
 
-private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
+private[history] class AppListingListener(
+    log: FileStatus,
+    clock: Clock,
+    haltEnabled: Boolean) extends SparkListener {
 
   private val app = new MutableApplicationInfo()
   private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
 
+  private var gotEnvUpdate = false
+  private var halted = false
+
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     app.id = event.appId.orNull
     app.name = event.appName
@@ -958,6 +1031,8 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
     attempt.startTime = new Date(event.time)
     attempt.lastUpdated = new Date(clock.getTimeMillis())
     attempt.sparkUser = event.sparkUser
+
+    checkProgress()
   }
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
@@ -968,11 +1043,18 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
   }
 
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
-    val allProperties = event.environmentDetails("Spark Properties").toMap
-    attempt.viewAcls = allProperties.get("spark.ui.view.acls")
-    attempt.adminAcls = allProperties.get("spark.admin.acls")
-    attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
-    attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+    // Only parse the first env update, since any future changes don't have any effect on
+    // the ACLs set for the UI.
+    if (!gotEnvUpdate) {
+      val allProperties = event.environmentDetails("Spark Properties").toMap
+      attempt.viewAcls = allProperties.get("spark.ui.view.acls")
+      attempt.adminAcls = allProperties.get("spark.admin.acls")
+      attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
+      attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+
+      gotEnvUpdate = true
+      checkProgress()
+    }
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
@@ -989,6 +1071,17 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
     }
   }
 
+  /**
+   * Throws a halt exception to stop replay if enough data to create the app listing has been
+   * read.
+   */
+  private def checkProgress(): Unit = {
+    if (haltEnabled && !halted && app.id != null && gotEnvUpdate) {
+      halted = true
+      throw new HaltReplayException()
+    }
+  }
+
   private class MutableApplicationInfo {
     var id: String = null
     var name: String = null

http://git-wip-us.apache.org/repos/asf/spark/blob/653fe024/core/src/main/scala/org/apache/spark/deploy/history/config.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
index efdbf67..25ba9ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -49,4 +49,19 @@ private[spark] object config {
     .intConf
     .createWithDefault(18080)
 
+  val FAST_IN_PROGRESS_PARSING =
+    ConfigBuilder("spark.history.fs.inProgressOptimization.enabled")
+      .doc("Enable optimized handling of in-progress logs. This option may 
leave finished " +
+        "applications that fail to rename their event logs listed as 
in-progress.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val END_EVENT_REPARSE_CHUNK_SIZE =
+    ConfigBuilder("spark.history.fs.endEventReparseChunkSize")
+      .doc("How many bytes to parse at the end of log files looking for the 
end event. " +
+        "This is used to speed up generation of application listings by 
skipping unnecessary " +
+        "parts of event log files. It can be disabled by setting this config 
to 0.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("1m")
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/653fe024/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index c9cd662..226c237 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -115,6 +115,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
         }
       }
     } catch {
+      case e: HaltReplayException =>
+        // Just stop replay.
       case _: EOFException if maybeTruncated =>
       case ioe: IOException =>
         throw ioe
@@ -124,8 +126,17 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
     }
   }
 
+  override protected def isIgnorableException(e: Throwable): Boolean = {
+    e.isInstanceOf[HaltReplayException]
+  }
+
 }
 
+/**
+ * Exception that can be thrown by listeners to halt replay. This is handled by ReplayListenerBus
+ * only, and will cause errors if thrown when using other bus implementations.
+ */
+private[spark] class HaltReplayException extends RuntimeException
 
 private[spark] object ReplayListenerBus {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/653fe024/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 76a5629..b25a731 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -81,7 +81,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       try {
         doPostEvent(listener, event)
       } catch {
-        case NonFatal(e) =>
+        case NonFatal(e) if !isIgnorableException(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw 
an exception", e)
       } finally {
         if (maybeTimerContext != null) {
@@ -97,6 +97,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    */
   protected def doPostEvent(listener: L, event: E): Unit
 
+  /** Allows bus implementations to prevent error logging for certain exceptions. */
+  protected def isIgnorableException(e: Throwable): Boolean = false
+
   private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
     val c = implicitly[ClassTag[T]].runtimeClass
     listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/653fe024/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0ba57bf..77b2394 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
-import org.mockito.Mockito.{doReturn, mock, spy, verify}
+import org.mockito.Mockito.{mock, spy, verify}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -151,8 +151,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       var mergeApplicationListingCall = 0
       override protected def mergeApplicationListing(
           fileStatus: FileStatus,
-          lastSeen: Long): Unit = {
-        super.mergeApplicationListing(fileStatus, lastSeen)
+          lastSeen: Long,
+          enableSkipToEnd: Boolean): Unit = {
+        super.mergeApplicationListing(fileStatus, lastSeen, enableSkipToEnd)
         mergeApplicationListingCall += 1
       }
     }
@@ -256,14 +257,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       )
 
     updateAndCheck(provider) { list =>
-      list should not be (null)
       list.size should be (1)
       list.head.attempts.size should be (3)
       list.head.attempts.head.attemptId should be (Some("attempt3"))
     }
 
     val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
-    writeFile(attempt1, true, None,
+    writeFile(app2Attempt1, true, None,
       SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", 
Some("attempt1")),
       SparkListenerApplicationEnd(6L)
       )
@@ -649,8 +649,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // Add more info to the app log, and trigger the provider to update things.
     writeFile(appLog, true, None,
       SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None),
-      SparkListenerJobStart(0, 1L, Nil, null),
-      SparkListenerApplicationEnd(5L)
+      SparkListenerJobStart(0, 1L, Nil, null)
       )
     provider.checkForLogs()
 
@@ -668,11 +667,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("clean up stale app information") {
     val storeDir = Utils.createTempDir()
     val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
-    val provider = spy(new FsHistoryProvider(conf))
+    val clock = new ManualClock()
+    val provider = spy(new FsHistoryProvider(conf, clock))
     val appId = "new1"
 
     // Write logs for two app attempts.
-    doReturn(1L).when(provider).getNewLastScanTime()
+    clock.advance(1)
     val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
     writeFile(attempt1, true, None,
       SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
@@ -697,7 +697,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
     // attempt 2 still exists, listing data should be there.
-    doReturn(2L).when(provider).getNewLastScanTime()
+    clock.advance(1)
     attempt1.delete()
     updateAndCheck(provider) { list =>
       assert(list.size === 1)
@@ -708,7 +708,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     assert(provider.getAppUI(appId, None) === None)
 
     // Delete the second attempt's log file. Now everything should go away.
-    doReturn(3L).when(provider).getNewLastScanTime()
+    clock.advance(1)
     attempt2.delete()
     updateAndCheck(provider) { list =>
       assert(list.isEmpty)
@@ -718,9 +718,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("SPARK-21571: clean up removes invalid history files") {
     val clock = new ManualClock()
     val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"2d")
-    val provider = new FsHistoryProvider(conf, clock) {
-      override def getNewLastScanTime(): Long = clock.getTimeMillis()
-    }
+    val provider = new FsHistoryProvider(conf, clock)
 
     // Create 0-byte size inprogress and complete files
     var logCount = 0
@@ -772,6 +770,54 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     assert(new File(testDir.toURI).listFiles().size === validLogCount)
   }
 
+  test("always find end event for finished apps") {
+    // Create a log file where the end event is before the configured chunk to be reparsed at
+    // the end of the file. The correct listing should still be generated.
+    val log = newLogFile("end-event-test", None, inProgress = false)
+    writeFile(log, true, None,
+      Seq(
+        SparkListenerApplicationStart("end-event-test", 
Some("end-event-test"), 1L, "test", None),
+        SparkListenerEnvironmentUpdate(Map(
+          "Spark Properties" -> Seq.empty,
+          "JVM Information" -> Seq.empty,
+          "System Properties" -> Seq.empty,
+          "Classpath Entries" -> Seq.empty
+        )),
+        SparkListenerApplicationEnd(5L)
+      ) ++ (1 to 1000).map { i => SparkListenerJobStart(i, i, Nil) }: _*)
+
+    val conf = createTestConf().set(END_EVENT_REPARSE_CHUNK_SIZE.key, s"1k")
+    val provider = new FsHistoryProvider(conf)
+    updateAndCheck(provider) { list =>
+      assert(list.size === 1)
+      assert(list(0).attempts.size === 1)
+      assert(list(0).attempts(0).completed)
+    }
+  }
+
+  test("parse event logs with optimizations off") {
+    val conf = createTestConf()
+      .set(END_EVENT_REPARSE_CHUNK_SIZE, 0L)
+      .set(FAST_IN_PROGRESS_PARSING, false)
+    val provider = new FsHistoryProvider(conf)
+
+    val complete = newLogFile("complete", None, inProgress = false)
+    writeFile(complete, true, None,
+      SparkListenerApplicationStart("complete", Some("complete"), 1L, "test", 
None),
+      SparkListenerApplicationEnd(5L)
+      )
+
+    val incomplete = newLogFile("incomplete", None, inProgress = true)
+    writeFile(incomplete, true, None,
+      SparkListenerApplicationStart("incomplete", Some("incomplete"), 1L, 
"test", None)
+      )
+
+    updateAndCheck(provider) { list =>
+      list.size should be (2)
+      list.count(_.attempts.head.completed) should be (1)
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
@@ -815,7 +861,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
   private def createTestConf(inMemory: Boolean = false): SparkConf = {
     val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+      .set(EVENT_LOG_DIR, testDir.getAbsolutePath())
+      .set(FAST_IN_PROGRESS_PARSING, true)
 
     if (!inMemory) {
       conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
@@ -848,4 +895,3 @@ class TestGroupsMappingProvider extends GroupMappingServiceProvider {
     mappings.get(username).map(Set(_)).getOrElse(Set.empty)
   }
 }
-

