This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9b098f1 [SPARK-30119][WEBUI] Support pagination for streaming tab 9b098f1 is described below commit 9b098f1eb91a5e9f488d573bfeea3f6bfd9b95b3 Author: iRakson <raksonrak...@gmail.com> AuthorDate: Fri Jun 12 10:27:31 2020 -0500 [SPARK-30119][WEBUI] Support pagination for streaming tab ### What changes were proposed in this pull request? #28747 reverted #28439 due to a flaky test case. This PR fixes the flaky test and adds pagination support. ### Why are the changes needed? To support pagination for the streaming tab. ### Does this PR introduce _any_ user-facing change? Yes. Now the streaming tab tables will be paginated. ### How was this patch tested? Manually. Closes #28748 from iRakson/fixstreamingpagination. Authored-by: iRakson <raksonrak...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../resources/org/apache/spark/ui/static/webui.js | 3 +- .../spark/streaming/ui/AllBatchesTable.scala | 282 +++++++++++---------- .../apache/spark/streaming/ui/StreamingPage.scala | 113 ++++++--- .../apache/spark/streaming/UISeleniumSuite.scala | 39 ++- 4 files changed, 259 insertions(+), 178 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js index 4f8409c..bb37256 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.js +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js @@ -87,7 +87,8 @@ $(function() { collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages'); collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks'); collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds'); - collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches'); + collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches'); +
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches'); collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches'); collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions'); collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions'); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 1e443f6..c0eec0e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -17,30 +17,41 @@ package org.apache.spark.streaming.ui -import scala.xml.Node - -import org.apache.spark.ui.{UIUtils => SparkUIUtils} +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 +import javax.servlet.http.HttpServletRequest -private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) { +import scala.xml.Node - protected def columns: Seq[Node] = { - <th>Batch Time</th> - <th>Records</th> - <th>Scheduling Delay - {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")} - </th> - <th>Processing Time - {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th> - } +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils} + +private[ui] class StreamingPagedTable( + request: HttpServletRequest, + tableTag: String, + batches: Seq[BatchUIData], + basePath: String, + subPath: String, + batchInterval: Long) extends PagedTable[BatchUIData] { + + private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time") + private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}" + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + 
private val firstFailureReason: Option[String] = + if (!tableTag.equals("waitingBatches")) { + getFirstFailureReason(batches) + } else { + None + } /** * Return the first failure reason if finding in the batches. */ - protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { + private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption } - protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { + private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption firstFailureReason.map { failureReason => val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) @@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) }.getOrElse(<td>-</td>) } - protected def baseRow(batch: BatchUIData): Seq[Node] = { - val batchTime = batch.batchTime.milliseconds - val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) - val numRecords = batch.numRecords - val schedulingDelay = batch.schedulingDelay - val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - val processingTime = batch.processingDelay - val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") - val batchTimeId = s"batch-$batchTime" - - <td id={batchTimeId} sorttable_customkey={batchTime.toString} - isFailed={batch.isFailed.toString}> - <a href={s"batch?id=$batchTime"}> - {formattedBatchTime} - </a> - </td> - <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td> - <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}> - {formattedSchedulingDelay} - </td> - <td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}> - {formattedProcessingTime} - </td> - } - - 
private def batchTable: Seq[Node] = { - <table id={tableId} class="table table-bordered table-striped table-sm sortable"> - <thead> - {columns} - </thead> - <tbody> - {renderRows} - </tbody> - </table> - } - - def toNodeSeq: Seq[Node] = { - batchTable - } - - protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { + private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { <td class="progress-cell"> { - SparkUIUtils.makeProgressBar( - started = batch.numActiveOutputOp, - completed = batch.numCompletedOutputOp, - failed = batch.numFailedOutputOp, - skipped = 0, - reasonToNumKilled = Map.empty, - total = batch.outputOperations.size) + SparkUIUtils.makeProgressBar( + started = batch.numActiveOutputOp, + completed = batch.numCompletedOutputOp, + failed = batch.numFailedOutputOp, + skipped = 0, + reasonToNumKilled = Map.empty, + total = batch.outputOperations.size) } </td> } - /** - * Return HTML for all rows of this table. - */ - protected def renderRows: Seq[Node] -} + override def tableId: String = s"$tableTag-table" -private[ui] class ActiveBatchTable( - runningBatches: Seq[BatchUIData], - waitingBatches: Seq[BatchUIData], - batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" - private val firstFailureReason = getFirstFailureReason(runningBatches) + override def pageSizeFormField: String = s"$tableTag.pageSize" - override protected def columns: Seq[Node] = super.columns ++ { - <th>Output Ops: Succeeded/Total</th> - <th>Status</th> ++ { - if (firstFailureReason.nonEmpty) { - <th>Error</th> - } else { - Nil - } - } - } + override def pageNumberFormField: String = s"$tableTag.page" - override protected def renderRows: Seq[Node] = { - // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display - // waiting batches before running 
batches - waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++ - runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>) + override def pageLink(page: Int): String = { + parameterPath + + s"&$tableTag.sort=$encodedSortColumn" + + s"&$tableTag.desc=$desc" + + s"&$pageNumberFormField=$page" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableTag" } - private def runningBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ { - if (firstFailureReason.nonEmpty) { - getFirstFailureTableCell(batch) - } else { - Nil + override def goButtonFormPath: String = + s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag" + + override def dataSource: PagedDataSource[BatchUIData] = + new StreamingDataSource(batches, pageSize, sortColumn, desc) + + override def headers: Seq[Node] = { + // headers, sortable and tooltips + val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = { + Seq( + ("Batch Time", true, None), + ("Records", true, None), + ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " + + "of a batch")), + ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ { + if (tableTag.equals("completedBatches")) { + Seq( + ("Total Delay", true, Some("Total time taken to handle a batch")), + ("Output Ops: Succeeded/Total", false, None)) + } else { + Seq( + ("Output Ops: Succeeded/Total", false, None), + ("Status", false, None)) + } + } ++ { + if (firstFailureReason.nonEmpty) { + Seq(("Error", false, None)) + } else { + Nil + } } } + // check if sort column is a valid sortable column + isSortColumnValid(headersAndCssClasses, sortColumn) + + headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag) } - private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ { - if 
(firstFailureReason.nonEmpty) { - // Waiting batches have not run yet, so must have no failure reasons. - <td>-</td> - } else { - Nil + override def row(batch: BatchUIData): Seq[Node] = { + val batchTime = batch.batchTime.milliseconds + val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) + val numRecords = batch.numRecords + val schedulingDelay = batch.schedulingDelay + val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + val processingTime = batch.processingDelay + val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") + val batchTimeId = s"batch-$batchTime" + val totalDelay = batch.totalDelay + val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + + <tr> + <td id={batchTimeId} isFailed={batch.isFailed.toString}> + <a href={s"batch?id=$batchTime"}> + {formattedBatchTime} + </a> + </td> + <td> {numRecords.toString} records </td> + <td> {formattedSchedulingDelay} </td> + <td> {formattedProcessingTime} </td> + { + if (tableTag.equals("completedBatches")) { + <td> {formattedTotalDelay} </td> ++ + createOutputOperationProgressBar(batch) ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } + } else if (tableTag.equals("runningBatches")) { + createOutputOperationProgressBar(batch) ++ + <td> processing </td> ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } + } else { + createOutputOperationProgressBar(batch) ++ + <td> queued </td> ++ { + if (firstFailureReason.nonEmpty) { + // Waiting batches have not run yet, so must have no failure reasons. 
+ <td>-</td> + } else { + Nil + } + } + } } - } + </tr> } } -private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) - extends BatchTableBase("completed-batches-table", batchInterval) { +private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String, + desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) { - private val firstFailureReason = getFirstFailureReason(batches) + private val data = info.sorted(ordering(sortColumn, desc)) - override protected def columns: Seq[Node] = super.columns ++ { - <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th> - <th>Output Ops: Succeeded/Total</th> ++ { - if (firstFailureReason.nonEmpty) { - <th>Error</th> - } else { - Nil - } - } - } + override protected def dataSize: Int = data.size - override protected def renderRows: Seq[Node] = { - batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>) - } + override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to) - private def completedBatchRow(batch: BatchUIData): Seq[Node] = { - val totalDelay = batch.totalDelay - val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - - baseRow(batch) ++ { - <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}> - {formattedTotalDelay} - </td> - } ++ createOutputOperationProgressBar(batch)++ { - if (firstFailureReason.nonEmpty) { - getFirstFailureTableCell(batch) - } else { - Nil - } + private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = { + val ordering: Ordering[BatchUIData] = column match { + case "Batch Time" => Ordering.by(_.batchTime.milliseconds) + case "Records" => Ordering.by(_.numRecords) + case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue)) + case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue)) + case "Total Delay" => 
Ordering.by(_.totalDelay.getOrElse(Long.MaxValue)) + case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 3bdf009..42d0e50 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest +import scala.collection.mutable import scala.xml.{Node, Unparsed} import org.apache.spark.internal.Logging import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage} +import org.apache.spark.util.Utils /** * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that @@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab) onClickTimelineFunc ++ basicInfo ++ listener.synchronized { generateStatTable() ++ - generateBatchListTables() + generateBatchListTables(request) } SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent) } @@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab) </tr> } - private def generateBatchListTables(): Seq[Node] = { + private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData], + tableTag: String): Seq[Node] = { + val interval: Long = listener.batchDuration + val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1) + + try { + new StreamingPagedTable( + request, + tableTag, + batches, + SparkUIUtils.prependBaseUri(request, parent.basePath), + "streaming", + interval + ).table(streamingPage) + } catch { + case e @ (_: IllegalArgumentException | _: 
IndexOutOfBoundsException) => + <div class="alert alert-error"> + <p>Error while rendering streaming table:</p> + <pre> + {Utils.exceptionString(e)} + </pre> + </div> + } + } + + private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = { val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse val completedBatches = listener.retainedCompletedBatches. sortBy(_.batchTime.milliseconds).reverse - val activeBatchesContent = { - <div class="row"> - <div class="col-12"> - <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table" - onClick="collapseTable('collapse-aggregated-activeBatches', - 'aggregated-activeBatches')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Active Batches ({runningBatches.size + waitingBatches.size})</a> - </h4> - </span> - <div class="aggregated-activeBatches collapsible-table"> - {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq} + val content = mutable.ListBuffer[Node]() + + if (runningBatches.nonEmpty) { + content ++= + <div class="row"> + <div class="col-12"> + <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table" + onClick="collapseTable('collapse-aggregated-runningBatches', + 'aggregated-runningBatches')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Running Batches ({runningBatches.size})</a> + </h4> + </span> + <div class="aggregated-runningBatches collapsible-table"> + { streamingTable(request, runningBatches, "runningBatches") } + </div> </div> </div> - </div> } - val completedBatchesContent = { - <div class="row"> - <div class="col-12"> - <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table" - onClick="collapseTable('collapse-aggregated-completedBatches', - 'aggregated-completedBatches')"> - <h4> - <span class="collapse-table-arrow 
arrow-open"></span> - <a>Completed Batches (last {completedBatches.size} - out of {listener.numTotalCompletedBatches})</a> - </h4> - </span> - <div class="aggregated-completedBatches collapsible-table"> - {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq} + if (waitingBatches.nonEmpty) { + content ++= + <div class="row"> + <div class="col-12"> + <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table" + onClick="collapseTable('collapse-aggregated-waitingBatches', + 'aggregated-waitingBatches')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Waiting Batches ({waitingBatches.size})</a> + </h4> + </span> + <div class="aggregated-waitingBatches collapsible-table"> + { streamingTable(request, waitingBatches, "waitingBatches") } + </div> </div> </div> - </div> } - activeBatchesContent ++ completedBatchesContent + if (completedBatches.nonEmpty) { + content ++= + <div class="row"> + <div class="col-12"> + <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table" + onClick="collapseTable('collapse-aggregated-completedBatches', + 'aggregated-completedBatches')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Completed Batches (last {completedBatches.size} + out of {listener.numTotalCompletedBatches})</a> + </h4> + </span> + <div class="aggregated-completedBatches collapsible-table"> + { streamingTable(request, completedBatches, "completedBatches") } + </div> + </div> + </div> + } + content } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index bdc9e9e..7041e46 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -63,7 +63,7 @@ class UISeleniumSuite .setMaster("local") .setAppName("test") .set(UI_ENABLED, true) - val ssc 
= new StreamingContext(conf, Seconds(1)) + val ssc = new StreamingContext(conf, Milliseconds(100)) assert(ssc.sc.ui.isDefined, "Spark UI is not started!") ssc } @@ -104,7 +104,7 @@ class UISeleniumSuite find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None) } - eventually(timeout(10.seconds), interval(50.milliseconds)) { + eventually(timeout(10.seconds), interval(500.milliseconds)) { // check whether streaming page exists go to (sparkUI.webUrl.stripSuffix("/") + "/streaming") val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq @@ -125,24 +125,37 @@ class UISeleniumSuite // Check batch tables val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq - h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true) h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true) - findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { - List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)", - "Output Ops: Succeeded/Total", "Status") - } - findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { - List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)", - "Total Delay (?)", "Output Ops: Succeeded/Total") + val arrow = 0x25BE.toChar + findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", + "Total Delay", "Output Ops: Succeeded/Total") } - val batchLinks = - findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq + val pageSize = 1 + val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" + + "&completedBatches.desc=true&completedBatches.page=1" + + s"&completedBatches.pageSize=$pageSize#completedBatches" + + go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath) + val completedTableRows = findAll(cssSelector("""#completedBatches-table tr""")) + 
.map(_.text).toList + // header row + pagesize + completedTableRows.length should be (1 + pageSize) + + val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" + + s"&completedBatches.desc=false&completedBatches.pageSize=$pageSize#completedBatches" + + // sort batches in ascending order of batch time + go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath) + + val batchLinks = findAll(cssSelector("""#completedBatches-table td a""")) + .flatMap(_.attribute("href")).toSeq batchLinks.size should be >= 1 // Check a normal batch page - go to (batchLinks.last) // Last should be the first batch, so it will have some jobs + go to (batchLinks.head) // Head is the first batch, so it will have some jobs val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq summaryText should contain ("Batch Duration:") summaryText should contain ("Input data size:") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org