[SPARK-20652][SQL] Store SQL UI data in the new app status store.

This change replaces the SQLListener with a new implementation that saves the data to the same store used by the SparkContext's status store. For that, the types used by the old SQLListener had to be updated a bit so that they're more serialization-friendly.
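For illustration, a minimal sketch of how the relocated data can be read back through the SQLAppStatusStore view introduced below (and defined later in this diff), following the same pattern the updated SQLMetricsSuite uses near the end of this message. The package and object names are hypothetical, and the code has to live under org.apache.spark.sql because SQLAppStatusStore is private[sql] and SparkContext.statusStore is private[spark]:

package org.apache.spark.sql.example // hypothetical package, placed under org.apache.spark.sql for access

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.ui.SQLAppStatusStore

object SqlUiDataSketch { // hypothetical object name

  def printSqlExecutions(sc: SparkContext): Unit = {
    // The SQL UI data now lives in the same KVStore that backs the SparkContext's
    // AppStatusStore, so SQLAppStatusStore is just a typed view over that store.
    val sqlStore = new SQLAppStatusStore(sc.statusStore.store)
    sqlStore.executionsList().foreach { exec =>
      // metricValues is only written to the store once an execution finishes; the UI
      // passes the live listener to SQLAppStatusStore to also cover in-flight
      // executions, which this sketch does not do.
      val metrics = sqlStore.executionMetrics(exec.executionId)
      println(s"execution ${exec.executionId}: jobs=${exec.jobs.keySet.mkString(",")}, " +
        s"${metrics.size} aggregated metrics")
    }
  }
}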
The interface for getting data from the store was abstracted into a new class, SQLAppStatusStore (following the convention used in core).

Another change is the way that the SQL UI hooks up into the core UI or the SHS. The old "SparkHistoryListenerFactory" was replaced with a new "AppStatusPlugin" that more explicitly differentiates between the two use cases: processing events, and showing the UI. Both live apps and the SHS use this new API (previously, it was restricted to the SHS). Note on the above: this causes a slight change of behavior for live apps; the SQL tab will only show up after the first execution is started.

The metrics-gathering code was re-worked a bit so that the types used are less memory-hungry and more serialization-friendly. This reduces memory usage when using in-memory stores, and reduces load times when using disk stores.

Tested with existing and added unit tests. Note one unit test was disabled because it depends on SPARK-20653, which isn't in yet.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19681 from vanzin/SPARK-20652.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ffa7c48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ffa7c48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ffa7c48

Branch: refs/heads/master
Commit: 0ffa7c488fa8156e2a1aa282e60b7c36b86d8af8
Parents: 4741c07
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Nov 14 15:28:22 2017 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Nov 14 15:28:22 2017 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala | 15 +-
 .../deploy/history/FsHistoryProvider.scala | 12 +-
 .../apache/spark/scheduler/SparkListener.scala | 12 -
 .../apache/spark/status/AppStatusPlugin.scala | 71 ++++
 .../apache/spark/status/AppStatusStore.scala | 8 +-
 ....spark.scheduler.SparkHistoryListenerFactory | 1 -
 .../org.apache.spark.status.AppStatusPlugin | 1 +
 .../org/apache/spark/sql/SparkSession.scala | 5 -
 .../sql/execution/ui/AllExecutionsPage.scala | 86 ++--
 .../spark/sql/execution/ui/ExecutionPage.scala | 60 ++-
 .../sql/execution/ui/SQLAppStatusListener.scala | 366 +++++++++++++++++
 .../sql/execution/ui/SQLAppStatusStore.scala | 179 ++++++++
 .../spark/sql/execution/ui/SQLListener.scala | 403 +------------------
 .../apache/spark/sql/execution/ui/SQLTab.scala | 2 +-
 .../apache/spark/sql/internal/SharedState.scala | 19 -
 .../sql/execution/metric/SQLMetricsSuite.scala | 18 +-
 .../execution/metric/SQLMetricsTestUtils.scala | 30 +-
 .../sql/execution/ui/SQLListenerSuite.scala | 340 ++++++++--------
 .../spark/sql/test/SharedSparkSession.scala | 1 -
 19 files changed, 920 insertions(+), 709 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1d325e6..23fd54f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -54,7 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import
org.apache.spark.scheduler.local.LocalSchedulerBackend -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusPlugin, AppStatusStore} import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -246,6 +246,8 @@ class SparkContext(config: SparkConf) extends Logging { */ def isStopped: Boolean = stopped.get() + private[spark] def statusStore: AppStatusStore = _statusStore + // An asynchronous listener bus for Spark events private[spark] def listenerBus: LiveListenerBus = _listenerBus @@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging { // For tests, do not enable the UI None } - // Bind the UI before starting the task scheduler to communicate - // the bound port to the cluster manager properly - _ui.foreach(_.bind()) + _ui.foreach { ui => + // Load any plugins that might want to modify the UI. + AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui)) + + // Bind the UI before starting the task scheduler to communicate + // the bound port to the cluster manager properly + ui.bind() + } _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a6dc533..25f82b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -41,7 +41,7 @@ import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ -import org.apache.spark.status.{AppStatusListener, AppStatusStore, AppStatusStoreMetadata, KVUtils} +import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI @@ -319,6 +319,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val _listener = new AppStatusListener(kvstore, conf, false, lastUpdateTime = Some(attempt.info.lastUpdated.getTime())) replayBus.addListener(_listener) + AppStatusPlugin.loadPlugins().foreach { plugin => + plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false) + } Some(_listener) } else { None @@ -333,11 +336,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } try { - val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], - Utils.getContextOrSparkClassLoader).asScala - listenerFactories.foreach { listenerFactory => - val listeners = listenerFactory.createListeners(conf, loadedUI.ui) - listeners.foreach(replayBus.addListener) + AppStatusPlugin.loadPlugins().foreach { plugin => + plugin.setupUI(loadedUI.ui) } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 
b76e560..3b677ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -168,18 +168,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent /** - * Interface for creating history listeners defined in other modules like SQL, which are used to - * rebuild the history UI. - */ -private[spark] trait SparkHistoryListenerFactory { - /** - * Create listeners used to rebuild the history UI. - */ - def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] -} - - -/** * Interface for listening to events from the Spark scheduler. Most applications should probably * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class. * http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala new file mode 100644 index 0000000..69ca02e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * An interface that defines plugins for collecting and storing application state. + * + * The plugin implementations are invoked for both live and replayed applications. For live + * applications, it's recommended that plugins defer creation of UI tabs until there's actual + * data to be shown. + */ +private[spark] trait AppStatusPlugin { + + /** + * Install listeners to collect data about the running application and populate the given + * store. + * + * @param conf The Spark configuration. + * @param store The KVStore where to keep application data. + * @param addListenerFn Function to register listeners with a bus. + * @param live Whether this is a live application (or an application being replayed by the + * HistoryServer). + */ + def setupListeners( + conf: SparkConf, + store: KVStore, + addListenerFn: SparkListener => Unit, + live: Boolean): Unit + + /** + * Install any needed extensions (tabs, pages, etc) to a Spark UI. 
The plugin can detect whether + * the app is live or replayed by looking at the UI's SparkContext field `sc`. + * + * @param ui The Spark UI instance for the application. + */ + def setupUI(ui: SparkUI): Unit + +} + +private[spark] object AppStatusPlugin { + + def loadPlugins(): Iterable[AppStatusPlugin] = { + ServiceLoader.load(classOf[AppStatusPlugin], Utils.getContextOrSparkClassLoader).asScala + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 9b42f55..d0615e5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} /** * A wrapper around a KVStore that provides methods for accessing the API data stored within. */ -private[spark] class AppStatusStore(store: KVStore) { +private[spark] class AppStatusStore(val store: KVStore) { def applicationInfo(): v1.ApplicationInfo = { store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info @@ -338,9 +338,11 @@ private[spark] object AppStatusStore { */ def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = { val store = new InMemoryStore() - val stateStore = new AppStatusStore(store) addListenerFn(new AppStatusListener(store, conf, true)) - stateStore + AppStatusPlugin.loadPlugins().foreach { p => + p.setupListeners(conf, store, addListenerFn, true) + } + new AppStatusStore(store) } } http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory ---------------------------------------------------------------------- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory deleted file mode 100644 index 507100b..0000000 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin ---------------------------------------------------------------------- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin new file mode 100644 index 0000000..ac6d7f6 --- /dev/null +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin @@ -0,0 +1 @@ +org.apache.spark.sql.execution.ui.SQLAppStatusPlugin http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 2821f5e..272eb84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.internal._ import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.BaseRelation @@ -957,7 +956,6 @@ object SparkSession { sparkContext.addSparkListener(new SparkListener { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { defaultSession.set(null) - sqlListener.set(null) } }) } @@ -1026,9 +1024,6 @@ object SparkSession { */ def getDefaultSession: Option[SparkSession] = Option(defaultSession.get) - /** A global SQL listener used for the SQL UI. */ - private[sql] val sqlListener = new AtomicReference[SQLListener]() - //////////////////////////////////////////////////////////////////////////////////////// // Private methods from now on //////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index f9c6986..7019d98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -24,34 +24,54 @@ import scala.xml.{Node, NodeSeq} import org.apache.commons.lang3.StringEscapeUtils +import org.apache.spark.JobExecutionStatus import org.apache.spark.internal.Logging import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging { - private val listener = parent.listener + private val sqlStore = parent.sqlStore override def render(request: HttpServletRequest): Seq[Node] = { val currentTime = System.currentTimeMillis() - val content = listener.synchronized { + val running = new mutable.ArrayBuffer[SQLExecutionUIData]() + val completed = new mutable.ArrayBuffer[SQLExecutionUIData]() + val failed = new mutable.ArrayBuffer[SQLExecutionUIData]() + + sqlStore.executionsList().foreach { e => + val isRunning = e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING } + val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED } + if (isRunning) { + running += e + } else if (isFailed) { + failed += e + } else { + completed += e + } + } + + val content = { val _content = mutable.ListBuffer[Node]() - if (listener.getRunningExecutions.nonEmpty) { + + if (running.nonEmpty) { _content ++= new RunningExecutionTable( - parent, s"Running Queries (${listener.getRunningExecutions.size})", currentTime, - listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq + parent, s"Running Queries (${running.size})", currentTime, + running.sortBy(_.submissionTime).reverse).toNodeSeq } - if (listener.getCompletedExecutions.nonEmpty) { + + if (completed.nonEmpty) { _content ++= new CompletedExecutionTable( - parent, s"Completed Queries 
(${listener.getCompletedExecutions.size})", currentTime, - listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq + parent, s"Completed Queries (${completed.size})", currentTime, + completed.sortBy(_.submissionTime).reverse).toNodeSeq } - if (listener.getFailedExecutions.nonEmpty) { + + if (failed.nonEmpty) { _content ++= new FailedExecutionTable( - parent, s"Failed Queries (${listener.getFailedExecutions.size})", currentTime, - listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq + parent, s"Failed Queries (${failed.size})", currentTime, + failed.sortBy(_.submissionTime).reverse).toNodeSeq } _content } @@ -65,26 +85,26 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L <div> <ul class="unstyled"> { - if (listener.getRunningExecutions.nonEmpty) { + if (running.nonEmpty) { <li> <a href="#running-execution-table"><strong>Running Queries:</strong></a> - {listener.getRunningExecutions.size} + {running.size} </li> } } { - if (listener.getCompletedExecutions.nonEmpty) { + if (completed.nonEmpty) { <li> <a href="#completed-execution-table"><strong>Completed Queries:</strong></a> - {listener.getCompletedExecutions.size} + {completed.size} </li> } } { - if (listener.getFailedExecutions.nonEmpty) { + if (failed.nonEmpty) { <li> <a href="#failed-execution-table"><strong>Failed Queries:</strong></a> - {listener.getFailedExecutions.size} + {failed.size} </li> } } @@ -114,23 +134,19 @@ private[ui] abstract class ExecutionTable( protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = { val submissionTime = executionUIData.submissionTime - val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime + val duration = executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) - + submissionTime - val runningJobs = executionUIData.runningJobs.map { jobId => - <a href={jobURL(jobId)}> - [{jobId.toString}] - </a> - } - val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId => - <a href={jobURL(jobId)}> - [{jobId.toString}] - </a> - } - val failedJobs = executionUIData.failedJobs.sorted.map { jobId => - <a href={jobURL(jobId)}> - [{jobId.toString}] - </a> + def jobLinks(status: JobExecutionStatus): Seq[Node] = { + executionUIData.jobs.flatMap { case (jobId, jobStatus) => + if (jobStatus == status) { + <a href={jobURL(jobId)}>[{jobId.toString}]</a> + } else { + None + } + }.toSeq } + <tr> <td> {executionUIData.executionId.toString} @@ -146,17 +162,17 @@ private[ui] abstract class ExecutionTable( </td> {if (showRunningJobs) { <td> - {runningJobs} + {jobLinks(JobExecutionStatus.RUNNING)} </td> }} {if (showSucceededJobs) { <td> - {succeededJobs} + {jobLinks(JobExecutionStatus.SUCCEEDED)} </td> }} {if (showFailedJobs) { <td> - {failedJobs} + {jobLinks(JobExecutionStatus.FAILED)} </td> }} </tr> http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index 460fc94..f29e135 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -21,24 +21,42 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import 
org.apache.spark.JobExecutionStatus import org.apache.spark.internal.Logging import org.apache.spark.ui.{UIUtils, WebUIPage} class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging { - private val listener = parent.listener + private val sqlStore = parent.sqlStore - override def render(request: HttpServletRequest): Seq[Node] = listener.synchronized { + override def render(request: HttpServletRequest): Seq[Node] = { // stripXSS is called first to remove suspicious characters used in XSS attacks val parameterExecutionId = UIUtils.stripXSS(request.getParameter("id")) require(parameterExecutionId != null && parameterExecutionId.nonEmpty, "Missing execution id parameter") val executionId = parameterExecutionId.toLong - val content = listener.getExecution(executionId).map { executionUIData => + val content = sqlStore.execution(executionId).map { executionUIData => val currentTime = System.currentTimeMillis() - val duration = - executionUIData.completionTime.getOrElse(currentTime) - executionUIData.submissionTime + val duration = executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) - + executionUIData.submissionTime + + def jobLinks(status: JobExecutionStatus, label: String): Seq[Node] = { + val jobs = executionUIData.jobs.flatMap { case (jobId, jobStatus) => + if (jobStatus == status) Some(jobId) else None + } + if (jobs.nonEmpty) { + <li> + <strong>{label} </strong> + {jobs.toSeq.sorted.map { jobId => + <a href={jobURL(jobId.intValue())}>{jobId.toString}</a><span> </span> + }} + </li> + } else { + Nil + } + } + val summary = <div> @@ -49,37 +67,17 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging <li> <strong>Duration: </strong>{UIUtils.formatDuration(duration)} </li> - {if (executionUIData.runningJobs.nonEmpty) { - <li> - <strong>Running Jobs: </strong> - {executionUIData.runningJobs.sorted.map { jobId => - <a href={jobURL(jobId)}>{jobId.toString}</a><span> </span> - }} - </li> - }} - {if (executionUIData.succeededJobs.nonEmpty) { - <li> - <strong>Succeeded Jobs: </strong> - {executionUIData.succeededJobs.sorted.map { jobId => - <a href={jobURL(jobId)}>{jobId.toString}</a><span> </span> - }} - </li> - }} - {if (executionUIData.failedJobs.nonEmpty) { - <li> - <strong>Failed Jobs: </strong> - {executionUIData.failedJobs.sorted.map { jobId => - <a href={jobURL(jobId)}>{jobId.toString}</a><span> </span> - }} - </li> - }} + {jobLinks(JobExecutionStatus.RUNNING, "Running Jobs:")} + {jobLinks(JobExecutionStatus.SUCCEEDED, "Succeeded Jobs:")} + {jobLinks(JobExecutionStatus.FAILED, "Failed Jobs:")} </ul> </div> - val metrics = listener.getExecutionMetrics(executionId) + val metrics = sqlStore.executionMetrics(executionId) + val graph = sqlStore.planGraph(executionId) summary ++ - planVisualization(metrics, executionUIData.physicalPlanGraph) ++ + planVisualization(metrics, graph) ++ physicalPlanDescription(executionUIData.physicalPlanDescription) }.getOrElse { <div>No information to display for Plan {executionId}</div> http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala new file mode 100644 index 0000000..43cec48 --- /dev/null +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( + conf: SparkConf, + kvstore: KVStore, + live: Boolean, + ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate state of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Live tracked data is needed by the SQL status store to calculate metrics for in-flight + // executions; that means arbitrary threads may be querying these maps, so they need to be + // thread-safe. + private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { + val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) + if (executionIdString == null) { + // This is not a job created by SQL + return + } + + val executionId = executionIdString.toLong + val jobId = event.jobId + val exec = getOrCreateExecution(executionId) + + // Record the accumulator IDs for the stages of this job, so that the code that keeps + // track of the metrics knows which accumulators to look at. + val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList + event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) + } + + exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) + exec.stages = event.stageIds.toSet + update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + if (!isSQLStage(event.stageInfo.stageId)) { + return + } + + // Reset the metrics tracking object for the new attempt. 
+ Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId + } + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + liveExecutions.values().asScala.foreach { exec => + if (exec.jobs.contains(event.jobId)) { + val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED + } + exec.jobs = exec.jobs + (event.jobId -> result) + exec.endEvents += 1 + update(exec) + } + } + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) + } + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { + if (!isSQLStage(event.stageId)) { + return + } + + val info = event.taskInfo + // SPARK-20342. If processing events from a live application, use the task metrics info to + // work around a race in the DAGScheduler. The metrics info does not contain accumulator info + // when reading event logs in the SHS, so we have to rely on the accumulator in that case. + val accums = if (live && event.taskMetrics != null) { + event.taskMetrics.externalAccums.flatMap { a => + // This call may fail if the accumulator is gc'ed, so account for that. + try { + Some(a.toInfo(Some(a.value), None)) + } catch { + case _: IllegalAccessError => None + } + } + } else { + info.accumulables + } + updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums, + info.successful) + } + + def liveExecutionMetrics(executionId: Long): Option[Map[Long, String]] = { + Option(liveExecutions.get(executionId)).map { exec => + if (exec.metricsValues != null) { + exec.metricsValues + } else { + aggregateMetrics(exec) + } + } + } + + private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { + val metricIds = exec.metrics.map(_.accumulatorId).sorted + val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap + val metrics = exec.stages.toSeq + .flatMap { stageId => Option(stageMetrics.get(stageId)) } + .flatMap(_.taskMetrics.values().asScala) + .flatMap { metrics => metrics.ids.zip(metrics.values) } + + val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq) + .filter { case (id, _) => metricIds.contains(id) } + .groupBy(_._1) + .map { case (id, values) => + id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq) + } + + // Check the execution again for whether the aggregated metrics data has been calculated. + // This can happen if the UI is requesting this data, and the onExecutionEnd handler is + // running at the same time. The metrics calculcated for the UI can be innacurate in that + // case, since the onExecutionEnd handler will clean up tracked stage metrics. 
+ if (exec.metricsValues != null) { + exec.metricsValues + } else { + aggregatedMetrics + } + } + + private def updateStageMetrics( + stageId: Int, + attemptId: Int, + taskId: Long, + accumUpdates: Seq[AccumulableInfo], + succeeded: Boolean): Unit = { + Option(stageMetrics.get(stageId)).foreach { metrics => + if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) { + return + } + + val oldTaskMetrics = metrics.taskMetrics.get(taskId) + if (oldTaskMetrics != null && oldTaskMetrics.succeeded) { + return + } + + val updates = accumUpdates + .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) } + .sortBy(_.id) + + if (updates.isEmpty) { + return + } + + val ids = new Array[Long](updates.size) + val values = new Array[Long](updates.size) + updates.zipWithIndex.foreach { case (acc, idx) => + ids(idx) = acc.id + // In a live application, accumulators have Long values, but when reading from event + // logs, they have String values. For now, assume all accumulators are Long and covert + // accordingly. + values(idx) = acc.update.get match { + case s: String => s.toLong + case l: Long => l + case o => throw new IllegalArgumentException(s"Unexpected: $o") + } + } + + // TODO: storing metrics by task ID can cause metrics for the same task index to be + // counted multiple times, for example due to speculation or re-attempts. + metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded)) + } + } + + private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { + // Install the SQL tab in a live app if it hasn't been initialized yet. + if (!uiInitialized) { + ui.foreach { _ui => + new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui) + } + uiInitialized = true + } + + val SparkListenerSQLExecutionStart(executionId, description, details, + physicalPlanDescription, sparkPlanInfo, time) = event + + def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { + nodes.map { + case cluster: SparkPlanGraphCluster => + val storedCluster = new SparkPlanGraphClusterWrapper( + cluster.id, + cluster.name, + cluster.desc, + toStoredNodes(cluster.nodes), + cluster.metrics) + new SparkPlanGraphNodeWrapper(null, storedCluster) + + case node => + new SparkPlanGraphNodeWrapper(node, null) + } + } + + val planGraph = SparkPlanGraph(sparkPlanInfo) + val sqlPlanMetrics = planGraph.allNodes.flatMap { node => + node.metrics.map { metric => (metric.accumulatorId, metric) } + }.toMap.values.toList + + val graphToStore = new SparkPlanGraphWrapper( + executionId, + toStoredNodes(planGraph.nodes), + planGraph.edges) + kvstore.write(graphToStore) + + val exec = getOrCreateExecution(executionId) + exec.description = description + exec.details = details + exec.physicalPlanDescription = physicalPlanDescription + exec.metrics = sqlPlanMetrics + exec.submissionTime = time + update(exec) + } + + private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + val SparkListenerSQLExecutionEnd(executionId, time) = event + Option(liveExecutions.get(executionId)).foreach { exec => + exec.metricsValues = aggregateMetrics(exec) + exec.completionTime = Some(new Date(time)) + exec.endEvents += 1 + update(exec) + + // Remove stale LiveStageMetrics objects for stages that are not active anymore. 
+ val activeStages = liveExecutions.values().asScala.flatMap { other => + if (other != exec) other.stages else Nil + }.toSet + stageMetrics.keySet().asScala + .filter(!activeStages.contains(_)) + .foreach(stageMetrics.remove) + } + } + + private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = { + val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event + Option(liveExecutions.get(executionId)).foreach { exec => + exec.driverAccumUpdates = accumUpdates.toMap + update(exec) + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionStart => onExecutionStart(e) + case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) + case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e) + case _ => // Ignore + } + + private def getOrCreateExecution(executionId: Long): LiveExecutionData = { + liveExecutions.computeIfAbsent(executionId, + new Function[Long, LiveExecutionData]() { + override def apply(key: Long): LiveExecutionData = new LiveExecutionData(executionId) + }) + } + + private def update(exec: LiveExecutionData): Unit = { + val now = System.nanoTime() + if (exec.endEvents >= exec.jobs.size + 1) { + exec.write(kvstore, now) + liveExecutions.remove(exec.executionId) + } else if (liveUpdatePeriodNs >= 0) { + if (now - exec.lastWriteTime > liveUpdatePeriodNs) { + exec.write(kvstore, now) + } + } + } + + private def isSQLStage(stageId: Int): Boolean = { + liveExecutions.values().asScala.exists { exec => + exec.stages.contains(stageId) + } + } + +} + +private class LiveExecutionData(val executionId: Long) extends LiveEntity { + + var description: String = null + var details: String = null + var physicalPlanDescription: String = null + var metrics = Seq[SQLPlanMetric]() + var submissionTime = -1L + var completionTime: Option[Date] = None + + var jobs = Map[Int, JobExecutionStatus]() + var stages = Set[Int]() + var driverAccumUpdates = Map[Long, Long]() + + @volatile var metricsValues: Map[Long, String] = null + + // Just in case job end and execution end arrive out of order, keep track of how many + // end events arrived so that the listener can stop tracking the execution. + var endEvents = 0 + + override protected def doUpdate(): Any = { + new SQLExecutionUIData( + executionId, + description, + details, + physicalPlanDescription, + metrics, + submissionTime, + completionTime, + jobs, + stages, + metricsValues) + } + +} + +private class LiveStageMetrics( + val stageId: Int, + var attemptId: Int, + val accumulatorIds: Array[Long], + val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics]) + +private[sql] class LiveTaskMetrics( + val ids: Array[Long], + val values: Array[Long], + val succeeded: Boolean) http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala new file mode 100644 index 0000000..586d3ae --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's + * no state kept in this class, so it's ok to have multiple instances of it in an application. + */ +private[sql] class SQLAppStatusStore( + store: KVStore, + listener: Option[SQLAppStatusListener] = None) { + + def executionsList(): Seq[SQLExecutionUIData] = { + store.view(classOf[SQLExecutionUIData]).asScala.toSeq + } + + def execution(executionId: Long): Option[SQLExecutionUIData] = { + try { + Some(store.read(classOf[SQLExecutionUIData], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def executionsCount(): Long = { + store.count(classOf[SQLExecutionUIData]) + } + + def executionMetrics(executionId: Long): Map[Long, String] = { + def metricsFromStore(): Option[Map[Long, String]] = { + val exec = store.read(classOf[SQLExecutionUIData], executionId) + Option(exec.metricValues) + } + + metricsFromStore() + .orElse(listener.flatMap(_.liveExecutionMetrics(executionId))) + // Try a second time in case the execution finished while this method is trying to + // get the metrics. + .orElse(metricsFromStore()) + .getOrElse(Map()) + } + + def planGraph(executionId: Long): SparkPlanGraph = { + store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() + } + +} + +/** + * An AppStatusPlugin for handling the SQL UI and listeners. + */ +private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { + + override def setupListeners( + conf: SparkConf, + store: KVStore, + addListenerFn: SparkListener => Unit, + live: Boolean): Unit = { + // For live applications, the listener is installed in [[setupUI]]. This also avoids adding + // the listener when the UI is disabled. Force installation during testing, though. + if (!live || Utils.isTesting) { + val listener = new SQLAppStatusListener(conf, store, live, None) + addListenerFn(listener) + } + } + + override def setupUI(ui: SparkUI): Unit = { + ui.sc match { + case Some(sc) => + // If this is a live application, then install a listener that will enable the SQL + // tab as soon as there's a SQL event posted to the bus. + val listener = new SQLAppStatusListener(sc.conf, ui.store.store, true, Some(ui)) + sc.listenerBus.addToStatusQueue(listener) + + case _ => + // For a replayed application, only add the tab if the store already contains SQL data. 
+ val sqlStore = new SQLAppStatusStore(ui.store.store) + if (sqlStore.executionsCount() > 0) { + new SQLTab(sqlStore, ui) + } + } + } + +} + +private[sql] class SQLExecutionUIData( + @KVIndexParam val executionId: Long, + val description: String, + val details: String, + val physicalPlanDescription: String, + val metrics: Seq[SQLPlanMetric], + val submissionTime: Long, + val completionTime: Option[Date], + @JsonDeserialize(keyAs = classOf[Integer]) + val jobs: Map[Int, JobExecutionStatus], + @JsonDeserialize(contentAs = classOf[Integer]) + val stages: Set[Int], + /** + * This field is only populated after the execution is finished; it will be null while the + * execution is still running. During execution, aggregate metrics need to be retrieved + * from the SQL listener instance. + */ + @JsonDeserialize(keyAs = classOf[JLong]) + val metricValues: Map[Long, String] + ) + +private[sql] class SparkPlanGraphWrapper( + @KVIndexParam val executionId: Long, + val nodes: Seq[SparkPlanGraphNodeWrapper], + val edges: Seq[SparkPlanGraphEdge]) { + + def toSparkPlanGraph(): SparkPlanGraph = { + SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges) + } + +} + +private[sql] class SparkPlanGraphClusterWrapper( + val id: Long, + val name: String, + val desc: String, + val nodes: Seq[SparkPlanGraphNodeWrapper], + val metrics: Seq[SQLPlanMetric]) { + + def toSparkPlanGraphCluster(): SparkPlanGraphCluster = { + new SparkPlanGraphCluster(id, name, desc, + new ArrayBuffer() ++ nodes.map(_.toSparkPlanGraphNode()), + metrics) + } + +} + +/** Only one of the values should be set. */ +private[sql] class SparkPlanGraphNodeWrapper( + val node: SparkPlanGraphNode, + val cluster: SparkPlanGraphClusterWrapper) { + + def toSparkPlanGraphNode(): SparkPlanGraphNode = { + assert(node == null ^ cluster == null, "One and only of of nore or cluster must be set.") + if (node != null) node else cluster.toSparkPlanGraphCluster() + } + +} + +private[sql] case class SQLPlanMetric( + name: String, + accumulatorId: Long, + metricType: String) http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 8c27af3..b58b8c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -17,21 +17,15 @@ package org.apache.spark.sql.execution.ui -import scala.collection.mutable - import com.fasterxml.jackson.databind.JavaType import com.fasterxml.jackson.databind.`type`.TypeFactory import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.util.Converter -import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ -import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution} +import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.metric._ -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.AccumulatorContext @DeveloperApi case class SparkListenerSQLExecutionStart( @@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long, 
typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType)) } } - -class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { - - override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = { - List(new SQLHistoryListener(conf, sparkUI)) - } -} - -class SQLListener(conf: SparkConf) extends SparkListener with Logging { - - private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000) - - private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]() - - // Old data in the following fields must be removed in "trimExecutionsIfNecessary". - // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data - private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]() - - /** - * Maintain the relation between job id and execution id so that we can get the execution id in - * the "onJobEnd" method. - */ - private val _jobIdToExecutionId = mutable.HashMap[Long, Long]() - - private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]() - - private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]() - - private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]() - - def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized { - _executionIdToData.toMap - } - - def jobIdToExecutionId: Map[Long, Long] = synchronized { - _jobIdToExecutionId.toMap - } - - def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized { - _stageIdToStageMetrics.toMap - } - - private def trimExecutionsIfNecessary( - executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = { - if (executions.size > retainedExecutions) { - val toRemove = math.max(retainedExecutions / 10, 1) - executions.take(toRemove).foreach { execution => - for (executionUIData <- _executionIdToData.remove(execution.executionId)) { - for (jobId <- executionUIData.jobs.keys) { - _jobIdToExecutionId.remove(jobId) - } - for (stageId <- executionUIData.stages) { - _stageIdToStageMetrics.remove(stageId) - } - } - } - executions.trimStart(toRemove) - } - } - - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) - if (executionIdString == null) { - // This is not a job created by SQL - return - } - val executionId = executionIdString.toLong - val jobId = jobStart.jobId - val stageIds = jobStart.stageIds - - synchronized { - activeExecutions.get(executionId).foreach { executionUIData => - executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING - executionUIData.stages ++= stageIds - stageIds.foreach(stageId => - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0)) - _jobIdToExecutionId(jobId) = executionId - } - } - } - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { - val jobId = jobEnd.jobId - for (executionId <- _jobIdToExecutionId.get(jobId); - executionUIData <- _executionIdToData.get(executionId)) { - jobEnd.jobResult match { - case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED - case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED - } - if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) { - // We are the last job of this execution, so mark the execution as finished. Note that - // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd` - // since these are called on different threads. 
- markExecutionFinished(executionId) - } - } - } - - override def onExecutorMetricsUpdate( - executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized { - for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) { - updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false) - } - } - - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized { - val stageId = stageSubmitted.stageInfo.stageId - val stageAttemptId = stageSubmitted.stageInfo.attemptId - // Always override metrics for old stage attempt - if (_stageIdToStageMetrics.contains(stageId)) { - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) - } else { - // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart". - // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution. - // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126). - } - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { - if (taskEnd.taskMetrics != null) { - updateTaskAccumulatorValues( - taskEnd.taskInfo.taskId, - taskEnd.stageId, - taskEnd.stageAttemptId, - taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)), - finishTask = true) - } - } - - /** - * Update the accumulator values of a task with the latest metrics for this task. This is called - * every time we receive an executor heartbeat or when a task finishes. - */ - protected def updateTaskAccumulatorValues( - taskId: Long, - stageId: Int, - stageAttemptID: Int, - _accumulatorUpdates: Seq[AccumulableInfo], - finishTask: Boolean): Unit = { - val accumulatorUpdates = - _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get)) - - _stageIdToStageMetrics.get(stageId) match { - case Some(stageMetrics) => - if (stageAttemptID < stageMetrics.stageAttemptId) { - // A task of an old stage attempt. Because a new stage is submitted, we can ignore it. - } else if (stageAttemptID > stageMetrics.stageAttemptId) { - logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " + - s"what we have seen (${stageMetrics.stageAttemptId})") - } else { - // TODO We don't know the attemptId. Currently, what we can do is overriding the - // accumulator updates. However, if there are two same task are running, such as - // speculation, the accumulator updates will be overriding by different task attempts, - // the results will be weird. 
- stageMetrics.taskIdToMetricUpdates.get(taskId) match { - case Some(taskMetrics) => - if (finishTask) { - taskMetrics.finished = true - taskMetrics.accumulatorUpdates = accumulatorUpdates - } else if (!taskMetrics.finished) { - taskMetrics.accumulatorUpdates = accumulatorUpdates - } else { - // If a task is finished, we should not override with accumulator updates from - // heartbeat reports - } - case None => - stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics( - finished = finishTask, accumulatorUpdates) - } - } - case None => - // This execution and its stage have been dropped - } - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time) => - val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo) - val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node => - node.metrics.map(metric => metric.accumulatorId -> metric) - } - val executionUIData = new SQLExecutionUIData( - executionId, - description, - details, - physicalPlanDescription, - physicalPlanGraph, - sqlPlanMetrics.toMap, - time) - synchronized { - activeExecutions(executionId) = executionUIData - _executionIdToData(executionId) = executionUIData - } - case SparkListenerSQLExecutionEnd(executionId, time) => synchronized { - _executionIdToData.get(executionId).foreach { executionUIData => - executionUIData.completionTime = Some(time) - if (!executionUIData.hasRunningJobs) { - // onExecutionEnd happens after all "onJobEnd"s - // So we should update the execution lists. - markExecutionFinished(executionId) - } else { - // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s. - // Then we don't if the execution is successful, so let the last onJobEnd updates the - // execution lists. - } - } - } - case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized { - _executionIdToData.get(executionId).foreach { executionUIData => - for ((accId, accValue) <- accumUpdates) { - executionUIData.driverAccumUpdates(accId) = accValue - } - } - } - case _ => // Ignore - } - - private def markExecutionFinished(executionId: Long): Unit = { - activeExecutions.remove(executionId).foreach { executionUIData => - if (executionUIData.isFailed) { - failedExecutions += executionUIData - trimExecutionsIfNecessary(failedExecutions) - } else { - completedExecutions += executionUIData - trimExecutionsIfNecessary(completedExecutions) - } - } - } - - def getRunningExecutions: Seq[SQLExecutionUIData] = synchronized { - activeExecutions.values.toSeq - } - - def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized { - failedExecutions - } - - def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized { - completedExecutions - } - - def getExecution(executionId: Long): Option[SQLExecutionUIData] = synchronized { - _executionIdToData.get(executionId) - } - - /** - * Get all accumulator updates from all tasks which belong to this execution and merge them. 
- */ - def getExecutionMetrics(executionId: Long): Map[Long, String] = synchronized { - _executionIdToData.get(executionId) match { - case Some(executionUIData) => - val accumulatorUpdates = { - for (stageId <- executionUIData.stages; - stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable; - taskMetrics <- stageMetrics.taskIdToMetricUpdates.values; - accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { - (accumulatorUpdate._1, accumulatorUpdate._2) - } - } - - val driverUpdates = executionUIData.driverAccumUpdates.toSeq - val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter { - case (id, _) => executionUIData.accumulatorMetrics.contains(id) - } - mergeAccumulatorUpdates(totalUpdates, accumulatorId => - executionUIData.accumulatorMetrics(accumulatorId).metricType) - case None => - // This execution has been dropped - Map.empty - } - } - - private def mergeAccumulatorUpdates( - accumulatorUpdates: Seq[(Long, Any)], - metricTypeFunc: Long => String): Map[Long, String] = { - accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) => - val metricType = metricTypeFunc(accumulatorId) - accumulatorId -> - SQLMetrics.stringValue(metricType, values.map(_._2.asInstanceOf[Long])) - } - } - -} - - -/** - * A [[SQLListener]] for rendering the SQL UI in the history server. - */ -class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) - extends SQLListener(conf) { - - private var sqlTabAttached = false - - override def onExecutorMetricsUpdate(u: SparkListenerExecutorMetricsUpdate): Unit = { - // Do nothing; these events are not logged - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { - updateTaskAccumulatorValues( - taskEnd.taskInfo.taskId, - taskEnd.stageId, - taskEnd.stageAttemptId, - taskEnd.taskInfo.accumulables.flatMap { a => - // Filter out accumulators that are not SQL metrics - // For now we assume all SQL metrics are Long's that have been JSON serialized as String's - if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) { - val newValue = a.update.map(_.toString.toLong).getOrElse(0L) - Some(a.copy(update = Some(newValue))) - } else { - None - } - }, - finishTask = true) - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case _: SparkListenerSQLExecutionStart => - if (!sqlTabAttached) { - new SQLTab(this, sparkUI) - sqlTabAttached = true - } - super.onOtherEvent(event) - case _ => super.onOtherEvent(event) - } -} - -/** - * Represent all necessary data for an execution that will be used in Web UI. - */ -private[ui] class SQLExecutionUIData( - val executionId: Long, - val description: String, - val details: String, - val physicalPlanDescription: String, - val physicalPlanGraph: SparkPlanGraph, - val accumulatorMetrics: Map[Long, SQLPlanMetric], - val submissionTime: Long) { - - var completionTime: Option[Long] = None - - val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty - - val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer() - - val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty - - /** - * Return whether there are running jobs in this execution. - */ - def hasRunningJobs: Boolean = jobs.values.exists(_ == JobExecutionStatus.RUNNING) - - /** - * Return whether there are any failed jobs in this execution. 
-   */
-  def isFailed: Boolean = jobs.values.exists(_ == JobExecutionStatus.FAILED)
-
-  def runningJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.RUNNING }.keys.toSeq
-
-  def succeededJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.SUCCEEDED }.keys.toSeq
-
-  def failedJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.FAILED }.keys.toSeq
-}
-
-/**
- * Represent a metric in a SQLPlan.
- *
- * Because we cannot revert our changes for an "Accumulator", we need to maintain accumulator
- * updates for each task. So that if a task is retried, we can simply override the old updates with
- * the new updates of the new attempt task. Since we cannot add them to accumulator, we need to use
- * "AccumulatorParam" to get the aggregation value.
- */
-private[ui] case class SQLPlanMetric(
-    name: String,
-    accumulatorId: Long,
-    metricType: String)
-
-/**
- * Store all accumulatorUpdates for all tasks in a Spark stage.
- */
-private[ui] class SQLStageMetrics(
-    val stageAttemptId: Long,
-    val taskIdToMetricUpdates: mutable.HashMap[Long, SQLTaskMetrics] = mutable.HashMap.empty)
-
-
-// TODO Should add attemptId here when we can get it from SparkListenerExecutorMetricsUpdate
-/**
- * Store all accumulatorUpdates for a Spark task.
- */
-private[ui] class SQLTaskMetrics(
-    var finished: Boolean,
-    var accumulatorUpdates: Seq[(Long, Any)])

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index d0376af..a321a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.ui
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
+class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, "SQL") with Logging {
 
   val parent = sparkUI

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index ad9db30..3e479fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
-import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
@@ -84,11 +83,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   val cacheManager: CacheManager = new CacheManager
 
   /**
-   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
-   */
-  val listener: SQLListener = createListenerAndUI(sparkContext)
-
-  /**
    * A catalog that interacts with external systems.
    */
   lazy val externalCatalog: ExternalCatalog = {
@@ -142,19 +136,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   val jarClassLoader = new NonClosableMutableURLClassLoader(
     org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
-  /**
-   * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
-   */
-  private def createListenerAndUI(sc: SparkContext): SQLListener = {
-    if (SparkSession.sqlListener.get() == null) {
-      val listener = new SQLListener(sc.conf)
-      if (SparkSession.sqlListener.compareAndSet(null, listener)) {
-        sc.listenerBus.addToStatusQueue(listener)
-        sc.ui.foreach(new SQLTab(listener, _))
-      }
-    }
-    SparkSession.sqlListener.get()
-  }
 }
 
 object SharedState extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 58a194b..d588af3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.ui.SQLAppStatusStore
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -32,6 +33,13 @@ import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
 class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext {
   import testImplicits._
 
+  private def statusStore: SQLAppStatusStore = {
+    new SQLAppStatusStore(sparkContext.statusStore.store)
+  }
+
+  private def currentExecutionIds(): Set[Long] = {
+    statusStore.executionsList.map(_.executionId).toSet
+  }
 
   /**
    * Generates a `DataFrame` by filling randomly generated bytes for hash collision.
@@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     withTempPath { file =>
       // person creates a temporary view. get the DF before listing previous execution IDs
       val data = person.select('name)
-      sparkContext.listenerBus.waitUntilEmpty(10000)
-      val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+      val previousExecutionIds = currentExecutionIds()
      // Assume the execution plan is
      // PhysicalRDD(nodeId = 0)
       data.write.format("json").save(file.getAbsolutePath)
       sparkContext.listenerBus.waitUntilEmpty(10000)
-      val executionIds =
-        spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+      val executionIds = currentExecutionIds().diff(previousExecutionIds)
       assert(executionIds.size === 1)
       val executionId = executionIds.head
-      val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
+      val jobs = statusStore.execution(executionId).get.jobs
       // Use "<=" because there is a race condition that we may miss some jobs
       // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
       assert(jobs.size <= 1)
-      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
+      val metricValues = statusStore.executionMetrics(executionId)
       // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
       // However, we still can check the value.
       assert(metricValues.values.toSeq.exists(_ === "2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 3966e98..d89c4b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -25,7 +25,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
 
@@ -34,6 +34,14 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
 
   import testImplicits._
 
+  private def statusStore: SQLAppStatusStore = {
+    new SQLAppStatusStore(sparkContext.statusStore.store)
+  }
+
+  private def currentExecutionIds(): Set[Long] = {
+    statusStore.executionsList.map(_.executionId).toSet
+  }
+
   /**
    * Get execution metrics for the SQL execution and verify metrics values.
    *
@@ -41,24 +49,23 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param func the function can produce execution id after running.
    */
   private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = {
-    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+    val previousExecutionIds = currentExecutionIds()
     // Run the given function to trigger query execution.
     func
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-    val executionIds =
-      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    val executionIds = currentExecutionIds().diff(previousExecutionIds)
     assert(executionIds.size == 1)
     val executionId = executionIds.head
-    val executionData = spark.sharedState.listener.getExecution(executionId).get
-    val executedNode = executionData.physicalPlanGraph.nodes.head
+    val executionData = statusStore.execution(executionId).get
+    val executedNode = statusStore.planGraph(executionId).nodes.head
 
     val metricsNames = Seq(
       "number of written files",
       "number of dynamic part",
       "number of output rows")
 
-    val metrics = spark.sharedState.listener.getExecutionMetrics(executionId)
+    val metrics = statusStore.executionMetrics(executionId)
     metricsNames.zip(metricsValues).foreach { case (metricsName, expected) =>
       val sqlMetric = executedNode.metrics.find(_.name == metricsName)
@@ -134,22 +141,21 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       expectedNumOfJobs: Int,
       expectedNodeIds: Set[Long],
       enableWholeStage: Boolean = false): Option[Map[Long, (String, Map[String, Any])]] = {
-    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+    val previousExecutionIds = currentExecutionIds()
     withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) {
       df.collect()
     }
     sparkContext.listenerBus.waitUntilEmpty(10000)
-    val executionIds =
-      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    val executionIds = currentExecutionIds().diff(previousExecutionIds)
     assert(executionIds.size === 1)
     val executionId = executionIds.head
-    val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
+    val jobs = statusStore.execution(executionId).get.jobs
     // Use "<=" because there is a race condition that we may miss some jobs
     // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
     assert(jobs.size <= expectedNumOfJobs)
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
-      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
+      val metricValues = statusStore.executionMetrics(executionId)
      val metrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
        df.queryExecution.executedPlan)).allNodes.filter { node =>
        expectedNodeIds.contains(node.id)
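
The test hunks above all follow the same pattern: instead of reaching into the removed spark.sharedState.listener, they wrap the SparkContext's shared KVStore in a SQLAppStatusStore and query it. A minimal sketch of that pattern, using only calls that appear in this patch; it assumes Spark-internal test code (sparkContext.statusStore is not a user-facing API), an active SparkSession, and an illustrative helper name:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SQLAppStatusStore

// Illustrative helper, not part of the patch.
def latestExecutionMetrics(spark: SparkSession): Option[Map[Long, String]] = {
  // Wrap the same KVStore that backs the core status store.
  val statusStore = new SQLAppStatusStore(spark.sparkContext.statusStore.store)
  // executionsList replaces listener.executionIdToData; execution(id) replaces getExecution(id).
  statusStore.executionsList.lastOption.map { exec =>
    // executionMetrics(id) replaces getExecutionMetrics(id): merged accumulator id -> rendered value.
    statusStore.executionMetrics(exec.executionId)
  }
}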