Repository: spark
Updated Branches:
  refs/heads/branch-1.4 99897fe3e -> 2e8a141b5


[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener

If jobs are dropped by SparkListener, at least we can show the job ids in 
BatchPage. Screenshot:

![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png)

Author: zsxwing <[email protected]>

Closes #5840 from zsxwing/SPARK-7305 and squashes the following commits:

aca0ba6 [zsxwing] Fix the code style
718765e [zsxwing] Make generateNormalJobRow private
8073b03 [zsxwing] Merge branch 'master' into SPARK-7305
83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener

(cherry picked from commit 22ab70e06ede65ca865073fe36c859042a920aa3)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e8a141b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e8a141b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e8a141b

Branch: refs/heads/branch-1.4
Commit: 2e8a141b5ace2d1c0c89168aa235b0490ec7d763
Parents: 99897fe
Author: zsxwing <[email protected]>
Authored: Thu May 7 17:34:44 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Thu May 7 17:34:59 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/ui/BatchPage.scala   | 136 +++++++++++++++----
 1 file changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2e8a141b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 3f1cab6..831f60e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node}
+import scala.xml.{NodeSeq, Node, Text}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, 
OutputOpId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
+private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: 
Option[JobUIData])
 
 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       <th>Error</th>
   }
 
+  private def generateJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      sparkJob: SparkJobIdWithUIData): Seq[Node] = {
+    if (sparkJob.jobUIData.isDefined) {
+      generateNormalJobRow(outputOpId, outputOpDescription, 
formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+    } else {
+      generateDroppedJobRow(outputOpId, outputOpDescription, 
formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+    }
+  }
+
   /**
    * Generate a row for a Spark Job. Because duplicated output op infos needs 
to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
    */
-  def generateJobRow(
+  private def generateNormalJobRow(
       outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
-    val lastStageInfo = Option(sparkJob.stageIds)
-      .filter(_.nonEmpty)
-      .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage 
Name)")
-    val lastStageDescription = 
lastStageData.flatMap(_.description).getOrElse("")
     val duration: Option[Long] = {
       sparkJob.submissionTime.map { start =>
         val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" 
rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
-          <span class="description-input" title={lastStageDescription}>
-            {lastStageDescription}
-          </span>{lastStageName}
+          {outputOpDescription}
         </td>
         <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
       } else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     </tr>
   }
 
-  private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
-    val sparkjobDurations = sparkJobs.map(sparkJob => {
-      sparkJob.submissionTime.map { start =>
-        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+  /**
+   * If a job is dropped by sparkListener due to exceeding the limitation, we 
only show the job id
+   * with "-" cells.
+   */
+  private def generateDroppedJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      jobId: Int): Seq[Node] = {
+    // In the first row, output op id and its information needs to be shown. 
In other rows, these
+    // cells will be taken up due to "rowspan".
+    // scalastyle:off
+    val prefixCells =
+      if (isFirstRow) {
+        <td class="output-op-id-cell" 
rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+          <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
+          <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+      } else {
+        Nil
       }
-    })
+    // scalastyle:on
+
+    <tr>
+      {prefixCells}
+      <td sorttable_customkey={jobId.toString}>
+        {jobId.toString}
+      </td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
+  private def generateOutputOpIdRow(
+      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] 
= {
+    // We don't count the durations of dropped jobs
+    val sparkJobDurations = 
sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
+      map(sparkJob => {
+        sparkJob.submissionTime.map { start =>
+          val end = 
sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      })
     val formattedOutputOpDuration =
-      if (sparkjobDurations.exists(_ == None)) {
-        // If any job does not finish, set "formattedOutputOpDuration" to "-"
+      if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
+        // If no job or any job does not finish, set 
"formattedOutputOpDuration" to "-"
         "-"
       } else {
-        SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+        SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
       }
-    generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, 
true, sparkJobs.head) ++
+
+    val description = generateOutputOpDescription(sparkJobs)
+
+    generateJobRow(
+      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, 
true, sparkJobs.head) ++
       sparkJobs.tail.map { sparkJob =>
-        generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, 
false, sparkJob)
+        generateJobRow(
+          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, 
false, sparkJob)
       }.flatMap(x => x)
   }
 
+  private def generateOutputOpDescription(sparkJobs: 
Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    val lastStageInfo =
+      sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
+        flatMap { sparkJob => // For the first job, get the latest Stage info
+          if (sparkJob.stageIds.isEmpty) {
+            None
+          } else {
+            sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
+          }
+        }
+    val lastStageData = lastStageInfo.flatMap { s =>
+      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+    }
+
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage 
Name)")
+    val lastStageDescription = 
lastStageData.flatMap(_.description).getOrElse("")
+
+    <span class="description-input" title={lastStageDescription}>
+      {lastStageDescription}
+    </span> ++ Text(lastStageName)
+  }
+
   private def failureReasonCell(failureReason: String): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
         outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          // Filter out spark Job ids that don't exist in sparkListener
-          (outputOpId, sparkJobIds.flatMap(getJobData))
+          (outputOpId,
+            sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, 
getJobData(sparkJobId))))
         }
 
       <table id="batch-job-table" class="table table-bordered table-striped 
table-condensed">
@@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         <tbody>
           {
             outputOpIdWithJobs.map {
-              case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, 
jobs)
+              case (outputOpId, sparkJobIds) => 
generateOutputOpIdRow(outputOpId, sparkJobIds)
             }
           }
         </tbody>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to