This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9c583b8  [SPARK-30984][SS] Add UI test for Structured Streaming UI
9c583b8 is described below

commit 9c583b8aff4f3d5799524619f4997281ae428da5
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Wed Mar 4 13:55:34 2020 +0800

    [SPARK-30984][SS] Add UI test for Structured Streaming UI
    
    ### What changes were proposed in this pull request?
    
    - Add a UI test for Structured Streaming UI
    - Fix the unsafe usages of `SimpleDateFormat` by using a shared `ThreadLocal` object (see the sketch below).
    - Use `start` in place of `submission` to be consistent with the API `StreamingQuery.start()` (see the listener sketch below).
    
    ### Why are the changes needed?
    
    Structured Streaming UI is missing UI tests.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    The new test.
    
    Closes #27732 from zsxwing/ss-ui-test.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit ebfff7af6a9b2d672871317d30161cdafaa32ca4)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 sql/core/pom.xml                                   |  10 ++
 .../sql/execution/streaming/ProgressReporter.scala |   2 +-
 .../sql/execution/streaming/StreamExecution.scala  |   4 +-
 .../sql/streaming/StreamingQueryListener.scala     |   4 +-
 .../sql/streaming/ui/StreamingQueryPage.scala      |  22 ++--
 .../ui/StreamingQueryStatisticsPage.scala          |  38 +++---
 .../ui/StreamingQueryStatusListener.scala          |  13 +-
 .../apache/spark/sql/streaming/ui/UIUtils.scala    |  17 +++
 .../streaming/StreamingQueryListenerSuite.scala    |   6 +-
 .../sql/streaming/ui/StreamingQueryPageSuite.scala |   2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala     |   9 +-
 .../spark/sql/streaming/ui/UISeleniumSuite.scala   | 145 +++++++++++++++++++++
 12 files changed, 221 insertions(+), 51 deletions(-)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 0e664ec..37da614 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -150,6 +150,16 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-htmlunit-driver</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index f20291e..feb151a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -349,7 +349,7 @@ trait ProgressReporter extends Logging {
     result
   }
 
-  private def formatTimestamp(millis: Long): String = {
+  protected def formatTimestamp(millis: Long): String = {
     timestampFormat.format(new Date(millis))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8b3534b..8006437 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,8 +307,8 @@ abstract class StreamExecution(
       }
 
       // `postEvent` does not throw non-fatal exceptions.
-      val submissionTime = triggerClock.getTimeMillis()
-      postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
+      val startTimestamp = triggerClock.getTimeMillis()
+      postEvent(new QueryStartedEvent(id, runId, name, formatTimestamp(startTimestamp)))
 
       // Unblock starting thread
       startLatch.countDown()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index dd842cd..7ae38c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,7 +82,7 @@ object StreamingQueryListener {
    * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
    * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
    * @param name User-specified name of the query, null if not specified.
-   * @param submissionTime The timestamp to start a query.
+   * @param timestamp The timestamp to start a query.
    * @since 2.1.0
    */
   @Evolving
@@ -90,7 +90,7 @@ object StreamingQueryListener {
       val id: UUID,
       val runId: UUID,
       val name: String,
-      val submissionTime: Long) extends Event
+      val timestamp: String) extends Event
 
   /**
    * Event representing any progress updates in a query.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
index 650f64f..7336765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -17,22 +17,18 @@
 
 package org.apache.spark.sql.streaming.ui
 
-import java.text.SimpleDateFormat
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.streaming.ui.UIUtils._
 import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 
 private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
     extends WebUIPage("") with Logging {
-  val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
-  df.setTimeZone(getTimeZone("UTC"))
 
   override def render(request: HttpServletRequest): Seq[Node] = {
     val content = generateStreamingQueryTable(request)
@@ -61,11 +57,11 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
     val name = UIUtils.getQueryName(query)
     val status = UIUtils.getQueryStatus(query)
     val duration = if (queryActive) {
-      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
     } else {
       withNoProgress(query, {
         val endTimeMs = query.lastProgress.timestamp
-        SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime)
+        SparkUIUtils.formatDurationVerbose(parseProgressTimestamp(endTimeMs) - query.startTimestamp)
       }, "-")
     }
 
@@ -74,7 +70,7 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
       <td> {status} </td>
       <td> {query.id} </td>
       <td> <a href={statisticsLink}> {query.runId} </a> </td>
-      <td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
+      <td> {SparkUIUtils.formatDate(query.startTimestamp)} </td>
       <td> {duration} </td>
       <td> {withNoProgress(query, {
        (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
@@ -94,29 +90,29 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
       .partition(_.isActive)
     val activeQueryTables = if (activeQueries.nonEmpty) {
       val headerRow = Seq(
-        "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg 
Input /sec",
+        "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input 
/sec",
         "Avg Process /sec", "Lastest Batch")
 
      Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
-        activeQueries, true, None, Seq(null), false))
+        activeQueries, true, Some("activeQueries-table"), Seq(null), false))
     } else {
       None
     }
 
     val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
       val headerRow = Seq(
-        "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg 
Input /sec",
+        "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input 
/sec",
         "Avg Process /sec", "Lastest Batch", "Error")
 
      Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
-        inactiveQueries, true, None, Seq(null), false))
+        inactiveQueries, true, Some("completedQueries-table"), Seq(null), false))
     } else {
       None
     }
 
     // scalastyle:off
     val content =
-      <span id="completed" class="collapse-aggregated-activeQueries 
collapse-table"
+      <span id="active" class="collapse-aggregated-activeQueries 
collapse-table"
             
onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
         <h5 id="activequeries">
           <span class="collapse-table-arrow arrow-open"></span>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index fa9896e..65052d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -19,21 +19,17 @@ package org.apache.spark.sql.streaming.ui
 
 import java.{util => ju}
 import java.lang.{Long => JLong}
-import java.text.SimpleDateFormat
 import java.util.UUID
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.streaming.ui.UIUtils._
 import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
 
 private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
   extends WebUIPage("statistics") with Logging {
-  val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
-  df.setTimeZone(getTimeZone("UTC"))
 
   def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
     // scalastyle:off
@@ -101,13 +97,13 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
 
   def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
     val duration = if (query.isActive) {
-      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
     } else {
       withNoProgress(query, {
         val end = query.lastProgress.timestamp
         val start = query.recentProgress.head.timestamp
         SparkUIUtils.formatDurationVerbose(
-          df.parse(end).getTime - df.parse(start).getTime)
+          parseProgressTimestamp(end) - parseProgressTimestamp(start))
       }, "-")
     }
 
@@ -119,7 +115,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
       </strong>
       since
       <strong>
-        {SparkUIUtils.formatDate(query.submissionTime)}
+        {SparkUIUtils.formatDate(query.startTimestamp)}
       </strong>
       (<strong>{numBatches}</strong> completed batches)
     </div>
@@ -132,13 +128,13 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
 
   def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
     val batchToTimestamps = withNoProgress(query,
-      query.recentProgress.map(p => (p.batchId, df.parse(p.timestamp).getTime)),
+      query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
       Array.empty[(Long, Long)])
     val batchTimes = batchToTimestamps.map(_._2)
     val minBatchTime =
-      withNoProgress(query, df.parse(query.recentProgress.head.timestamp).getTime, 0L)
+      withNoProgress(query, parseProgressTimestamp(query.recentProgress.head.timestamp), 0L)
     val maxBatchTime =
-      withNoProgress(query, df.parse(query.lastProgress.timestamp).getTime, 0L)
+      withNoProgress(query, parseProgressTimestamp(query.lastProgress.timestamp), 0L)
     val maxRecordRate =
      withNoProgress(query, query.recentProgress.map(_.inputRowsPerSecond).max, 0L)
     val minRecordRate = 0L
@@ -152,22 +148,26 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
     val minBatchDuration = 0L
 
     val inputRateData = withNoProgress(query,
-      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        withNumberInvalid { p.inputRowsPerSecond })), Array.empty[(Long, Double)])
     val processRateData = withNoProgress(query,
-      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long, Double)])
     val inputRowsData = withNoProgress(query,
-      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
         withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)])
     val batchDurations = withNoProgress(query,
-      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
         withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)])
-    val operationDurationData = withNoProgress(query, query.recentProgress.map { p =>
-      val durationMs = p.durationMs
-      // remove "triggerExecution" as it count the other operation duration.
-      durationMs.remove("triggerExecution")
-      (df.parse(p.timestamp).getTime, durationMs)}, Array.empty[(Long, ju.Map[String, JLong])])
+    val operationDurationData = withNoProgress(
+      query,
+      query.recentProgress.map { p =>
+        val durationMs = p.durationMs
+        // remove "triggerExecution" as it count the other operation duration.
+        durationMs.remove("triggerExecution")
+        (parseProgressTimestamp(p.timestamp), durationMs)
+      },
+      Array.empty[(Long, ju.Map[String, JLong])])
 
     val jsCollector = new JsCollector
     val graphUIDataForInputRate =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 9181511..e331083 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.streaming.ui
 
-import java.text.SimpleDateFormat
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
@@ -25,9 +24,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which contains all
@@ -36,9 +35,6 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro
  */
 private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
 
-  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
-
   /**
    * We use runId as the key here instead of id in active query status map,
    * because the runId is unique for every started query, even if it is a restart.
@@ -51,12 +47,13 @@ private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends Streami
   private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    val startTimestamp = parseProgressTimestamp(event.timestamp)
     activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
+      new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
   }
 
   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = timestampFormat.parse(event.progress.timestamp).getTime
+    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
     val queryStatus = activeQueryStatus.getOrDefault(
       event.progress.runId,
      new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
@@ -89,7 +86,7 @@ private[ui] class StreamingQueryUIData(
     val name: String,
     val id: UUID,
     val runId: UUID,
-    val submissionTime: Long) {
+    val startTimestamp: Long) {
 
   /** Holds the most recent query progress updates. */
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
index 57b9dec..cdad5ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming.ui
 
+import java.text.SimpleDateFormat
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+
 private[ui] object UIUtils {
 
   /**
@@ -57,4 +62,16 @@ private[ui] object UIUtils {
       query.exception.map(_ => "FAILED").getOrElse("FINISHED")
     }
   }
+
+  private val progressTimestampFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat = {
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      format.setTimeZone(getTimeZone("UTC"))
+      format
+    }
+  }
+
+  def parseProgressTimestamp(timestamp: String): Long = {
+    progressTimestampFormat.get.parse(timestamp).getTime
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 6bb1646..e585b8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -254,8 +254,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       assert(newEvent.name === event.name)
     }
 
-    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", 1L))
-    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, 1L))
+    testSerialization(
+      new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", "2016-12-05T20:54:20.827Z"))
+    testSerialization(
+      new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, "2016-12-05T20:54:20.827Z"))
   }
 
   test("QueryProgressEvent serialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index de43e47..2a1e18a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -97,7 +97,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
     when(streamQuery.name).thenReturn("query")
     when(streamQuery.id).thenReturn(id)
     when(streamQuery.runId).thenReturn(id)
-    when(streamQuery.submissionTime).thenReturn(1L)
+    when(streamQuery.startTimestamp).thenReturn(1L)
     when(streamQuery.lastProgress).thenReturn(progress)
     when(streamQuery.recentProgress).thenReturn(Array(progress))
     when(streamQuery.exception).thenReturn(None)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index adbb501..6aa440e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -32,7 +32,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
    // handle query started event
     val id = UUID.randomUUID()
     val runId = UUID.randomUUID()
-    val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId, "test", 1L)
+    val startEvent = new StreamingQueryListener.QueryStartedEvent(
+      id, runId, "test", "2016-12-05T20:54:20.827Z")
     listener.onQueryStarted(startEvent)
 
     // result checking
@@ -78,7 +79,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     // handle first time start
     val id = UUID.randomUUID()
     val runId0 = UUID.randomUUID()
-    val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0, "test", 1L)
+    val startEvent0 = new StreamingQueryListener.QueryStartedEvent(
+      id, runId0, "test", "2016-12-05T20:54:20.827Z")
     listener.onQueryStarted(startEvent0)
 
     // handle terminate event
@@ -87,7 +89,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
 
     // handle second time start
     val runId1 = UUID.randomUUID()
-    val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1, "test", 1L)
+    val startEvent1 = new StreamingQueryListener.QueryStartedEvent(
+      id, runId1, "test", "2016-12-05T20:54:20.827Z")
     listener.onQueryStarted(startEvent1)
 
     // result checking
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
new file mode 100644
index 0000000..fdf4c66
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.scalatestplus.selenium.WebBrowser
+
+import org.apache.spark._
+import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
+import org.apache.spark.sql.LocalSparkSession.withSparkSession
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.ui.SparkUICssErrorHandler
+
+class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with BeforeAndAfterAll {
+
+  implicit var webDriver: WebDriver = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    webDriver = new HtmlUnitDriver {
+      getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
+    }
+  }
+
+  private def newSparkSession(
+      master: String = "local",
+      additionalConfs: Map[String, String] = Map.empty): SparkSession = {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName("ui-test")
+      .set(UI_ENABLED, true)
+      .set(UI_PORT, 0)
+    additionalConfs.foreach { case (k, v) => conf.set(k, v) }
+    val spark = SparkSession.builder().master(master).config(conf).getOrCreate()
+    assert(spark.sparkContext.ui.isDefined)
+    spark
+  }
+
+  def goToUi(spark: SparkSession, path: String): Unit = {
+    go to (spark.sparkContext.ui.get.webUrl.stripSuffix("/") + path)
+  }
+
+  test("SPARK-30984: Structured Streaming UI should be activated when running 
a streaming query") {
+    quietly {
+      withSparkSession(newSparkSession()) { spark =>
+        import spark.implicits._
+        try {
+          spark.range(1, 10).count()
+
+          goToUi(spark, "/StreamingQuery")
+
+          val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+          h3Text should not contain ("Streaming Query")
+
+          val activeQuery =
+            spark.readStream.format("rate").load().writeStream.format("noop").start()
+          val completedQuery =
+            spark.readStream.format("rate").load().writeStream.format("noop").start()
+          completedQuery.stop()
+          val failedQuery = spark.readStream.format("rate").load().select("value").as[Long]
+            .map(_ / 0).writeStream.format("noop").start()
+          try {
+            failedQuery.awaitTermination()
+          } catch {
+            case _: StreamingQueryException =>
+          }
+
+          eventually(timeout(30.seconds), interval(100.milliseconds)) {
+            // Check the query list page
+            goToUi(spark, "/StreamingQuery")
+
+            findAll(cssSelector("h3")).map(_.text).toSeq should 
contain("Streaming Query")
+            findAll(cssSelector("""#activeQueries-table 
th""")).map(_.text).toSeq should be {
+              List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", 
"Avg Input /sec",
+                "Avg Process /sec", "Lastest Batch")
+            }
+            val activeQueries =
+              findAll(cssSelector("""#activeQueries-table 
td""")).map(_.text).toSeq
+            activeQueries should contain(activeQuery.id.toString)
+            activeQueries should contain(activeQuery.runId.toString)
+            findAll(cssSelector("""#completedQueries-table th"""))
+              .map(_.text).toSeq should be {
+                List("Name", "Status", "Id", "Run ID", "Start Time", 
"Duration", "Avg Input /sec",
+                  "Avg Process /sec", "Lastest Batch", "Error")
+              }
+            val completedQueries =
+              findAll(cssSelector("""#completedQueries-table 
td""")).map(_.text).toSeq
+            completedQueries should contain(completedQuery.id.toString)
+            completedQueries should contain(completedQuery.runId.toString)
+            completedQueries should contain(failedQuery.id.toString)
+            completedQueries should contain(failedQuery.runId.toString)
+
+            // Check the query statistics page
+            val activeQueryLink =
+              findAll(cssSelector("""#activeQueries-table 
a""")).flatMap(_.attribute("href")).next
+            go to activeQueryLink
+
+            findAll(cssSelector("h3"))
+              .map(_.text).toSeq should contain("Streaming Query Statistics")
+            val summaryText = findAll(cssSelector("div 
strong")).map(_.text).toSeq
+            summaryText should contain ("Name:")
+            summaryText should contain ("Id:")
+            summaryText should contain ("RunId:")
+            findAll(cssSelector("""#stat-table th""")).map(_.text).toSeq 
should be {
+              List("", "Timelines", "Histograms")
+            }
+          }
+        } finally {
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (webDriver != null) {
+        webDriver.quit()
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
