[SPARK-20652][SQL] Store SQL UI data in the new app status store.

This change replaces the SQLListener with a new implementation that
saves the data to the same store used by the SparkContext's status
store. For that, the types used by the old SQLListener had to be
updated a bit so that they're more serialization-friendly.

The interface for getting data from the store was abstracted into
a new class, SQLAppStatusStore (following the convention used in
core).
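
For reference, reading SQL data back out now looks roughly like the
sketch below (illustrative only: SQLAppStatusStore is private[sql],
so this is not a public API, and "statusKVStore" stands in for the
KVStore backing the application's status store):

    import org.apache.spark.sql.execution.ui.SQLAppStatusStore
    import org.apache.spark.util.kvstore.KVStore

    // Sketch: wrap an existing status KVStore and summarize executions.
    def summarizeExecutions(statusKVStore: KVStore): Unit = {
      val sqlStore = new SQLAppStatusStore(statusKVStore)
      sqlStore.executionsList().foreach { exec =>
        val metrics = sqlStore.executionMetrics(exec.executionId)
        println(s"execution ${exec.executionId}: ${exec.jobs.size} jobs, " +
          s"${metrics.size} metrics")
      }
    }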

Another change is the way the SQL UI hooks into the core UI or the
SHS. The old "SparkHistoryListenerFactory" was replaced with a new
"AppStatusPlugin" that more explicitly differentiates between the
two use cases: processing events, and showing the UI. Both live apps
and the SHS use this new API (previously, it was restricted to the
SHS).

Note on the above: this causes a slight change of behavior for
live apps; the SQL tab will only show up after the first execution
is started.
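
To illustrate the shape of the new plugin API, here is a minimal
sketch (the names below are made up for illustration; the trait is
private[spark], SQLAppStatusPlugin in this patch is the real
implementation, and plugins are discovered through a
META-INF/services entry for org.apache.spark.status.AppStatusPlugin):

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.status.AppStatusPlugin
    import org.apache.spark.ui.SparkUI
    import org.apache.spark.util.kvstore.KVStore

    class ExampleStatusPlugin extends AppStatusPlugin {

      override def setupListeners(
          conf: SparkConf,
          store: KVStore,
          addListenerFn: SparkListener => Unit,
          live: Boolean): Unit = {
        // Register a listener that would write whatever it tracks to store.
        addListenerFn(new SparkListener {
          override def onOtherEvent(event: SparkListenerEvent): Unit = ()
        })
      }

      override def setupUI(ui: SparkUI): Unit = {
        // ui.sc is Some(sc) for a live app and None when replayed by the
        // SHS; a real plugin would attach its tab(s) here.
      }
    }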

The metrics-gathering code was reworked a bit so that the types
used are less memory-hungry and more serialization-friendly. This
reduces memory usage when using in-memory stores, and reduces load
times when using disk stores.
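
As a rough illustration of the new shape (simplified from the new
listener code, which formats values with SQLMetrics.stringValue
rather than summing them), per-task metric updates reduce to parallel
primitive arrays that are cheap to store and aggregate:

    // ids(i) is an accumulator id, values(i) its partial value for one task.
    case class TaskMetricValues(ids: Array[Long], values: Array[Long])

    def aggregate(tasks: Seq[TaskMetricValues]): Map[Long, Long] =
      tasks.flatMap(t => t.ids.zip(t.values))
        .groupBy(_._1)
        .map { case (id, pairs) => id -> pairs.map(_._2).sum }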

Tested with existing and added unit tests. Note one unit test was
disabled because it depends on SPARK-20653, which isn't in yet.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19681 from vanzin/SPARK-20652.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ffa7c48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ffa7c48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ffa7c48

Branch: refs/heads/master
Commit: 0ffa7c488fa8156e2a1aa282e60b7c36b86d8af8
Parents: 4741c07
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Nov 14 15:28:22 2017 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Nov 14 15:28:22 2017 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  15 +-
 .../deploy/history/FsHistoryProvider.scala      |  12 +-
 .../apache/spark/scheduler/SparkListener.scala  |  12 -
 .../apache/spark/status/AppStatusPlugin.scala   |  71 ++++
 .../apache/spark/status/AppStatusStore.scala    |   8 +-
 ....spark.scheduler.SparkHistoryListenerFactory |   1 -
 .../org.apache.spark.status.AppStatusPlugin     |   1 +
 .../org/apache/spark/sql/SparkSession.scala     |   5 -
 .../sql/execution/ui/AllExecutionsPage.scala    |  86 ++--
 .../spark/sql/execution/ui/ExecutionPage.scala  |  60 ++-
 .../sql/execution/ui/SQLAppStatusListener.scala | 366 +++++++++++++++++
 .../sql/execution/ui/SQLAppStatusStore.scala    | 179 ++++++++
 .../spark/sql/execution/ui/SQLListener.scala    | 403 +------------------
 .../apache/spark/sql/execution/ui/SQLTab.scala  |   2 +-
 .../apache/spark/sql/internal/SharedState.scala |  19 -
 .../sql/execution/metric/SQLMetricsSuite.scala  |  18 +-
 .../execution/metric/SQLMetricsTestUtils.scala  |  30 +-
 .../sql/execution/ui/SQLListenerSuite.scala     | 340 ++++++++--------
 .../spark/sql/test/SharedSparkSession.scala     |   1 -
 19 files changed, 920 insertions(+), 709 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1d325e6..23fd54f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -54,7 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -246,6 +246,8 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def isStopped: Boolean = stopped.get()
 
+  private[spark] def statusStore: AppStatusStore = _statusStore
+
   // An asynchronous listener bus for Spark events
   private[spark] def listenerBus: LiveListenerBus = _listenerBus
 
@@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging {
         // For tests, do not enable the UI
         None
       }
-    // Bind the UI before starting the task scheduler to communicate
-    // the bound port to the cluster manager properly
-    _ui.foreach(_.bind())
+    _ui.foreach { ui =>
+      // Load any plugins that might want to modify the UI.
+      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
+
+      // Bind the UI before starting the task scheduler to communicate
+      // the bound port to the cluster manager properly
+      ui.bind()
+    }
 
     _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a6dc533..25f82b5 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
-import org.apache.spark.status.{AppStatusListener, AppStatusStore, 
AppStatusStoreMetadata, KVUtils}
+import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.SparkUI
@@ -319,6 +319,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
       val _listener = new AppStatusListener(kvstore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(_listener)
+      AppStatusPlugin.loadPlugins().foreach { plugin =>
+        plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), 
false)
+      }
       Some(_listener)
     } else {
       None
@@ -333,11 +336,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
 
     try {
-      val listenerFactories = 
ServiceLoader.load(classOf[SparkHistoryListenerFactory],
-        Utils.getContextOrSparkClassLoader).asScala
-      listenerFactories.foreach { listenerFactory =>
-        val listeners = listenerFactory.createListeners(conf, loadedUI.ui)
-        listeners.foreach(replayBus.addListener)
+      AppStatusPlugin.loadPlugins().foreach { plugin =>
+        plugin.setupUI(loadedUI.ui)
       }
 
       val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b76e560..3b677ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -168,18 +168,6 @@ case class SparkListenerApplicationEnd(time: Long) extends 
SparkListenerEvent
 case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
 /**
- * Interface for creating history listeners defined in other modules like SQL, 
which are used to
- * rebuild the history UI.
- */
-private[spark] trait SparkHistoryListenerFactory {
-  /**
-   * Create listeners used to rebuild the history UI.
-   */
-  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
-}
-
-
-/**
  * Interface for listening to events from the Spark scheduler. Most 
applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than 
implementing this class.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
new file mode 100644
index 0000000..69ca02e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * An interface that defines plugins for collecting and storing application 
state.
+ *
+ * The plugin implementations are invoked for both live and replayed 
applications. For live
+ * applications, it's recommended that plugins defer creation of UI tabs until 
there's actual
+ * data to be shown.
+ */
+private[spark] trait AppStatusPlugin {
+
+  /**
+   * Install listeners to collect data about the running application and 
populate the given
+   * store.
+   *
+   * @param conf The Spark configuration.
+   * @param store The KVStore where to keep application data.
+   * @param addListenerFn Function to register listeners with a bus.
+   * @param live Whether this is a live application (or an application being 
replayed by the
+   *             HistoryServer).
+   */
+  def setupListeners(
+      conf: SparkConf,
+      store: KVStore,
+      addListenerFn: SparkListener => Unit,
+      live: Boolean): Unit
+
+  /**
+   * Install any needed extensions (tabs, pages, etc) to a Spark UI. The 
plugin can detect whether
+   * the app is live or replayed by looking at the UI's SparkContext field 
`sc`.
+   *
+   * @param ui The Spark UI instance for the application.
+   */
+  def setupUI(ui: SparkUI): Unit
+
+}
+
+private[spark] object AppStatusPlugin {
+
+  def loadPlugins(): Iterable[AppStatusPlugin] = {
+    ServiceLoader.load(classOf[AppStatusPlugin], 
Utils.getContextOrSparkClassLoader).asScala
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 9b42f55..d0615e5 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 /**
  * A wrapper around a KVStore that provides methods for accessing the API data 
stored within.
  */
-private[spark] class AppStatusStore(store: KVStore) {
+private[spark] class AppStatusStore(val store: KVStore) {
 
   def applicationInfo(): v1.ApplicationInfo = {
     store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -338,9 +338,11 @@ private[spark] object AppStatusStore {
    */
   def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): 
AppStatusStore = {
     val store = new InMemoryStore()
-    val stateStore = new AppStatusStore(store)
     addListenerFn(new AppStatusListener(store, conf, true))
-    stateStore
+    AppStatusPlugin.loadPlugins().foreach { p =>
+      p.setupListeners(conf, store, addListenerFn, true)
+    }
+    new AppStatusStore(store)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
deleted file mode 100644
index 507100b..0000000
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
new file mode 100644
index 0000000..ac6d7f6
--- /dev/null
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLAppStatusPlugin

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 2821f5e..272eb84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -38,7 +38,6 @@ import 
org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.internal._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation
@@ -957,7 +956,6 @@ object SparkSession {
         sparkContext.addSparkListener(new SparkListener {
           override def onApplicationEnd(applicationEnd: 
SparkListenerApplicationEnd): Unit = {
             defaultSession.set(null)
-            sqlListener.set(null)
           }
         })
       }
@@ -1026,9 +1024,6 @@ object SparkSession {
    */
   def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
 
-  /** A global SQL listener used for the SQL UI. */
-  private[sql] val sqlListener = new AtomicReference[SQLListener]()
-
   
////////////////////////////////////////////////////////////////////////////////////////
   // Private methods from now on
   
////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index f9c6986..7019d98 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -24,34 +24,54 @@ import scala.xml.{Node, NodeSeq}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with 
Logging {
 
-  private val listener = parent.listener
+  private val sqlStore = parent.sqlStore
 
   override def render(request: HttpServletRequest): Seq[Node] = {
     val currentTime = System.currentTimeMillis()
-    val content = listener.synchronized {
+    val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
+    val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+    val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+
+    sqlStore.executionsList().foreach { e =>
+      val isRunning = e.jobs.exists { case (_, status) => status == 
JobExecutionStatus.RUNNING }
+      val isFailed = e.jobs.exists { case (_, status) => status == 
JobExecutionStatus.FAILED }
+      if (isRunning) {
+        running += e
+      } else if (isFailed) {
+        failed += e
+      } else {
+        completed += e
+      }
+    }
+
+    val content = {
       val _content = mutable.ListBuffer[Node]()
-      if (listener.getRunningExecutions.nonEmpty) {
+
+      if (running.nonEmpty) {
         _content ++=
           new RunningExecutionTable(
-            parent, s"Running Queries 
(${listener.getRunningExecutions.size})", currentTime,
-            
listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+            parent, s"Running Queries (${running.size})", currentTime,
+            running.sortBy(_.submissionTime).reverse).toNodeSeq
       }
-      if (listener.getCompletedExecutions.nonEmpty) {
+
+      if (completed.nonEmpty) {
         _content ++=
           new CompletedExecutionTable(
-            parent, s"Completed Queries 
(${listener.getCompletedExecutions.size})", currentTime,
-            
listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+            parent, s"Completed Queries (${completed.size})", currentTime,
+            completed.sortBy(_.submissionTime).reverse).toNodeSeq
       }
-      if (listener.getFailedExecutions.nonEmpty) {
+
+      if (failed.nonEmpty) {
         _content ++=
           new FailedExecutionTable(
-            parent, s"Failed Queries (${listener.getFailedExecutions.size})", 
currentTime,
-            
listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+            parent, s"Failed Queries (${failed.size})", currentTime,
+            failed.sortBy(_.submissionTime).reverse).toNodeSeq
       }
       _content
     }
@@ -65,26 +85,26 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
       <div>
         <ul class="unstyled">
           {
-            if (listener.getRunningExecutions.nonEmpty) {
+            if (running.nonEmpty) {
               <li>
                 <a href="#running-execution-table"><strong>Running 
Queries:</strong></a>
-                {listener.getRunningExecutions.size}
+                {running.size}
               </li>
             }
           }
           {
-            if (listener.getCompletedExecutions.nonEmpty) {
+            if (completed.nonEmpty) {
               <li>
                 <a href="#completed-execution-table"><strong>Completed 
Queries:</strong></a>
-                {listener.getCompletedExecutions.size}
+                {completed.size}
               </li>
             }
           }
           {
-            if (listener.getFailedExecutions.nonEmpty) {
+            if (failed.nonEmpty) {
               <li>
                 <a href="#failed-execution-table"><strong>Failed 
Queries:</strong></a>
-                {listener.getFailedExecutions.size}
+                {failed.size}
               </li>
             }
           }
@@ -114,23 +134,19 @@ private[ui] abstract class ExecutionTable(
 
   protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): 
Seq[Node] = {
     val submissionTime = executionUIData.submissionTime
-    val duration = executionUIData.completionTime.getOrElse(currentTime) - 
submissionTime
+    val duration = 
executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) -
+      submissionTime
 
-    val runningJobs = executionUIData.runningJobs.map { jobId =>
-      <a href={jobURL(jobId)}>
-        [{jobId.toString}]
-      </a>
-    }
-    val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId =>
-      <a href={jobURL(jobId)}>
-        [{jobId.toString}]
-      </a>
-    }
-    val failedJobs = executionUIData.failedJobs.sorted.map { jobId =>
-      <a href={jobURL(jobId)}>
-        [{jobId.toString}]
-      </a>
+    def jobLinks(status: JobExecutionStatus): Seq[Node] = {
+      executionUIData.jobs.flatMap { case (jobId, jobStatus) =>
+        if (jobStatus == status) {
+          <a href={jobURL(jobId)}>[{jobId.toString}]</a>
+        } else {
+          None
+        }
+      }.toSeq
     }
+
     <tr>
       <td>
         {executionUIData.executionId.toString}
@@ -146,17 +162,17 @@ private[ui] abstract class ExecutionTable(
       </td>
       {if (showRunningJobs) {
         <td>
-          {runningJobs}
+          {jobLinks(JobExecutionStatus.RUNNING)}
         </td>
       }}
       {if (showSucceededJobs) {
         <td>
-          {succeededJobs}
+          {jobLinks(JobExecutionStatus.SUCCEEDED)}
         </td>
       }}
       {if (showFailedJobs) {
         <td>
-          {failedJobs}
+          {jobLinks(JobExecutionStatus.FAILED)}
         </td>
       }}
     </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index 460fc94..f29e135 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -21,24 +21,42 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with 
Logging {
 
-  private val listener = parent.listener
+  private val sqlStore = parent.sqlStore
 
-  override def render(request: HttpServletRequest): Seq[Node] = 
listener.synchronized {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     // stripXSS is called first to remove suspicious characters used in XSS 
attacks
     val parameterExecutionId = UIUtils.stripXSS(request.getParameter("id"))
     require(parameterExecutionId != null && parameterExecutionId.nonEmpty,
       "Missing execution id parameter")
 
     val executionId = parameterExecutionId.toLong
-    val content = listener.getExecution(executionId).map { executionUIData =>
+    val content = sqlStore.execution(executionId).map { executionUIData =>
       val currentTime = System.currentTimeMillis()
-      val duration =
-        executionUIData.completionTime.getOrElse(currentTime) - 
executionUIData.submissionTime
+      val duration = 
executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) -
+        executionUIData.submissionTime
+
+      def jobLinks(status: JobExecutionStatus, label: String): Seq[Node] = {
+        val jobs = executionUIData.jobs.flatMap { case (jobId, jobStatus) =>
+          if (jobStatus == status) Some(jobId) else None
+        }
+        if (jobs.nonEmpty) {
+          <li>
+            <strong>{label} </strong>
+            {jobs.toSeq.sorted.map { jobId =>
+              <a 
href={jobURL(jobId.intValue())}>{jobId.toString}</a><span>&nbsp;</span>
+            }}
+          </li>
+        } else {
+          Nil
+        }
+      }
+
 
       val summary =
         <div>
@@ -49,37 +67,17 @@ class ExecutionPage(parent: SQLTab) extends 
WebUIPage("execution") with Logging
             <li>
               <strong>Duration: </strong>{UIUtils.formatDuration(duration)}
             </li>
-            {if (executionUIData.runningJobs.nonEmpty) {
-              <li>
-                <strong>Running Jobs: </strong>
-                {executionUIData.runningJobs.sorted.map { jobId =>
-                <a href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
-              }}
-              </li>
-            }}
-            {if (executionUIData.succeededJobs.nonEmpty) {
-              <li>
-                <strong>Succeeded Jobs: </strong>
-                {executionUIData.succeededJobs.sorted.map { jobId =>
-                  <a 
href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
-                }}
-              </li>
-            }}
-            {if (executionUIData.failedJobs.nonEmpty) {
-              <li>
-                <strong>Failed Jobs: </strong>
-                {executionUIData.failedJobs.sorted.map { jobId =>
-                  <a 
href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
-                }}
-              </li>
-            }}
+            {jobLinks(JobExecutionStatus.RUNNING, "Running Jobs:")}
+            {jobLinks(JobExecutionStatus.SUCCEEDED, "Succeeded Jobs:")}
+            {jobLinks(JobExecutionStatus.FAILED, "Failed Jobs:")}
           </ul>
         </div>
 
-      val metrics = listener.getExecutionMetrics(executionId)
+      val metrics = sqlStore.executionMetrics(executionId)
+      val graph = sqlStore.planGraph(executionId)
 
       summary ++
-        planVisualization(metrics, executionUIData.physicalPlanGraph) ++
+        planVisualization(metrics, graph) ++
         physicalPlanDescription(executionUIData.physicalPlanDescription)
     }.getOrElse {
       <div>No information to display for Plan {executionId}</div>

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
new file mode 100644
index 0000000..43cec48
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+    conf: SparkConf,
+    kvstore: KVStore,
+    live: Boolean,
+    ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the store. 
When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  // Live tracked data is needed by the SQL status store to calculate metrics 
for in-flight
+  // executions; that means arbitrary threads may be querying these maps, so 
they need to be
+  // thread-safe.
+  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionIdString == null) {
+      // This is not a job created by SQL
+      return
+    }
+
+    val executionId = executionIdString.toLong
+    val jobId = event.jobId
+    val exec = getOrCreateExecution(executionId)
+
+    // Record the accumulator IDs for the stages of this job, so that the code 
that keeps
+    // track of the metrics knows which accumulators to look at.
+    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+    event.stageIds.foreach { id =>
+      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new 
ConcurrentHashMap()))
+    }
+
+    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+    exec.stages = event.stageIds.toSet
+    update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    if (!isSQLStage(event.stageInfo.stageId)) {
+      return
+    }
+
+    // Reset the metrics tracking object for the new attempt.
+    Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
+      metrics.taskMetrics.clear()
+      metrics.attemptId = event.stageInfo.attemptId
+    }
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    liveExecutions.values().asScala.foreach { exec =>
+      if (exec.jobs.contains(event.jobId)) {
+        val result = event.jobResult match {
+          case JobSucceeded => JobExecutionStatus.SUCCEEDED
+          case _ => JobExecutionStatus.FAILED
+        }
+        exec.jobs = exec.jobs + (event.jobId -> result)
+        exec.endEvents += 1
+        update(exec)
+      }
+    }
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+    event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+    }
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    if (!isSQLStage(event.stageId)) {
+      return
+    }
+
+    val info = event.taskInfo
+    // SPARK-20342. If processing events from a live application, use the task 
metrics info to
+    // work around a race in the DAGScheduler. The metrics info does not 
contain accumulator info
+    // when reading event logs in the SHS, so we have to rely on the 
accumulator in that case.
+    val accums = if (live && event.taskMetrics != null) {
+      event.taskMetrics.externalAccums.flatMap { a =>
+        // This call may fail if the accumulator is gc'ed, so account for that.
+        try {
+          Some(a.toInfo(Some(a.value), None))
+        } catch {
+          case _: IllegalAccessError => None
+        }
+      }
+    } else {
+      info.accumulables
+    }
+    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, 
accums,
+      info.successful)
+  }
+
+  def liveExecutionMetrics(executionId: Long): Option[Map[Long, String]] = {
+    Option(liveExecutions.get(executionId)).map { exec =>
+      if (exec.metricsValues != null) {
+        exec.metricsValues
+      } else {
+        aggregateMetrics(exec)
+      }
+    }
+  }
+
+  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
+    val metricIds = exec.metrics.map(_.accumulatorId).sorted
+    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) 
}.toMap
+    val metrics = exec.stages.toSeq
+      .flatMap { stageId => Option(stageMetrics.get(stageId)) }
+      .flatMap(_.taskMetrics.values().asScala)
+      .flatMap { metrics => metrics.ids.zip(metrics.values) }
+
+    val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
+      .filter { case (id, _) => metricIds.contains(id) }
+      .groupBy(_._1)
+      .map { case (id, values) =>
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+      }
+
+    // Check the execution again for whether the aggregated metrics data has 
been calculated.
+    // This can happen if the UI is requesting this data, and the 
onExecutionEnd handler is
+    // running at the same time. The metrics calculated for the UI can be 
inaccurate in that
+    // case, since the onExecutionEnd handler will clean up tracked stage 
metrics.
+    if (exec.metricsValues != null) {
+      exec.metricsValues
+    } else {
+      aggregatedMetrics
+    }
+  }
+
+  private def updateStageMetrics(
+      stageId: Int,
+      attemptId: Int,
+      taskId: Long,
+      accumUpdates: Seq[AccumulableInfo],
+      succeeded: Boolean): Unit = {
+    Option(stageMetrics.get(stageId)).foreach { metrics =>
+      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
+        return
+      }
+
+      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
+      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
+        return
+      }
+
+      val updates = accumUpdates
+        .filter { acc => acc.update.isDefined && 
metrics.accumulatorIds.contains(acc.id) }
+        .sortBy(_.id)
+
+      if (updates.isEmpty) {
+        return
+      }
+
+      val ids = new Array[Long](updates.size)
+      val values = new Array[Long](updates.size)
+      updates.zipWithIndex.foreach { case (acc, idx) =>
+        ids(idx) = acc.id
+        // In a live application, accumulators have Long values, but when 
reading from event
+        // logs, they have String values. For now, assume all accumulators are 
Long and convert
+        // accordingly.
+        values(idx) = acc.update.get match {
+          case s: String => s.toLong
+          case l: Long => l
+          case o => throw new IllegalArgumentException(s"Unexpected: $o")
+        }
+      }
+
+      // TODO: storing metrics by task ID can cause metrics for the same task 
index to be
+      // counted multiple times, for example due to speculation or re-attempts.
+      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, 
succeeded))
+    }
+  }
+
+  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    // Install the SQL tab in a live app if it hasn't been initialized yet.
+    if (!uiInitialized) {
+      ui.foreach { _ui =>
+        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
+      }
+      uiInitialized = true
+    }
+
+    val SparkListenerSQLExecutionStart(executionId, description, details,
+      physicalPlanDescription, sparkPlanInfo, time) = event
+
+    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): 
Seq[SparkPlanGraphNodeWrapper] = {
+      nodes.map {
+        case cluster: SparkPlanGraphCluster =>
+          val storedCluster = new SparkPlanGraphClusterWrapper(
+            cluster.id,
+            cluster.name,
+            cluster.desc,
+            toStoredNodes(cluster.nodes),
+            cluster.metrics)
+          new SparkPlanGraphNodeWrapper(null, storedCluster)
+
+        case node =>
+          new SparkPlanGraphNodeWrapper(node, null)
+      }
+    }
+
+    val planGraph = SparkPlanGraph(sparkPlanInfo)
+    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
+      node.metrics.map { metric => (metric.accumulatorId, metric) }
+    }.toMap.values.toList
+
+    val graphToStore = new SparkPlanGraphWrapper(
+      executionId,
+      toStoredNodes(planGraph.nodes),
+      planGraph.edges)
+    kvstore.write(graphToStore)
+
+    val exec = getOrCreateExecution(executionId)
+    exec.description = description
+    exec.details = details
+    exec.physicalPlanDescription = physicalPlanDescription
+    exec.metrics = sqlPlanMetrics
+    exec.submissionTime = time
+    update(exec)
+  }
+
+  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    val SparkListenerSQLExecutionEnd(executionId, time) = event
+    Option(liveExecutions.get(executionId)).foreach { exec =>
+      exec.metricsValues = aggregateMetrics(exec)
+      exec.completionTime = Some(new Date(time))
+      exec.endEvents += 1
+      update(exec)
+
+      // Remove stale LiveStageMetrics objects for stages that are not active 
anymore.
+      val activeStages = liveExecutions.values().asScala.flatMap { other =>
+        if (other != exec) other.stages else Nil
+      }.toSet
+      stageMetrics.keySet().asScala
+        .filter(!activeStages.contains(_))
+        .foreach(stageMetrics.remove)
+    }
+  }
+
+  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): 
Unit = {
+    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
+    Option(liveExecutions.get(executionId)).foreach { exec =>
+      exec.driverAccumUpdates = accumUpdates.toMap
+      update(exec)
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
+    case _ => // Ignore
+  }
+
+  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
+    liveExecutions.computeIfAbsent(executionId,
+      new Function[Long, LiveExecutionData]() {
+        override def apply(key: Long): LiveExecutionData = new 
LiveExecutionData(executionId)
+      })
+  }
+
+  private def update(exec: LiveExecutionData): Unit = {
+    val now = System.nanoTime()
+    if (exec.endEvents >= exec.jobs.size + 1) {
+      exec.write(kvstore, now)
+      liveExecutions.remove(exec.executionId)
+    } else if (liveUpdatePeriodNs >= 0) {
+      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
+        exec.write(kvstore, now)
+      }
+    }
+  }
+
+  private def isSQLStage(stageId: Int): Boolean = {
+    liveExecutions.values().asScala.exists { exec =>
+      exec.stages.contains(stageId)
+    }
+  }
+
+}
+
+private class LiveExecutionData(val executionId: Long) extends LiveEntity {
+
+  var description: String = null
+  var details: String = null
+  var physicalPlanDescription: String = null
+  var metrics = Seq[SQLPlanMetric]()
+  var submissionTime = -1L
+  var completionTime: Option[Date] = None
+
+  var jobs = Map[Int, JobExecutionStatus]()
+  var stages = Set[Int]()
+  var driverAccumUpdates = Map[Long, Long]()
+
+  @volatile var metricsValues: Map[Long, String] = null
+
+  // Just in case job end and execution end arrive out of order, keep track of 
how many
+  // end events arrived so that the listener can stop tracking the execution.
+  var endEvents = 0
+
+  override protected def doUpdate(): Any = {
+    new SQLExecutionUIData(
+      executionId,
+      description,
+      details,
+      physicalPlanDescription,
+      metrics,
+      submissionTime,
+      completionTime,
+      jobs,
+      stages,
+      metricsValues)
+  }
+
+}
+
+private class LiveStageMetrics(
+    val stageId: Int,
+    var attemptId: Int,
+    val accumulatorIds: Array[Long],
+    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
+
+private[sql] class LiveTaskMetrics(
+    val ids: Array[Long],
+    val values: Array[Long],
+    val succeeded: Boolean)

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
new file mode 100644
index 0000000..586d3ae
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.lang.{Long => JLong}
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.AppStatusPlugin
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query 
SQL-specific state. There's
+ * no state kept in this class, so it's ok to have multiple instances of it in 
an application.
+ */
+private[sql] class SQLAppStatusStore(
+    store: KVStore,
+    listener: Option[SQLAppStatusListener] = None) {
+
+  def executionsList(): Seq[SQLExecutionUIData] = {
+    store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+  }
+
+  def execution(executionId: Long): Option[SQLExecutionUIData] = {
+    try {
+      Some(store.read(classOf[SQLExecutionUIData], executionId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  def executionsCount(): Long = {
+    store.count(classOf[SQLExecutionUIData])
+  }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = {
+    def metricsFromStore(): Option[Map[Long, String]] = {
+      val exec = store.read(classOf[SQLExecutionUIData], executionId)
+      Option(exec.metricValues)
+    }
+
+    metricsFromStore()
+      .orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
+      // Try a second time in case the execution finished while this method is 
trying to
+      // get the metrics.
+      .orElse(metricsFromStore())
+      .getOrElse(Map())
+  }
+
+  def planGraph(executionId: Long): SparkPlanGraph = {
+    store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
+  }
+
+}
+
+/**
+ * An AppStatusPlugin for handling the SQL UI and listeners.
+ */
+private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
+
+  override def setupListeners(
+      conf: SparkConf,
+      store: KVStore,
+      addListenerFn: SparkListener => Unit,
+      live: Boolean): Unit = {
+    // For live applications, the listener is installed in [[setupUI]]. This 
also avoids adding
+    // the listener when the UI is disabled. Force installation during 
testing, though.
+    if (!live || Utils.isTesting) {
+      val listener = new SQLAppStatusListener(conf, store, live, None)
+      addListenerFn(listener)
+    }
+  }
+
+  override def setupUI(ui: SparkUI): Unit = {
+    ui.sc match {
+      case Some(sc) =>
+        // If this is a live application, then install a listener that will 
enable the SQL
+        // tab as soon as there's a SQL event posted to the bus.
+        val listener = new SQLAppStatusListener(sc.conf, ui.store.store, true, 
Some(ui))
+        sc.listenerBus.addToStatusQueue(listener)
+
+      case _ =>
+        // For a replayed application, only add the tab if the store already 
contains SQL data.
+        val sqlStore = new SQLAppStatusStore(ui.store.store)
+        if (sqlStore.executionsCount() > 0) {
+          new SQLTab(sqlStore, ui)
+        }
+    }
+  }
+
+}
+
+private[sql] class SQLExecutionUIData(
+    @KVIndexParam val executionId: Long,
+    val description: String,
+    val details: String,
+    val physicalPlanDescription: String,
+    val metrics: Seq[SQLPlanMetric],
+    val submissionTime: Long,
+    val completionTime: Option[Date],
+    @JsonDeserialize(keyAs = classOf[Integer])
+    val jobs: Map[Int, JobExecutionStatus],
+    @JsonDeserialize(contentAs = classOf[Integer])
+    val stages: Set[Int],
+    /**
+     * This field is only populated after the execution is finished; it will 
be null while the
+     * execution is still running. During execution, aggregate metrics need to 
be retrieved
+     * from the SQL listener instance.
+     */
+    @JsonDeserialize(keyAs = classOf[JLong])
+    val metricValues: Map[Long, String]
+    )
+
+private[sql] class SparkPlanGraphWrapper(
+    @KVIndexParam val executionId: Long,
+    val nodes: Seq[SparkPlanGraphNodeWrapper],
+    val edges: Seq[SparkPlanGraphEdge]) {
+
+  def toSparkPlanGraph(): SparkPlanGraph = {
+    SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges)
+  }
+
+}
+
+private[sql] class SparkPlanGraphClusterWrapper(
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val nodes: Seq[SparkPlanGraphNodeWrapper],
+    val metrics: Seq[SQLPlanMetric]) {
+
+  def toSparkPlanGraphCluster(): SparkPlanGraphCluster = {
+    new SparkPlanGraphCluster(id, name, desc,
+      new ArrayBuffer() ++ nodes.map(_.toSparkPlanGraphNode()),
+      metrics)
+  }
+
+}
+
+/** Only one of the values should be set. */
+private[sql] class SparkPlanGraphNodeWrapper(
+    val node: SparkPlanGraphNode,
+    val cluster: SparkPlanGraphClusterWrapper) {
+
+  def toSparkPlanGraphNode(): SparkPlanGraphNode = {
+    assert(node == null ^ cluster == null, "Exactly one of node or cluster 
must be set.")
+    if (node != null) node else cluster.toSparkPlanGraphCluster()
+  }
+
+}
+
+private[sql] case class SQLPlanMetric(
+    name: String,
+    accumulatorId: Long,
+    metricType: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 8c27af3..b58b8c6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -17,21 +17,15 @@
 
 package org.apache.spark.sql.execution.ui
 
-import scala.collection.mutable
-
 import com.fasterxml.jackson.databind.JavaType
 import com.fasterxml.jackson.databind.`type`.TypeFactory
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.databind.util.Converter
 
-import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.metric._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.AccumulatorContext
 
 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
@@ -89,398 +83,3 @@ private class LongLongTupleConverter extends 
Converter[(Object, Object), (Long,
     typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], 
Array(longType, longType))
   }
 }
-
-class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
-
-  override def createListeners(conf: SparkConf, sparkUI: SparkUI): 
Seq[SparkListener] = {
-    List(new SQLHistoryListener(conf, sparkUI))
-  }
-}
-
-class SQLListener(conf: SparkConf) extends SparkListener with Logging {
-
-  private val retainedExecutions = 
conf.getInt("spark.sql.ui.retainedExecutions", 1000)
-
-  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  // Old data in the following fields must be removed in 
"trimExecutionsIfNecessary".
-  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up 
old data
-  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  /**
-   * Maintain the relation between job id and execution id so that we can get 
the execution id in
-   * the "onJobEnd" method.
-   */
-  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
-
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
-
-  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
-    _executionIdToData.toMap
-  }
-
-  def jobIdToExecutionId: Map[Long, Long] = synchronized {
-    _jobIdToExecutionId.toMap
-  }
-
-  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
-    _stageIdToStageMetrics.toMap
-  }
-
-  private def trimExecutionsIfNecessary(
-      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
-    if (executions.size > retainedExecutions) {
-      val toRemove = math.max(retainedExecutions / 10, 1)
-      executions.take(toRemove).foreach { execution =>
-        for (executionUIData <- 
_executionIdToData.remove(execution.executionId)) {
-          for (jobId <- executionUIData.jobs.keys) {
-            _jobIdToExecutionId.remove(jobId)
-          }
-          for (stageId <- executionUIData.stages) {
-            _stageIdToStageMetrics.remove(stageId)
-          }
-        }
-      }
-      executions.trimStart(toRemove)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-    val executionIdString = 
jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
-    if (executionIdString == null) {
-      // This is not a job created by SQL
-      return
-    }
-    val executionId = executionIdString.toLong
-    val jobId = jobStart.jobId
-    val stageIds = jobStart.stageIds
-
-    synchronized {
-      activeExecutions.get(executionId).foreach { executionUIData =>
-        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
-        executionUIData.stages ++= stageIds
-        stageIds.foreach(stageId =>
-          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId 
= 0))
-        _jobIdToExecutionId(jobId) = executionId
-      }
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    val jobId = jobEnd.jobId
-    for (executionId <- _jobIdToExecutionId.get(jobId);
-         executionUIData <- _executionIdToData.get(executionId)) {
-      jobEnd.jobResult match {
-        case JobSucceeded => executionUIData.jobs(jobId) = 
JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => executionUIData.jobs(jobId) = 
JobExecutionStatus.FAILED
-      }
-      if (executionUIData.completionTime.nonEmpty && 
!executionUIData.hasRunningJobs) {
-        // We are the last job of this execution, so mark the execution as 
finished. Note that
-        // `onExecutionEnd` also does this, but currently that can be called 
before `onJobEnd`
-        // since these are called on different threads.
-        markExecutionFinished(executionId)
-      }
-    }
-  }
-
-  override def onExecutorMetricsUpdate(
-      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = 
synchronized {
-    for ((taskId, stageId, stageAttemptID, accumUpdates) <- 
executorMetricsUpdate.accumUpdates) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, 
accumUpdates, finishTask = false)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): 
Unit = synchronized {
-    val stageId = stageSubmitted.stageInfo.stageId
-    val stageAttemptId = stageSubmitted.stageInfo.attemptId
-    // Always override metrics for old stage attempt
-    if (_stageIdToStageMetrics.contains(stageId)) {
-      _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
-    } else {
-      // If a stage belongs to some SQL execution, its stageId will be put in 
"onJobStart".
-      // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong 
to any SQL execution.
-      // So we can ignore it. Otherwise, this may lead to memory leaks 
(SPARK-11126).
-    }
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    if (taskEnd.taskMetrics != null) {
-      updateTaskAccumulatorValues(
-        taskEnd.taskInfo.taskId,
-        taskEnd.stageId,
-        taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), 
None)),
-        finishTask = true)
-    }
-  }
-
-  /**
-   * Update the accumulator values of a task with the latest metrics for this 
task. This is called
-   * every time we receive an executor heartbeat or when a task finishes.
-   */
-  protected def updateTaskAccumulatorValues(
-      taskId: Long,
-      stageId: Int,
-      stageAttemptID: Int,
-      _accumulatorUpdates: Seq[AccumulableInfo],
-      finishTask: Boolean): Unit = {
-    val accumulatorUpdates =
-      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, 
accum.update.get))
-
-    _stageIdToStageMetrics.get(stageId) match {
-      case Some(stageMetrics) =>
-        if (stageAttemptID < stageMetrics.stageAttemptId) {
-          // A task of an old stage attempt. Because a new stage is submitted, 
we can ignore it.
-        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
-          logWarning(s"A task should not have a higher stageAttemptID 
($stageAttemptID) then " +
-            s"what we have seen (${stageMetrics.stageAttemptId})")
-        } else {
-          // TODO We don't know the attemptId. Currently, what we can do is 
overriding the
-          // accumulator updates. However, if there are two same task are 
running, such as
-          // speculation, the accumulator updates will be overriding by 
different task attempts,
-          // the results will be weird.
-          stageMetrics.taskIdToMetricUpdates.get(taskId) match {
-            case Some(taskMetrics) =>
-              if (finishTask) {
-                taskMetrics.finished = true
-                taskMetrics.accumulatorUpdates = accumulatorUpdates
-              } else if (!taskMetrics.finished) {
-                taskMetrics.accumulatorUpdates = accumulatorUpdates
-              } else {
-                // If a task is finished, we should not override with 
accumulator updates from
-                // heartbeat reports
-              }
-            case None =>
-              stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
-                  finished = finishTask, accumulatorUpdates)
-          }
-        }
-      case None =>
-      // This execution and its stage have been dropped
-    }
-  }
-
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case SparkListenerSQLExecutionStart(executionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time) =>
-      val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
-      val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
-        node.metrics.map(metric => metric.accumulatorId -> metric)
-      }
-      val executionUIData = new SQLExecutionUIData(
-        executionId,
-        description,
-        details,
-        physicalPlanDescription,
-        physicalPlanGraph,
-        sqlPlanMetrics.toMap,
-        time)
-      synchronized {
-        activeExecutions(executionId) = executionUIData
-        _executionIdToData(executionId) = executionUIData
-      }
-    case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
-      _executionIdToData.get(executionId).foreach { executionUIData =>
-        executionUIData.completionTime = Some(time)
-        if (!executionUIData.hasRunningJobs) {
-          // onExecutionEnd happens after all "onJobEnd"s
-          // So we should update the execution lists.
-          markExecutionFinished(executionId)
-        } else {
-          // There are still running jobs, so onExecutionEnd happens before some "onJobEnd"s.
-          // In that case we don't know yet whether the execution is successful, so let the last
-          // onJobEnd update the execution lists.
-        }
-      }
-    }
-    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized {
-      _executionIdToData.get(executionId).foreach { executionUIData =>
-        for ((accId, accValue) <- accumUpdates) {
-          executionUIData.driverAccumUpdates(accId) = accValue
-        }
-      }
-    }
-    case _ => // Ignore
-  }
-
-  private def markExecutionFinished(executionId: Long): Unit = {
-    activeExecutions.remove(executionId).foreach { executionUIData =>
-      if (executionUIData.isFailed) {
-        failedExecutions += executionUIData
-        trimExecutionsIfNecessary(failedExecutions)
-      } else {
-        completedExecutions += executionUIData
-        trimExecutionsIfNecessary(completedExecutions)
-      }
-    }
-  }
-
-  def getRunningExecutions: Seq[SQLExecutionUIData] = synchronized {
-    activeExecutions.values.toSeq
-  }
-
-  def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
-    failedExecutions
-  }
-
-  def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
-    completedExecutions
-  }
-
-  def getExecution(executionId: Long): Option[SQLExecutionUIData] = synchronized {
-    _executionIdToData.get(executionId)
-  }
-
-  /**
-   * Get all accumulator updates from all tasks which belong to this execution and merge them.
-   */
-  def getExecutionMetrics(executionId: Long): Map[Long, String] = synchronized {
-    _executionIdToData.get(executionId) match {
-      case Some(executionUIData) =>
-        val accumulatorUpdates = {
-          for (stageId <- executionUIData.stages;
-               stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
-               taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
-               accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
-            (accumulatorUpdate._1, accumulatorUpdate._2)
-          }
-        }
-
-        val driverUpdates = executionUIData.driverAccumUpdates.toSeq
-        val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
-          case (id, _) => executionUIData.accumulatorMetrics.contains(id)
-        }
-        mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
-          executionUIData.accumulatorMetrics(accumulatorId).metricType)
-      case None =>
-        // This execution has been dropped
-        Map.empty
-    }
-  }
-
-  private def mergeAccumulatorUpdates(
-      accumulatorUpdates: Seq[(Long, Any)],
-      metricTypeFunc: Long => String): Map[Long, String] = {
-    accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) =>
-      val metricType = metricTypeFunc(accumulatorId)
-      accumulatorId ->
-        SQLMetrics.stringValue(metricType, values.map(_._2.asInstanceOf[Long]))
-    }
-  }
-
-}
-
-
-/**
- * A [[SQLListener]] for rendering the SQL UI in the history server.
- */
-class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
-  extends SQLListener(conf) {
-
-  private var sqlTabAttached = false
-
-  override def onExecutorMetricsUpdate(u: SparkListenerExecutorMetricsUpdate): Unit = {
-    // Do nothing; these events are not logged
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    updateTaskAccumulatorValues(
-      taskEnd.taskInfo.taskId,
-      taskEnd.stageId,
-      taskEnd.stageAttemptId,
-      taskEnd.taskInfo.accumulables.flatMap { a =>
-        // Filter out accumulators that are not SQL metrics
-        // For now we assume all SQL metrics are Longs that have been JSON-serialized as Strings
-        if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) {
-          val newValue = a.update.map(_.toString.toLong).getOrElse(0L)
-          Some(a.copy(update = Some(newValue)))
-        } else {
-          None
-        }
-      },
-      finishTask = true)
-  }
-
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case _: SparkListenerSQLExecutionStart =>
-      if (!sqlTabAttached) {
-        new SQLTab(this, sparkUI)
-        sqlTabAttached = true
-      }
-      super.onOtherEvent(event)
-    case _ => super.onOtherEvent(event)
-  }
-}
-
-/**
- * Represent all necessary data for an execution that will be used in Web UI.
- */
-private[ui] class SQLExecutionUIData(
-    val executionId: Long,
-    val description: String,
-    val details: String,
-    val physicalPlanDescription: String,
-    val physicalPlanGraph: SparkPlanGraph,
-    val accumulatorMetrics: Map[Long, SQLPlanMetric],
-    val submissionTime: Long) {
-
-  var completionTime: Option[Long] = None
-
-  val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty
-
-  val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()
-
-  val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty
-
-  /**
-   * Return whether there are running jobs in this execution.
-   */
-  def hasRunningJobs: Boolean = jobs.values.exists(_ == JobExecutionStatus.RUNNING)
-
-  /**
-   * Return whether there are any failed jobs in this execution.
-   */
-  def isFailed: Boolean = jobs.values.exists(_ == JobExecutionStatus.FAILED)
-
-  def runningJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.RUNNING }.keys.toSeq
-
-  def succeededJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.SUCCEEDED }.keys.toSeq
-
-  def failedJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.FAILED }.keys.toSeq
-}
-
-/**
- * Represent a metric in a SQLPlan.
- *
- * Because we cannot revert our changes to an "Accumulator", we need to maintain the accumulator
- * updates for each task, so that if a task is retried we can simply override the old updates with
- * the updates from the new task attempt. Since we cannot add them to the accumulator, we need to
- * use "AccumulatorParam" to compute the aggregated value.
- */
-private[ui] case class SQLPlanMetric(
-    name: String,
-    accumulatorId: Long,
-    metricType: String)
-
-/**
- * Store all accumulatorUpdates for all tasks in a Spark stage.
- */
-private[ui] class SQLStageMetrics(
-    val stageAttemptId: Long,
-    val taskIdToMetricUpdates: mutable.HashMap[Long, SQLTaskMetrics] = mutable.HashMap.empty)
-
-
-// TODO Should add attemptId here when we can get it from SparkListenerExecutorMetricsUpdate
-/**
- * Store all accumulatorUpdates for a Spark task.
- */
-private[ui] class SQLTaskMetrics(
-    var finished: Boolean,
-    var accumulatorUpdates: Seq[(Long, Any)])
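For context on what the removed listener was doing with metrics, the sketch below reproduces the shape of its final merge step under simplified assumptions: the real SQLMetrics.stringValue understands size and timing metric types, while the stand-in here just sums the values. MergeSketch and its members are illustrative names, not Spark APIs.

object MergeSketch {
  // Stand-in for SQLMetrics.stringValue, simplified to a plain sum; the real helper also
  // formats size and timing metrics.
  def stringValue(metricType: String, values: Seq[Long]): String =
    s"$metricType: ${values.sum}"

  // Group per-task updates by accumulator id and reduce each group to one displayable value,
  // mirroring what the removed mergeAccumulatorUpdates did.
  def mergeAccumulatorUpdates(
      updates: Seq[(Long, Long)],            // (accumulator id, update value)
      metricType: Long => String): Map[Long, String] = {
    updates.groupBy(_._1).map { case (accumId, grouped) =>
      accumId -> stringValue(metricType(accumId), grouped.map(_._2))
    }
  }

  def main(args: Array[String]): Unit = {
    val updates = Seq((1L, 10L), (1L, 5L), (2L, 7L))
    println(mergeAccumulatorUpdates(updates, _ => "sum"))   // Map(1 -> sum: 15, 2 -> sum: 7)
  }
}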

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index d0376af..a321a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.ui
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
+class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, "SQL") with Logging {
 
   val parent = sparkUI
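The only change to SQLTab itself is the constructor argument: the tab is now handed a read-only SQLAppStatusStore instead of the mutable SQLListener it used to query. A minimal sketch of that dependency shape, using hypothetical names (ExecutionSummary, ReadOnlyExecutionStore, SqlTabSketch) rather than Spark's actual classes:

// Illustrative only: the UI component reads from an immutable store view instead of
// holding listener state of its own.
final case class ExecutionSummary(executionId: Long, description: String)

trait ReadOnlyExecutionStore {                        // stand-in for the role SQLAppStatusStore plays
  def executionsList(): Seq[ExecutionSummary]
}

class SqlTabSketch(store: ReadOnlyExecutionStore) {   // stand-in for SQLTab
  // Render a one-line summary per execution; a real tab would build pages instead.
  def render(): String =
    store.executionsList().map(e => s"#${e.executionId}: ${e.description}").mkString("\n")
}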

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index ad9db30..3e479fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
-import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
@@ -84,11 +83,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   val cacheManager: CacheManager = new CacheManager
 
   /**
-   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
-   */
-  val listener: SQLListener = createListenerAndUI(sparkContext)
-
-  /**
    * A catalog that interacts with external systems.
    */
   lazy val externalCatalog: ExternalCatalog = {
@@ -142,19 +136,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   val jarClassLoader = new NonClosableMutableURLClassLoader(
     org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
-  /**
-   * Create a SQLListener, add it to the SparkContext, and create a SQLTab if there is a SparkUI.
-   */
-  private def createListenerAndUI(sc: SparkContext): SQLListener = {
-    if (SparkSession.sqlListener.get() == null) {
-      val listener = new SQLListener(sc.conf)
-      if (SparkSession.sqlListener.compareAndSet(null, listener)) {
-        sc.listenerBus.addToStatusQueue(listener)
-        sc.ui.foreach(new SQLTab(listener, _))
-      }
-    }
-    SparkSession.sqlListener.get()
-  }
 }
 
 object SharedState extends Logging {
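SharedState no longer creates the listener or the SQL tab; that responsibility moves to the status-store plumbing. For reference, the removed createListenerAndUI guarded against double registration with an AtomicReference compare-and-set, a pattern the sketch below illustrates with hypothetical names (Listener, SingletonRegistration), not Spark's real classes:

import java.util.concurrent.atomic.AtomicReference

// Only the first caller installs the shared listener; later callers reuse it.
object SingletonRegistration {
  final class Listener                    // stand-in for the old SQLListener
  private val ref = new AtomicReference[Listener]()

  def getOrCreate(register: Listener => Unit): Listener = {
    if (ref.get() == null) {
      val candidate = new Listener
      if (ref.compareAndSet(null, candidate)) {
        register(candidate)               // e.g. add to the listener bus and attach the UI tab
      }
    }
    ref.get()
  }
}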

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 58a194b..d588af3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.ui.SQLAppStatusStore
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -32,6 +33,13 @@ import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
 class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext {
   import testImplicits._
 
+  private def statusStore: SQLAppStatusStore = {
+    new SQLAppStatusStore(sparkContext.statusStore.store)
+  }
+
+  private def currentExecutionIds(): Set[Long] = {
+    statusStore.executionsList.map(_.executionId).toSet
+  }
 
   /**
   * Generates a `DataFrame` by filling randomly generated bytes for hash collision.
@@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     withTempPath { file =>
      // person creates a temporary view. get the DF before listing previous execution IDs
       val data = person.select('name)
-      sparkContext.listenerBus.waitUntilEmpty(10000)
-      val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+      val previousExecutionIds = currentExecutionIds()
       // Assume the execution plan is
       // PhysicalRDD(nodeId = 0)
       data.write.format("json").save(file.getAbsolutePath)
       sparkContext.listenerBus.waitUntilEmpty(10000)
-      val executionIds =
-        spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+      val executionIds = currentExecutionIds().diff(previousExecutionIds)
       assert(executionIds.size === 1)
       val executionId = executionIds.head
-      val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
+      val jobs = statusStore.execution(executionId).get.jobs
       // Use "<=" because there is a race condition that we may miss some jobs
      // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
       assert(jobs.size <= 1)
-      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
+      val metricValues = statusStore.executionMetrics(executionId)
      // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
       // However, we still can check the value.
       assert(metricValues.values.toSeq.exists(_ === "2"))
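The test now snapshots execution ids from the status store before and after the write and takes the set difference, instead of walking the old listener's executionIdToData map. A minimal sketch of that pattern, assuming the caller supplies the id source; the names below are illustrative, not Spark test helpers:

// listIds is a hypothetical stand-in for statusStore.executionsList.map(_.executionId).toSet.
object ExecutionIdDiffSketch {
  def newExecutionId(listIds: () => Set[Long])(workload: => Unit): Long = {
    val before = listIds()        // snapshot the executions known before running the workload
    workload
    val created = listIds().diff(before)
    assert(created.size == 1, s"expected exactly one new execution, got $created")
    created.head
  }
}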

http://git-wip-us.apache.org/repos/asf/spark/blob/0ffa7c48/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 3966e98..d89c4b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -25,7 +25,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
 
@@ -34,6 +34,14 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
 
   import testImplicits._
 
+  private def statusStore: SQLAppStatusStore = {
+    new SQLAppStatusStore(sparkContext.statusStore.store)
+  }
+
+  private def currentExecutionIds(): Set[Long] = {
+    statusStore.executionsList.map(_.executionId).toSet
+  }
+
   /**
    * Get execution metrics for the SQL execution and verify metrics values.
    *
@@ -41,24 +49,23 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param func the function can produce execution id after running.
    */
   private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = {
-    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+    val previousExecutionIds = currentExecutionIds()
     // Run the given function to trigger query execution.
     func
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-    val executionIds =
-      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    val executionIds = currentExecutionIds().diff(previousExecutionIds)
     assert(executionIds.size == 1)
     val executionId = executionIds.head
 
-    val executionData = spark.sharedState.listener.getExecution(executionId).get
-    val executedNode = executionData.physicalPlanGraph.nodes.head
+    val executionData = statusStore.execution(executionId).get
+    val executedNode = statusStore.planGraph(executionId).nodes.head
 
     val metricsNames = Seq(
       "number of written files",
       "number of dynamic part",
       "number of output rows")
 
-    val metrics = spark.sharedState.listener.getExecutionMetrics(executionId)
+    val metrics = statusStore.executionMetrics(executionId)
 
     metricsNames.zip(metricsValues).foreach { case (metricsName, expected) =>
       val sqlMetric = executedNode.metrics.find(_.name == metricsName)
@@ -134,22 +141,21 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
        expectedNumOfJobs: Int,
        expectedNodeIds: Set[Long],
       enableWholeStage: Boolean = false): Option[Map[Long, (String, Map[String, Any])]] = {
-    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+    val previousExecutionIds = currentExecutionIds()
     withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) {
       df.collect()
     }
     sparkContext.listenerBus.waitUntilEmpty(10000)
-    val executionIds =
-      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    val executionIds = currentExecutionIds().diff(previousExecutionIds)
     assert(executionIds.size === 1)
     val executionId = executionIds.head
-    val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
+    val jobs = statusStore.execution(executionId).get.jobs
     // Use "<=" because there is a race condition that we may miss some jobs
     // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
     assert(jobs.size <= expectedNumOfJobs)
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
-      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
+      val metricValues = statusStore.executionMetrics(executionId)
       val metrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
         df.queryExecution.executedPlan)).allNodes.filter { node =>
         expectedNodeIds.contains(node.id)

