Repository: spark Updated Branches: refs/heads/branch-2.0 bdd27d1ea -> 8b33aa089
[SPARK-17843][WEB UI] Indicate event logs pending for processing on h⦠## What changes were proposed in this pull request? Backport PR #15410 to branch-2.0 ## How was this patch tested? Existing unit tests. Screenshots for UI changes provided in PR #15410. Author: Vinayak <[email protected]> Closes #15991 from vijoshi/SAAS-608. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b33aa08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b33aa08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b33aa08 Branch: refs/heads/branch-2.0 Commit: 8b33aa0899d5388ab51b8daa786d0f76fe255ddf Parents: bdd27d1 Author: Vinayak <[email protected]> Authored: Wed Nov 30 08:30:19 2016 -0600 Committer: Tom Graves <[email protected]> Committed: Wed Nov 30 08:30:19 2016 -0600 ---------------------------------------------------------------------- .../spark/ui/static/historypage-common.js | 24 ++++++++ .../history/ApplicationHistoryProvider.scala | 24 ++++++++ .../deploy/history/FsHistoryProvider.scala | 59 ++++++++++++++------ .../spark/deploy/history/HistoryPage.scala | 19 +++++++ .../spark/deploy/history/HistoryServer.scala | 8 +++ 5 files changed, 116 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js new file mode 100644 index 0000000..55d540d --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ready(function() { + if ($('#last-updated').length) { + var lastUpdatedMillis = Number($('#last-updated').text()); + var updatedDate = new Date(lastUpdatedMillis); + $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString()) + } +}); http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index ba42b48..f3ea541 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -73,6 +73,30 @@ private[history] case class LoadedAppUI( private[history] abstract class ApplicationHistoryProvider { /** + * Returns the count of application event logs that the provider is currently still processing. + * History Server UI can use this to indicate to a user that the application listing on the UI + * can be expected to list additional known applications once the processing of these + * application event logs completes. + * + * A History Provider that does not have a notion of count of event logs that may be pending + * for processing need not override this method. + * + * @return Count of application event logs that are currently under process + */ + def getEventLogsUnderProcess(): Int = { + return 0; + } + + /** + * Returns the time the history provider last updated the application history information + * + * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis + */ + def getLastUpdatedTime(): Long = { + return 0; + } + + /** * Returns a list of applications available for the history server to show. * * @return List of all know applications. http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a1e36c5..0d9d22a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.history import java.io.{FileNotFoundException, IOException, OutputStream} import java.util.UUID -import java.util.concurrent.{Executors, ExecutorService, TimeUnit} +import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.mutable @@ -107,7 +107,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // The modification time of the newest log detected during the last scan. Currently only // used for logging msgs (logs are re-scanned based on file size, rather than modtime) - private var lastScanTime = -1L + private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted // into the map in order, so the LinkedHashMap maintains the correct ordering. @@ -119,6 +119,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -223,6 +225,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) applications.get(appId) } + override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() + + override def getLastUpdatedTime(): Long = lastScanTime.get() + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => @@ -310,26 +316,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") } - logInfos.map { file => - replayExecutor.submit(new Runnable { + + var tasks = mutable.ListBuffer[Future[_]]() + + try { + for (file <- logInfos) { + tasks += replayExecutor.submit(new Runnable { override def run(): Unit = mergeApplicationListing(file) }) } - .foreach { task => - try { - // Wait for all tasks to finish. This makes sure that checkForLogs - // is not scheduled again while some tasks are already running in - // the replayExecutor. - task.get() - } catch { - case e: InterruptedException => - throw e - case e: Exception => - logError("Exception while merging application listings", e) - } + } catch { + // let the iteration over logInfos break, since an exception on + // replayExecutor.submit (..) indicates the ExecutorService is unable + // to take any more submissions at this time + + case e: Exception => + logError(s"Exception while submitting event log for replay", e) + } + + pendingReplayTasksCount.addAndGet(tasks.size) + + tasks.foreach { task => + try { + // Wait for all tasks to finish. This makes sure that checkForLogs + // is not scheduled again while some tasks are already running in + // the replayExecutor. + task.get() + } catch { + case e: InterruptedException => + throw e + case e: Exception => + logError("Exception while merging application listings", e) + } finally { + pendingReplayTasksCount.decrementAndGet() } + } - lastScanTime = newLastScanTime + lastScanTime.set(newLastScanTime) } catch { case e: Exception => logError("Exception in checking for event log updates", e) } @@ -346,7 +369,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } catch { case e: Exception => logError("Exception encountered when attempting to update last scan time", e) - lastScanTime + lastScanTime.get() } finally { if (!fs.delete(path, true)) { logWarning(s"Error deleting ${path}") http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index a120b6c..faf807d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -33,14 +33,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") .filter(_.completed != requestedIncomplete) val allAppsSize = allApps.size + val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess() + val lastUpdatedTime = parent.getLastUpdatedTime() val providerConfig = parent.getProviderConfig() val content = + <script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script> <div> <div class="span12"> <ul class="unstyled"> {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }} </ul> { + if (eventLogsUnderProcessCount > 0) { + <p>There are {eventLogsUnderProcessCount} event log(s) currently being + processed which may result in additional applications getting listed on this page. + Refresh the page to view updates. </p> + } + } + + { + if (lastUpdatedTime > 0) { + <p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p> + } + } + + { if (allAppsSize > 0) { <script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++ <div id="history-summary" class="span12 pagination"></div> ++ @@ -48,6 +65,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") <script>setAppLimit({parent.maxApplications})</script> } else if (requestedIncomplete) { <h4>No incomplete applications found!</h4> + } else if (eventLogsUnderProcessCount > 0) { + <h4>No completed applications found!</h4> } else { <h4>No completed applications found!</h4> ++ <p>Did you specify the correct logging directory? http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 735aa43..996c19e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -178,6 +178,14 @@ class HistoryServer( provider.getListing() } + def getEventLogsUnderProcess(): Int = { + provider.getEventLogsUnderProcess() + } + + def getLastUpdatedTime(): Long = { + provider.getLastUpdatedTime() + } + def getApplicationInfoList: Iterator[ApplicationInfo] = { getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
