This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ebfff7a [SPARK-30984][SS] Add UI test for Structured Streaming UI
ebfff7a is described below
commit ebfff7af6a9b2d672871317d30161cdafaa32ca4
Author: Shixiong Zhu <[email protected]>
AuthorDate: Wed Mar 4 13:55:34 2020 +0800
[SPARK-30984][SS] Add UI test for Structured Streaming UI
### What changes were proposed in this pull request?
- Add a UI test for Structured Streaming UI
- Fix the thread-unsafe usages of `SimpleDateFormat` by keeping one formatter
per thread in a shared `ThreadLocal`.
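- Replace `submission` with `start` in names and UI labels (e.g. `submissionTime`
becomes `startTimestamp`, "Submitted Time" becomes "Start Time") to be consistent
with the API `StreamingQuery.start()`.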
### Why are the changes needed?
The Structured Streaming UI is missing UI tests.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
The new test suite (`UISeleniumSuite`).
Closes #27732 from zsxwing/ss-ui-test.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
sql/core/pom.xml | 10 ++
.../sql/execution/streaming/ProgressReporter.scala | 2 +-
.../sql/execution/streaming/StreamExecution.scala | 4 +-
.../sql/streaming/StreamingQueryListener.scala | 4 +-
.../sql/streaming/ui/StreamingQueryPage.scala | 22 ++--
.../ui/StreamingQueryStatisticsPage.scala | 38 +++---
.../ui/StreamingQueryStatusListener.scala | 13 +-
.../apache/spark/sql/streaming/ui/UIUtils.scala | 17 +++
.../streaming/StreamingQueryListenerSuite.scala | 6 +-
.../sql/streaming/ui/StreamingQueryPageSuite.scala | 2 +-
.../ui/StreamingQueryStatusListenerSuite.scala | 9 +-
.../spark/sql/streaming/ui/UISeleniumSuite.scala | 145 +++++++++++++++++++++
12 files changed, 221 insertions(+), 51 deletions(-)
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 6cdc63b..c95fe3c 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -150,6 +150,16 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-htmlunit-driver</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index f20291e..feb151a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -349,7 +349,7 @@ trait ProgressReporter extends Logging {
result
}
- private def formatTimestamp(millis: Long): String = {
+ protected def formatTimestamp(millis: Long): String = {
timestampFormat.format(new Date(millis))
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8b3534b..8006437 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,8 +307,8 @@ abstract class StreamExecution(
}
// `postEvent` does not throw non fatal exception.
- val submissionTime = triggerClock.getTimeMillis()
- postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
+ val startTimestamp = triggerClock.getTimeMillis()
+ postEvent(new QueryStartedEvent(id, runId, name,
formatTimestamp(startTimestamp)))
// Unblock starting thread
startLatch.countDown()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index dd842cd..7ae38c7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,7 +82,7 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See
`StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See
`StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
- * @param submissionTime The timestamp to start a query.
+ * @param timestamp The timestamp to start a query.
* @since 2.1.0
*/
@Evolving
@@ -90,7 +90,7 @@ object StreamingQueryListener {
val id: UUID,
val runId: UUID,
val name: String,
- val submissionTime: Long) extends Event
+ val timestamp: String) extends Event
/**
* Event representing any progress updates in a query.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
index 650f64f..7336765 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -17,22 +17,18 @@
package org.apache.spark.sql.streaming.ui
-import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.commons.text.StringEscapeUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
extends WebUIPage("") with Logging {
- val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- df.setTimeZone(getTimeZone("UTC"))
override def render(request: HttpServletRequest): Seq[Node] = {
val content = generateStreamingQueryTable(request)
@@ -61,11 +57,11 @@ private[ui] class StreamingQueryPage(parent:
StreamingQueryTab)
val name = UIUtils.getQueryName(query)
val status = UIUtils.getQueryStatus(query)
val duration = if (queryActive) {
- SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() -
query.submissionTime)
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() -
query.startTimestamp)
} else {
withNoProgress(query, {
val endTimeMs = query.lastProgress.timestamp
- SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime -
query.submissionTime)
+ SparkUIUtils.formatDurationVerbose(parseProgressTimestamp(endTimeMs) -
query.startTimestamp)
}, "-")
}
@@ -74,7 +70,7 @@ private[ui] class StreamingQueryPage(parent:
StreamingQueryTab)
<td> {status} </td>
<td> {query.id} </td>
<td> <a href={statisticsLink}> {query.runId} </a> </td>
- <td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
+ <td> {SparkUIUtils.formatDate(query.startTimestamp)} </td>
<td> {duration} </td>
<td> {withNoProgress(query, {
(query.recentProgress.map(p =>
withNumberInvalid(p.inputRowsPerSecond)).sum /
@@ -94,29 +90,29 @@ private[ui] class StreamingQueryPage(parent:
StreamingQueryTab)
.partition(_.isActive)
val activeQueryTables = if (activeQueries.nonEmpty) {
val headerRow = Seq(
- "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg
Input /sec",
+ "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input
/sec",
"Avg Process /sec", "Lastest Batch")
Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request,
queryActive = true),
- activeQueries, true, None, Seq(null), false))
+ activeQueries, true, Some("activeQueries-table"), Seq(null), false))
} else {
None
}
val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
val headerRow = Seq(
- "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg
Input /sec",
+ "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input
/sec",
"Avg Process /sec", "Lastest Batch", "Error")
Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request,
queryActive = false),
- inactiveQueries, true, None, Seq(null), false))
+ inactiveQueries, true, Some("completedQueries-table"), Seq(null),
false))
} else {
None
}
// scalastyle:off
val content =
- <span id="completed" class="collapse-aggregated-activeQueries
collapse-table"
+ <span id="active" class="collapse-aggregated-activeQueries
collapse-table"
onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
<h5 id="activequeries">
<span class="collapse-table-arrow arrow-open"></span>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index fa9896e..65052d9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -19,21 +19,17 @@ package org.apache.spark.sql.streaming.ui
import java.{util => ju}
import java.lang.{Long => JLong}
-import java.text.SimpleDateFormat
import java.util.UUID
import javax.servlet.http.HttpServletRequest
import scala.xml.{Node, Unparsed}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils,
WebUIPage}
private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
extends WebUIPage("statistics") with Logging {
- val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- df.setTimeZone(getTimeZone("UTC"))
def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
// scalastyle:off
@@ -101,13 +97,13 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
val duration = if (query.isActive) {
- SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() -
query.submissionTime)
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() -
query.startTimestamp)
} else {
withNoProgress(query, {
val end = query.lastProgress.timestamp
val start = query.recentProgress.head.timestamp
SparkUIUtils.formatDurationVerbose(
- df.parse(end).getTime - df.parse(start).getTime)
+ parseProgressTimestamp(end) - parseProgressTimestamp(start))
}, "-")
}
@@ -119,7 +115,7 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
</strong>
since
<strong>
- {SparkUIUtils.formatDate(query.submissionTime)}
+ {SparkUIUtils.formatDate(query.startTimestamp)}
</strong>
(<strong>{numBatches}</strong> completed batches)
</div>
@@ -132,13 +128,13 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
- query.recentProgress.map(p => (p.batchId,
df.parse(p.timestamp).getTime)),
+ query.recentProgress.map(p => (p.batchId,
parseProgressTimestamp(p.timestamp))),
Array.empty[(Long, Long)])
val batchTimes = batchToTimestamps.map(_._2)
val minBatchTime =
- withNoProgress(query,
df.parse(query.recentProgress.head.timestamp).getTime, 0L)
+ withNoProgress(query,
parseProgressTimestamp(query.recentProgress.head.timestamp), 0L)
val maxBatchTime =
- withNoProgress(query, df.parse(query.lastProgress.timestamp).getTime, 0L)
+ withNoProgress(query,
parseProgressTimestamp(query.lastProgress.timestamp), 0L)
val maxRecordRate =
withNoProgress(query,
query.recentProgress.map(_.inputRowsPerSecond).max, 0L)
val minRecordRate = 0L
@@ -152,22 +148,26 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
val minBatchDuration = 0L
val inputRateData = withNoProgress(query,
- query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
withNumberInvalid { p.inputRowsPerSecond })), Array.empty[(Long,
Double)])
val processRateData = withNoProgress(query,
- query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long,
Double)])
val inputRowsData = withNoProgress(query,
- query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)])
val batchDurations = withNoProgress(query,
- query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)])
- val operationDurationData = withNoProgress(query, query.recentProgress.map
{ p =>
- val durationMs = p.durationMs
- // remove "triggerExecution" as it count the other operation duration.
- durationMs.remove("triggerExecution")
- (df.parse(p.timestamp).getTime, durationMs)}, Array.empty[(Long,
ju.Map[String, JLong])])
+ val operationDurationData = withNoProgress(
+ query,
+ query.recentProgress.map { p =>
+ val durationMs = p.durationMs
+ // remove "triggerExecution" as it count the other operation duration.
+ durationMs.remove("triggerExecution")
+ (parseProgressTimestamp(p.timestamp), durationMs)
+ },
+ Array.empty[(Long, ju.Map[String, JLong])])
val jsCollector = new JsCollector
val graphUIDataForInputRate =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 9181511..e331083 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.streaming.ui
-import java.text.SimpleDateFormat
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
@@ -25,9 +24,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
@@ -36,9 +35,6 @@ import
org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro
*/
private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
- private val timestampFormat = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
- timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
-
/**
* We use runId as the key here instead of id in active query status map,
* because the runId is unique for every started query, even it its a
restart.
@@ -51,12 +47,13 @@ private[sql] class StreamingQueryStatusListener(conf:
SparkConf) extends Streami
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
+ val startTimestamp = parseProgressTimestamp(event.timestamp)
activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
event.submissionTime))
+ new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp =
timestampFormat.parse(event.progress.timestamp).getTime
+ val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
val queryStatus = activeQueryStatus.getOrDefault(
event.progress.runId,
new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
@@ -89,7 +86,7 @@ private[ui] class StreamingQueryUIData(
val name: String,
val id: UUID,
val runId: UUID,
- val submissionTime: Long) {
+ val startTimestamp: Long) {
/** Holds the most recent query progress updates. */
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
index 57b9dec..cdad5ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql.streaming.ui
+import java.text.SimpleDateFormat
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+
private[ui] object UIUtils {
/**
@@ -57,4 +62,16 @@ private[ui] object UIUtils {
query.exception.map(_ => "FAILED").getOrElse("FINISHED")
}
}
+
+ private val progressTimestampFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = {
+ val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") //
ISO8601
+ format.setTimeZone(getTimeZone("UTC"))
+ format
+ }
+ }
+
+ def parseProgressTimestamp(timestamp: String): Long = {
+ progressTimestampFormat.get.parse(timestamp).getTime
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 6bb1646..e585b8a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -254,8 +254,10 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
assert(newEvent.name === event.name)
}
- testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID,
"name", 1L))
- testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID,
null, 1L))
+ testSerialization(
+ new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name",
"2016-12-05T20:54:20.827Z"))
+ testSerialization(
+ new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null,
"2016-12-05T20:54:20.827Z"))
}
test("QueryProgressEvent serialization") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index de43e47..2a1e18a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -97,7 +97,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with
BeforeAndAfter {
when(streamQuery.name).thenReturn("query")
when(streamQuery.id).thenReturn(id)
when(streamQuery.runId).thenReturn(id)
- when(streamQuery.submissionTime).thenReturn(1L)
+ when(streamQuery.startTimestamp).thenReturn(1L)
when(streamQuery.lastProgress).thenReturn(progress)
when(streamQuery.recentProgress).thenReturn(Array(progress))
when(streamQuery.exception).thenReturn(None)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index adbb501..6aa440e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -32,7 +32,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
// hanlde query started event
val id = UUID.randomUUID()
val runId = UUID.randomUUID()
- val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId,
"test", 1L)
+ val startEvent = new StreamingQueryListener.QueryStartedEvent(
+ id, runId, "test", "2016-12-05T20:54:20.827Z")
listener.onQueryStarted(startEvent)
// result checking
@@ -78,7 +79,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
// handle first time start
val id = UUID.randomUUID()
val runId0 = UUID.randomUUID()
- val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0,
"test", 1L)
+ val startEvent0 = new StreamingQueryListener.QueryStartedEvent(
+ id, runId0, "test", "2016-12-05T20:54:20.827Z")
listener.onQueryStarted(startEvent0)
// handle terminate event
@@ -87,7 +89,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
// handle second time start
val runId1 = UUID.randomUUID()
- val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1,
"test", 1L)
+ val startEvent1 = new StreamingQueryListener.QueryStartedEvent(
+ id, runId1, "test", "2016-12-05T20:54:20.827Z")
listener.onQueryStarted(startEvent1)
// result checking
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
new file mode 100644
index 0000000..fdf4c66
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.scalatestplus.selenium.WebBrowser
+
+import org.apache.spark._
+import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
+import org.apache.spark.sql.LocalSparkSession.withSparkSession
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.ui.SparkUICssErrorHandler
+
+class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with
BeforeAndAfterAll {
+
+ implicit var webDriver: WebDriver = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ webDriver = new HtmlUnitDriver {
+ getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
+ }
+ }
+
+ private def newSparkSession(
+ master: String = "local",
+ additionalConfs: Map[String, String] = Map.empty): SparkSession = {
+ val conf = new SparkConf()
+ .setMaster(master)
+ .setAppName("ui-test")
+ .set(UI_ENABLED, true)
+ .set(UI_PORT, 0)
+ additionalConfs.foreach { case (k, v) => conf.set(k, v) }
+ val spark =
SparkSession.builder().master(master).config(conf).getOrCreate()
+ assert(spark.sparkContext.ui.isDefined)
+ spark
+ }
+
+ def goToUi(spark: SparkSession, path: String): Unit = {
+ go to (spark.sparkContext.ui.get.webUrl.stripSuffix("/") + path)
+ }
+
+ test("SPARK-30984: Structured Streaming UI should be activated when running
a streaming query") {
+ quietly {
+ withSparkSession(newSparkSession()) { spark =>
+ import spark.implicits._
+ try {
+ spark.range(1, 10).count()
+
+ goToUi(spark, "/StreamingQuery")
+
+ val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+ h3Text should not contain ("Streaming Query")
+
+ val activeQuery =
+
spark.readStream.format("rate").load().writeStream.format("noop").start()
+ val completedQuery =
+
spark.readStream.format("rate").load().writeStream.format("noop").start()
+ completedQuery.stop()
+ val failedQuery =
spark.readStream.format("rate").load().select("value").as[Long]
+ .map(_ / 0).writeStream.format("noop").start()
+ try {
+ failedQuery.awaitTermination()
+ } catch {
+ case _: StreamingQueryException =>
+ }
+
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ // Check the query list page
+ goToUi(spark, "/StreamingQuery")
+
+ findAll(cssSelector("h3")).map(_.text).toSeq should
contain("Streaming Query")
+ findAll(cssSelector("""#activeQueries-table
th""")).map(_.text).toSeq should be {
+ List("Name", "Status", "Id", "Run ID", "Start Time", "Duration",
"Avg Input /sec",
+ "Avg Process /sec", "Lastest Batch")
+ }
+ val activeQueries =
+ findAll(cssSelector("""#activeQueries-table
td""")).map(_.text).toSeq
+ activeQueries should contain(activeQuery.id.toString)
+ activeQueries should contain(activeQuery.runId.toString)
+ findAll(cssSelector("""#completedQueries-table th"""))
+ .map(_.text).toSeq should be {
+ List("Name", "Status", "Id", "Run ID", "Start Time",
"Duration", "Avg Input /sec",
+ "Avg Process /sec", "Lastest Batch", "Error")
+ }
+ val completedQueries =
+ findAll(cssSelector("""#completedQueries-table
td""")).map(_.text).toSeq
+ completedQueries should contain(completedQuery.id.toString)
+ completedQueries should contain(completedQuery.runId.toString)
+ completedQueries should contain(failedQuery.id.toString)
+ completedQueries should contain(failedQuery.runId.toString)
+
+ // Check the query statistics page
+ val activeQueryLink =
+ findAll(cssSelector("""#activeQueries-table
a""")).flatMap(_.attribute("href")).next
+ go to activeQueryLink
+
+ findAll(cssSelector("h3"))
+ .map(_.text).toSeq should contain("Streaming Query Statistics")
+ val summaryText = findAll(cssSelector("div
strong")).map(_.text).toSeq
+ summaryText should contain ("Name:")
+ summaryText should contain ("Id:")
+ summaryText should contain ("RunId:")
+ findAll(cssSelector("""#stat-table th""")).map(_.text).toSeq
should be {
+ List("", "Timelines", "Histograms")
+ }
+ }
+ } finally {
+ spark.streams.active.foreach(_.stop())
+ }
+ }
+ }
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ } finally {
+ super.afterAll()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]