This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8786f01ad6 [SPARK-44394][CONNECT][WEBUI] Add a Spark UI page for Spark Connect
f8786f01ad6 is described below

commit f8786f01ad605c4549b60ba9998e35cca433665a
Author: Jason Li <jason...@databricks.com>
AuthorDate: Sat Jul 29 09:20:13 2023 -0700

    [SPARK-44394][CONNECT][WEBUI] Add a Spark UI page for Spark Connect
    
    ### What changes were proposed in this pull request?
    Add a new Spark UI page to display session and execution information for Spark Connect. This builds on the work in SPARK-43923 (https://github.com/apache/spark/pull/41443), which added the relevant SparkListenerEvents, and mirrors the ThriftServerPage in the Spark UI for JDBC/ODBC.
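    A rough sketch of how the page gets populated, assuming a Spark Connect server is already running on the default port 15002: each remote session and each statement executed through it should then appear on the new tab, next to the existing Jobs and SQL tabs.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Hypothetical client session against a locally running Spark Connect server.
    val spark = SparkSession.builder()
      .remote("sc://localhost:15002")
      .getOrCreate()
    
    // The statement is reported through the SparkListenerEvents added in SPARK-43923
    // and rendered as a row in the new Request Statistics table.
    spark.range(10).count()
    ```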
    
    <img width="1709" alt="Screenshot 2023-07-27 at 11 29 22 PM" src="https://github.com/apache/spark/assets/65624911/934b7c69-3b44-460b-8fbb-36a9eb3f0798">
    
    <img width="1716" alt="Screenshot 2023-07-27 at 11 29 15 PM" src="https://github.com/apache/spark/assets/65624911/33dbe6ab-44bf-49a5-ad4c-5ba4a476a1f0">
    
    ### Why are the changes needed?
    This gives users a way to access session and execution information for Spark Connect via the UI and provides the frontend interface for the related SparkListenerEvents.
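    As a hedged illustration, the retention limits introduced in Connect.scala can be tuned through SparkConf before the server is started; the config keys and their default of 200 come from this patch, while the snippet itself is only a sketch.
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Illustrative only: cap how many executions and client sessions the new UI keeps.
    val conf = new SparkConf()
      .set("spark.sql.connect.ui.retainedStatements", "500") // default: 200
      .set("spark.sql.connect.ui.retainedSessions", "100")   // default: 200
    ```
    Entries beyond these limits are evicted from the UI's store by the listener's cleanup triggers.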
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds a new tab/page to the Spark UI.
    
    ### How was this patch tested?
    Unit tests
    
    Closes #41964 from jasonli-db/spark-connect-ui.
    
    Authored-by: Jason Li <jason...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/connect/common/ProtoUtils.scala      |   2 +-
 .../spark/sql/connect/SparkConnectPlugin.scala     |   2 +-
 .../apache/spark/sql/connect/config/Connect.scala  |  13 +
 .../connect/execution/ExecuteThreadRunner.scala    |  15 +-
 .../spark/sql/connect/service/ExecuteHolder.scala  |  39 +-
 .../sql/connect/service/SparkConnectServer.scala   |   3 +-
 .../sql/connect/service/SparkConnectService.scala  |  26 +-
 .../ui/SparkConnectServerAppStatusStore.scala      | 131 +++++
 .../ui/SparkConnectServerHistoryServerPlugin.scala |  22 +-
 .../connect/ui/SparkConnectServerListener.scala    | 377 ++++++++++++++
 .../sql/connect/ui/SparkConnectServerPage.scala    | 541 +++++++++++++++++++++
 .../connect/ui/SparkConnectServerSessionPage.scala | 128 +++++
 .../sql/connect/ui/SparkConnectServerTab.scala     |  57 +++
 .../org/apache/spark/sql/connect/ui/ToolTips.scala |  39 ++
 .../ui/SparkConnectServerListenerSuite.scala       | 234 +++++++++
 .../connect/ui/SparkConnectServerPageSuite.scala   | 135 +++++
 .../main/scala/org/apache/spark/SparkContext.scala |   2 +-
 .../main/scala/org/apache/spark/ui/SparkUI.scala   |  18 +-
 .../test/scala/org/apache/spark/ui/UISuite.scala   |   2 +-
 .../ui/StreamingQueryHistoryServerPlugin.scala     |   2 +-
 20 files changed, 1758 insertions(+), 30 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala
index e2934b56744..99684eef7d7 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala
@@ -93,7 +93,7 @@ private[connect] object ProtoUtils {
   def throwIfInvalidTag(tag: String): Unit = {
     // Same format rules apply to Spark Connect execution tags as to SparkContext job tags,
     // because the Spark Connect job tag is also used as part of SparkContext job tag.
-    // See SparkContext.throwIfInvalidTag and ExecuteHolder.tagToSparkJobTag
+    // See SparkContext.throwIfInvalidTag and ExecuteHolderSessionTag
     if (tag == null) {
       throw new IllegalArgumentException("Spark Connect tag cannot be null.")
     }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
index bb694a76798..ca8617cbe1a 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
@@ -45,7 +45,7 @@ class SparkConnectPlugin extends SparkPlugin {
     override def init(
         sc: SparkContext,
         pluginContext: PluginContext): util.Map[String, String] = {
-      SparkConnectService.start()
+      SparkConnectService.start(sc)
       Map.empty[String, String].asJava
     }
 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 31f119047e4..23aa42bad30 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -121,4 +121,17 @@ object Connect {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  val CONNECT_UI_STATEMENT_LIMIT =
+    ConfigBuilder("spark.sql.connect.ui.retainedStatements")
+      .doc("The number of statements kept in the Spark Connect UI history.")
+      .version("3.5.0")
+      .intConf
+      .createWithDefault(200)
+
+  val CONNECT_UI_SESSION_LIMIT = ConfigBuilder("spark.sql.connect.ui.retainedSessions")
+    .doc("The number of client sessions kept in the Spark Connect UI history.")
+    .version("3.5.0")
+    .intConf
+    .createWithDefault(200)
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 6758df0d7e6..dad84180a0f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -27,7 +27,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connect.common.ProtoUtils
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.connect.service.ExecuteHolder
+import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag}
 import org.apache.spark.sql.connect.utils.ErrorUtils
 import org.apache.spark.util.Utils
 
@@ -95,8 +95,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
      } finally {
        executeHolder.sessionHolder.session.sparkContext.removeJobTag(executeHolder.jobTag)
         executeHolder.sparkSessionTags.foreach { tag =>
-          executeHolder.sessionHolder.session.sparkContext
-            .removeJobTag(executeHolder.tagToSparkJobTag(tag))
+          executeHolder.sessionHolder.session.sparkContext.removeJobTag(
+            ExecuteSessionTag(
+              executeHolder.sessionHolder.userId,
+              executeHolder.sessionHolder.sessionId,
+              tag))
         }
       }
     } catch {
@@ -128,7 +131,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
       session.sparkContext.addJobTag(executeHolder.jobTag)
       // Also set all user defined tags as Spark Job tags.
       executeHolder.sparkSessionTags.foreach { tag =>
-        session.sparkContext.addJobTag(executeHolder.tagToSparkJobTag(tag))
+        session.sparkContext.addJobTag(
+          ExecuteSessionTag(
+            executeHolder.sessionHolder.userId,
+            executeHolder.sessionHolder.sessionId,
+            tag))
       }
       session.sparkContext.setJobDescription(
         s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
index 36c96b2617f..7ad68d06f96 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
@@ -38,11 +38,7 @@ private[connect] class ExecuteHolder(
   * Tag that is set for this execution on SparkContext, via SparkContext.addJobTag. Used
    * (internally) for cancallation of the Spark Jobs ran by this execution.
    */
-  val jobTag =
-    s"SparkConnect_Execute_" +
-      s"User_${sessionHolder.userId}_" +
-      s"Session_${sessionHolder.sessionId}_" +
-      s"Operation_${operationId}"
+  val jobTag = ExecuteJobTag(sessionHolder.userId, sessionHolder.sessionId, operationId)
 
   /**
   * Tags set by Spark Connect client users via SparkSession.addTag. Used to identify and group
@@ -122,3 +118,36 @@ private[connect] class ExecuteHolder(
       s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}"
   }
 }
+
+/** Used to identify ExecuteHolder jobTag among SparkContext.SPARK_JOB_TAGS. */
+object ExecuteJobTag {
+  private val prefix = "SparkConnect_OperationTag"
+
+  def apply(sessionId: String, userId: String, operationId: String): String = {
+    s"${prefix}_" +
+      s"User_${userId}_" +
+      s"Session_${sessionId}_" +
+      s"Operation_${operationId}"
+  }
+
+  def unapply(jobTag: String): Option[String] = {
+    if (jobTag.startsWith(prefix)) Some(jobTag) else None
+  }
+}
+
+/** Used to identify ExecuteHolder sessionTag among SparkContext.SPARK_JOB_TAGS. */
+object ExecuteSessionTag {
+  private val prefix = "SparkConnect_SessionTag"
+
+  def apply(userId: String, sessionId: String, tag: String): String = {
+    ProtoUtils.throwIfInvalidTag(tag)
+    s"${prefix}_" +
+      s"User_${userId}_" +
+      s"Session_${sessionId}_" +
+      s"Tag_${tag}"
+  }
+
+  def unapply(sessionTag: String): Option[String] = {
+    if (sessionTag.startsWith(prefix)) Some(sessionTag) else None
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala
index 1ed8ee2ff86..26c1062bf34 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala
@@ -34,7 +34,7 @@ object SparkConnectServer extends Logging {
     val session = SparkSession.builder.getOrCreate()
     try {
       try {
-        SparkConnectService.start()
+        SparkConnectService.start(session.sparkContext)
         SparkConnectService.server.getListenSockets.foreach { sa =>
           val isa = sa.asInstanceOf[InetSocketAddress]
           logInfo(
@@ -49,6 +49,7 @@ object SparkConnectServer extends Logging {
       SparkConnectService.server.awaitTermination()
     } finally {
       session.stop()
+      SparkConnectService.uiTab.foreach(_.detach())
     }
   }
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 87e4f21732f..206e24714fe 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -29,13 +29,16 @@ import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.{SparkEnv, SparkSQLException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
+import org.apache.spark.sql.connect.ui.{SparkConnectServerAppStatusStore, SparkConnectServerListener, SparkConnectServerTab}
 import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.status.ElementTrackingStore
 
 /**
  * The SparkConnectService implementation.
@@ -181,6 +184,9 @@ object SparkConnectService extends Logging {
 
   private[connect] var server: Server = _
 
+  private[connect] var uiTab: Option[SparkConnectServerTab] = None
+  private[connect] var listener: SparkConnectServerListener = _
+
   // For testing purpose, it's package level private.
   private[connect] def localPort: Int = {
     assert(server != null)
@@ -251,6 +257,20 @@ object SparkConnectService extends Logging {
     SparkSession.active.newSession()
   }
 
+  private def createListenerAndUI(sc: SparkContext): Unit = {
+    val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+    listener = new SparkConnectServerListener(kvStore, sc.conf)
+    sc.listenerBus.addToStatusQueue(listener)
+    uiTab = if (sc.getConf.get(UI_ENABLED)) {
+      Some(
+        new SparkConnectServerTab(
+          new SparkConnectServerAppStatusStore(kvStore),
+          SparkConnectServerTab.getSparkUI(sc)))
+    } else {
+      None
+    }
+  }
+
   /**
    * Starts the GRPC Serivce.
    */
@@ -280,8 +300,9 @@ object SparkConnectService extends Logging {
   }
 
   // Starts the service
-  def start(): Unit = {
+  def start(sc: SparkContext): Unit = {
     startGRPCService()
+    createListenerAndUI(sc)
   }
 
   def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit = {
@@ -294,6 +315,7 @@ object SparkConnectService extends Logging {
       }
     }
     userSessionMapping.invalidateAll()
+    uiTab.foreach(_.detach())
   }
 
   def extractErrorMessage(st: Throwable): String = {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala
new file mode 100644
index 00000000000..28812e49d1f
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.status.KVUtils
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+
+class SparkConnectServerAppStatusStore(store: KVStore) {
+  def getSessionList: Seq[SessionInfo] = {
+    KVUtils.viewToSeq(store.view(classOf[SessionInfo]))
+  }
+
+  def getExecutionList: Seq[ExecutionInfo] = {
+    KVUtils.viewToSeq(store.view(classOf[ExecutionInfo]))
+  }
+
+  def getOnlineSessionNum: Int = {
+    KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0)
+  }
+
+  def getSession(sessionId: String): Option[SessionInfo] = {
+    try {
+      Some(store.read(classOf[SessionInfo], sessionId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  def getExecution(executionId: String): Option[ExecutionInfo] = {
+    try {
+      Some(store.read(classOf[ExecutionInfo], executionId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  /**
+   * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
+   * Therefore, when we count the number of running statements, we need to exclude errors and
+   * cancellations and count all statements that have not been closed so far.
+   */
+  def getTotalRunning: Int = {
+    KVUtils.count(store.view(classOf[ExecutionInfo]))(_.isExecutionActive)
+  }
+
+  def getSessionCount: Long = {
+    store.count(classOf[SessionInfo])
+  }
+
+  def getExecutionCount: Long = {
+    store.count(classOf[ExecutionInfo])
+  }
+}
+
+private[connect] class SessionInfo(
+    @KVIndexParam val sessionId: String,
+    val startTimestamp: Long,
+    val userId: String,
+    val finishTimestamp: Long,
+    val totalExecution: Long) {
+  @JsonIgnore @KVIndex("finishTime")
+  private def finishTimeIndex: Long = if (finishTimestamp > 0L) finishTimestamp else -1L
+  def totalTime: Long = {
+    if (finishTimestamp == 0L) {
+      System.currentTimeMillis - startTimestamp
+    } else {
+      finishTimestamp - startTimestamp
+    }
+  }
+}
+
+private[connect] class ExecutionInfo(
+    @KVIndexParam val jobTag: String,
+    val statement: String,
+    val sessionId: String,
+    val startTimestamp: Long,
+    val userId: String,
+    val operationId: String,
+    val sparkSessionTags: Set[String],
+    val finishTimestamp: Long,
+    val closeTimestamp: Long,
+    val detail: String,
+    val state: ExecutionState.Value,
+    val jobId: ArrayBuffer[String],
+    val sqlExecId: mutable.Set[String]) {
+  @JsonIgnore @KVIndex("finishTime")
+  private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) {
+    finishTimestamp
+  } else -1L
+
+  @JsonIgnore @KVIndex("isExecutionActive")
+  def isExecutionActive: Boolean = {
+    state == ExecutionState.STARTED ||
+    state == ExecutionState.COMPILED ||
+    state == ExecutionState.READY
+  }
+
+  def totalTime(endTime: Long): Long = {
+    if (endTime == 0L) {
+      System.currentTimeMillis - startTimestamp
+    } else {
+      endTime - startTimestamp
+    }
+  }
+}
+
+private[connect] object ExecutionState extends Enumeration {
+  val STARTED, COMPILED, READY, CANCELED, FAILED, FINISHED, CLOSED = Value
+  type ExecutionState = Value
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala
similarity index 57%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
copy to connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala
index a127fa59b74..ba289e30a65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala
@@ -15,29 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.ui
+package org.apache.spark.sql.connect.ui
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus
-import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
 import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
 import org.apache.spark.ui.SparkUI
 
-class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
+class SparkConnectServerHistoryServerPlugin extends AppHistoryServerPlugin {
 
-  override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
-    val listenerBus = new StreamingQueryListenerBus(None)
-    listenerBus.addListener(new StreamingQueryStatusListener(conf, store))
-    Seq(listenerBus)
+  override def createListeners(
+      conf: SparkConf,
+      store: ElementTrackingStore): Seq[SparkListener] = {
+    Seq(new SparkConnectServerListener(store, conf))
   }
 
   override def setupUI(ui: SparkUI): Unit = {
-    val streamingQueryStatusStore = new StreamingQueryStatusStore(ui.store.store)
-    if (streamingQueryStatusStore.allQueryUIData.nonEmpty) {
-      new StreamingQueryTab(streamingQueryStatusStore, ui)
+    val store = new SparkConnectServerAppStatusStore(ui.store.store)
+    if (store.getSessionCount > 0) {
+      new SparkConnectServerTab(store, ui)
     }
   }
 
-  override def displayOrder: Int = 1
+  override def displayOrder: Int = 3
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
new file mode 100644
index 00000000000..b40e847f404
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
+import org.apache.spark.sql.connect.service._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
+
+private[connect] class SparkConnectServerListener(
+    kvstore: ElementTrackingStore,
+    sparkConf: SparkConf,
+    live: Boolean = true)
+    extends SparkListener
+    with Logging {
+
+  private val sessionList = new mutable.LinkedHashMap[String, LiveSessionData]
+  private val executionList = new mutable.LinkedHashMap[String, LiveExecutionData]
+
+  private val (retainedStatements: Int, retainedSessions: Int) = {
+    (
+      SparkEnv.get.conf.get(CONNECT_UI_STATEMENT_LIMIT),
+      SparkEnv.get.conf.get(CONNECT_UI_SESSION_LIMIT))
+  }
+
+  // How often to update live entities. -1 means "never update" when replaying applications,
+  // meaning only the last write will happen. For live applications, this avoids a few
+  // operations that we can live without when rapidly processing incoming events.
+  private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  // Returns true if this listener has no live data. Exposed for tests only.
+  private[connect] def noLiveData(): Boolean = synchronized {
+    sessionList.isEmpty && executionList.isEmpty
+  }
+
+  kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count =>
+    cleanupSession(count)
+  }
+
+  kvstore.addTrigger(classOf[ExecutionInfo], retainedStatements) { count =>
+    cleanupExecutions(count)
+  }
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    val jobTags = Option(jobStart.properties)
+      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_TAGS)) }
+      .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
+      .getOrElse(Set())
+      .toSeq
+      .filter(!_.isEmpty)
+      .sorted
+    val executeJobTagOpt = jobTags.find {
+      case ExecuteJobTag(_) => true
+      case _ => false
+    }
+    if (executeJobTagOpt.isEmpty) {
+      return
+    }
+    val executeJobTag = executeJobTagOpt.get
+    val exec = executionList.get(executeJobTag)
+    val executionIdOpt: Option[String] = Option(jobStart.properties)
+      .flatMap { p => Option(p.getProperty(SQLExecution.EXECUTION_ID_KEY)) }
+    if (exec.nonEmpty) {
+      exec.foreach { exec =>
+        exec.jobId += jobStart.jobId.toString
+        executionIdOpt.foreach { execId => exec.sqlExecId += execId }
+        updateLiveStore(exec)
+      }
+    } else {
+      // It may possible that event reordering happens, such a way that JobStart event come after
+      // Execution end event (Refer SPARK-27019). To handle that situation, if occurs in
+      // Spark Connect Server, following code will take care. Here will come only if JobStart
+      // event comes after Execution End event.
+      val storeExecInfo =
+        KVUtils.viewToSeq(kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(exec =>
+          exec.jobTag == executeJobTag)
+      storeExecInfo.foreach { exec =>
+        val liveExec = getOrCreateExecution(
+          exec.jobTag,
+          exec.statement,
+          exec.sessionId,
+          exec.startTimestamp,
+          exec.userId,
+          exec.operationId,
+          exec.sparkSessionTags)
+        liveExec.jobId += jobStart.jobId.toString
+        executionIdOpt.foreach { execId => exec.sqlExecId += execId }
+        updateStoreWithTriggerEnabled(liveExec)
+        executionList.remove(liveExec.jobTag)
+      }
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: SparkListenerConnectOperationStarted => onOperationStarted(e)
+      case e: SparkListenerConnectOperationAnalyzed => onOperationAnalyzed(e)
+      case e: SparkListenerConnectOperationReadyForExecution => onOperationReadyForExecution(e)
+      case e: SparkListenerConnectOperationCanceled => onOperationCanceled(e)
+      case e: SparkListenerConnectOperationFailed => onOperationFailed(e)
+      case e: SparkListenerConnectOperationFinished => onOperationFinished(e)
+      case e: SparkListenerConnectOperationClosed => onOperationClosed(e)
+      case e: SparkListenerConnectSessionStarted => onSessionStarted(e)
+      case e: SparkListenerConnectSessionClosed => onSessionClosed(e)
+      case _ => // Ignore
+    }
+  }
+
+  private def onOperationStarted(e: SparkListenerConnectOperationStarted) = synchronized {
+    val executionData = getOrCreateExecution(
+      e.jobTag,
+      e.statementText,
+      e.sessionId,
+      e.eventTime,
+      e.userId,
+      e.operationId,
+      e.sparkSessionTags)
+    executionData.state = ExecutionState.STARTED
+    executionList.put(e.jobTag, executionData)
+    updateLiveStore(executionData)
+    sessionList.get(e.sessionId) match {
+      case Some(sessionData) =>
+        sessionData.totalExecution += 1
+        updateLiveStore(sessionData)
+      case None =>
+        logWarning(
+          s"onOperationStart called with unknown session id: ${e.sessionId}." +
+            s"Regardless, the operation has been registered.")
+    }
+  }
+
+  private def onOperationAnalyzed(e: SparkListenerConnectOperationAnalyzed) = synchronized {
+    executionList.get(e.jobTag) match {
+      case Some(executionData) =>
+        executionData.state = ExecutionState.COMPILED
+        updateLiveStore(executionData)
+      case None =>
+        logWarning(s"onOperationAnalyzed called with unknown operation id: ${e.jobTag}")
+    }
+  }
+
+  private def onOperationReadyForExecution(
+      e: SparkListenerConnectOperationReadyForExecution): Unit = synchronized {
+    executionList.get(e.jobTag) match {
+      case Some(executionData) =>
+        executionData.state = ExecutionState.READY
+        updateLiveStore(executionData)
+      case None =>
+        logWarning(s"onOperationReadyForExectuion called with unknown operation id: ${e.jobTag}")
+    }
+  }
+
+  private def onOperationCanceled(e: SparkListenerConnectOperationCanceled) = synchronized {
+    executionList.get(e.jobTag) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.eventTime
+        executionData.state = ExecutionState.CANCELED
+        updateLiveStore(executionData)
+      case None =>
+        logWarning(s"onOperationCanceled called with unknown operation id: ${e.jobTag}")
+    }
+  }
+  private def onOperationFailed(e: SparkListenerConnectOperationFailed) = synchronized {
+    executionList.get(e.jobTag) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.eventTime
+        executionData.detail = e.errorMessage
+        executionData.state = ExecutionState.FAILED
+        updateLiveStore(executionData)
+      case None =>
+        logWarning(s"onOperationFailed called with unknown operation id: ${e.jobTag}")
+    }
+  }
+  private def onOperationFinished(e: SparkListenerConnectOperationFinished) = synchronized {
+    executionList.get(e.jobTag) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.eventTime
+        executionData.state = ExecutionState.FINISHED
+        updateLiveStore(executionData)
+      case None =>
+        logWarning(s"onOperationFinished called with unknown operation id: ${e.jobTag}")
+    }
+  }
+  private def onOperationClosed(e: SparkListenerConnectOperationClosed) = synchronized {
+    executionList.get(e.jobTag) match {
+      case Some(executionData) =>
+        executionData.closeTimestamp = e.eventTime
+        executionData.state = ExecutionState.CLOSED
+        updateStoreWithTriggerEnabled(executionData)
+        executionList.remove(e.jobTag)
+      case None =>
+        logWarning(s"onOperationClosed called with unknown operation id: ${e.jobTag}")
+    }
+  }
+
+  private def onSessionStarted(e: SparkListenerConnectSessionStarted) = synchronized {
+    val session = getOrCreateSession(e.sessionId, e.userId, e.eventTime)
+    sessionList.put(e.sessionId, session)
+    updateLiveStore(session)
+  }
+
+  private def onSessionClosed(e: SparkListenerConnectSessionClosed) = synchronized {
+    sessionList.get(e.sessionId) match {
+      case Some(sessionData) =>
+        sessionData.finishTimestamp = e.eventTime
+        updateStoreWithTriggerEnabled(sessionData)
+        sessionList.remove(e.sessionId)
+
+      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
+    }
+  }
+
+  // Update both live and history stores. Trigger is enabled by default, hence
+  // it will cleanup the entity which exceeds the threshold.
+  def updateStoreWithTriggerEnabled(entity: LiveEntity): Unit = synchronized {
+    entity.write(kvstore, System.nanoTime(), checkTriggers = true)
+  }
+
+  // Update only live stores. If trigger is enabled, it will cleanup entity
+  // which exceeds the threshold.
+  def updateLiveStore(entity: LiveEntity, trigger: Boolean = false): Unit = synchronized {
+    val now = System.nanoTime()
+    if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
+      entity.write(kvstore, now, checkTriggers = trigger)
+    }
+  }
+
+  private def getOrCreateSession(
+      sessionId: String,
+      userName: String,
+      startTime: Long): LiveSessionData = synchronized {
+    sessionList.getOrElseUpdate(sessionId, new LiveSessionData(sessionId, startTime, userName))
+  }
+
+  private def getOrCreateExecution(
+      jobTag: String,
+      statement: String,
+      sessionId: String,
+      startTimestamp: Long,
+      userId: String,
+      operationId: String,
+      sparkSessionTags: Set[String]): LiveExecutionData = synchronized {
+    executionList.getOrElseUpdate(
+      jobTag,
+      new LiveExecutionData(
+        jobTag,
+        statement,
+        sessionId,
+        startTimestamp,
+        userId,
+        operationId,
+        sparkSessionTags))
+  }
+
+  private def cleanupExecutions(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, retainedStatements)
+    if (countToDelete <= 0L) {
+      return
+    }
+    val view = kvstore.view(classOf[ExecutionInfo]).index("finishTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+      j.finishTimestamp != 0
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass, j.jobTag) }
+  }
+
+  private def cleanupSession(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, retainedSessions)
+    if (countToDelete <= 0L) {
+      return
+    }
+    val view = kvstore.view(classOf[SessionInfo]).index("finishTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+      j.finishTimestamp != 0L
+    }
+
+    toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) }
+  }
+
+  /**
+   * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
+   * asynchronously, this method may return 0 in case enough items have been deleted already.
+   */
+  private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
+    if (dataSize > retainedSize) {
+      math.max(retainedSize / 10L, dataSize - retainedSize)
+    } else {
+      0L
+    }
+  }
+}
+
+private[connect] class LiveExecutionData(
+    val jobTag: String,
+    val statement: String,
+    val sessionId: String,
+    val startTimestamp: Long,
+    val userId: String,
+    val operationId: String,
+    val sparkSessionTags: Set[String])
+    extends LiveEntity {
+
+  var finishTimestamp: Long = 0L
+  var closeTimestamp: Long = 0L
+  var detail: String = ""
+  var state: ExecutionState.Value = ExecutionState.STARTED
+  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
+  var sqlExecId: mutable.Set[String] = mutable.Set[String]()
+
+  override protected def doUpdate(): Any = {
+    new ExecutionInfo(
+      jobTag,
+      statement,
+      sessionId,
+      startTimestamp,
+      userId,
+      operationId,
+      sparkSessionTags,
+      finishTimestamp,
+      closeTimestamp,
+      detail,
+      state,
+      jobId,
+      sqlExecId)
+  }
+
+  def totalTime(endTime: Long): Long = {
+    if (endTime == 0L) {
+      System.currentTimeMillis - startTimestamp
+    } else {
+      endTime - startTimestamp
+    }
+  }
+}
+
+private[connect] class LiveSessionData(
+    val sessionId: String,
+    val startTimestamp: Long,
+    val userName: String)
+    extends LiveEntity {
+
+  var finishTimestamp: Long = 0L
+  var totalExecution: Int = 0
+
+  override protected def doUpdate(): Any = {
+    new SessionInfo(sessionId, startTimestamp, userName, finishTimestamp, totalExecution)
+  }
+  def totalTime: Long = {
+    if (finishTimestamp == 0L) {
+      System.currentTimeMillis - startTimestamp
+    } else {
+      finishTimestamp - startTimestamp
+    }
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala
new file mode 100644
index 00000000000..57535cc06a2
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.text.StringEscapeUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.ui.ToolTips._
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Utils
+
+/** Page for Spark UI that shows statistics for a Spark Connect Server. */
+private[ui] class SparkConnectServerPage(parent: SparkConnectServerTab)
+    extends WebUIPage("")
+    with Logging {
+
+  private val store = parent.store
+  private val startTime = parent.startTime
+
+  /** Render the page */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val content = store.synchronized { // make sure all parts in this page are consistent
+      generateBasicStats() ++
+        <br/> ++
+        <h4>
+          {store.getOnlineSessionNum}
+          session(s) are online,
+          running
+          {store.getTotalRunning}
+          Request(s)
+        </h4> ++
+        generateSessionStatsTable(request) ++
+        generateSQLStatsTable(request)
+    }
+    UIUtils.headerSparkPage(request, "Spark Connect", content, parent)
+  }
+
+  /** Generate basic stats of the Spark Connect server */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class ="list-unstyled">
+      <li>
+        <strong>Started at: </strong> {formatDate(startTime)}
+      </li>
+      <li>
+        <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of batch statements of the Spark Connect program */
+  private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
+
+    val numStatement = store.getExecutionList.size
+
+    val table = if (numStatement > 0) {
+
+      val sqlTableTag = "sqlstat"
+
+      val sqlTablePage =
+        Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+
+      try {
+        Some(
+          new SqlStatsPagedTable(
+            request,
+            parent,
+            store.getExecutionList,
+            "connect",
+            UIUtils.prependBaseUri(request, parent.basePath),
+            sqlTableTag,
+            showSessionLink = true).table(sqlTablePage))
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+          Some(<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+    val content =
+      <span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sqlstat',
+                'aggregated-sqlstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Request Statistics ({numStatement})</a>
+        </h4>
+      </span> ++
+        <div class="aggregated-sqlstat collapsible-table">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </div>
+    content
+  }
+
+  /** Generate stats of batch sessions of the Spark Connect server */
+  private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
+    val numSessions = store.getSessionList.size
+    val table = if (numSessions > 0) {
+
+      val sessionTableTag = "sessionstat"
+
+      val sessionTablePage =
+        Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1)
+
+      try {
+        Some(
+          new SessionStatsPagedTable(
+            request,
+            parent,
+            store.getSessionList,
+            "connect",
+            UIUtils.prependBaseUri(request, parent.basePath),
+            sessionTableTag).table(sessionTablePage))
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+          Some(<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+
+    val content =
+      <span id="sessionstat" class="collapse-aggregated-sessionstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sessionstat',
+                'aggregated-sessionstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Session Statistics ({numSessions})</a>
+        </h4>
+      </span> ++
+        <div class="aggregated-sessionstat collapsible-table">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </div>
+
+    content
+  }
+}
+
+private[ui] class SqlStatsPagedTable(
+    request: HttpServletRequest,
+    parent: SparkConnectServerTab,
+    data: Seq[ExecutionInfo],
+    subPath: String,
+    basePath: String,
+    sqlStatsTableTag: String,
+    showSessionLink: Boolean)
+    extends PagedTable[SqlStatsTableRow] {
+
+  private val (sortColumn, desc, pageSize) =
+    getTableParameters(request, sqlStatsTableTag, "Start Time")
+
+  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+  private val parameterPath =
+    s"$basePath/$subPath/?${getParameterOtherTable(request, sqlStatsTableTag)}"
+
+  override val dataSource = new SqlStatsTableDataSource(data, pageSize, sortColumn, desc)
+
+  override def tableId: String = sqlStatsTableTag
+
+  override def tableCssClass: String =
+    "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+  override def pageLink(page: Int): String = {
+    parameterPath +
+      s"&$pageNumberFormField=$page" +
+      s"&$sqlStatsTableTag.sort=$encodedSortColumn" +
+      s"&$sqlStatsTableTag.desc=$desc" +
+      s"&$pageSizeFormField=$pageSize" +
+      s"#$sqlStatsTableTag"
+  }
+
+  override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize"
+
+  override def pageNumberFormField: String = s"$sqlStatsTableTag.page"
+
+  override def goButtonFormPath: String =
+    s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" +
+      s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag"
+
+  override def headers: Seq[Node] = {
+    val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
+      if (showSessionLink) {
+        Seq(
+          ("User", true, None),
+          ("Job ID", true, None),
+          ("SQL Query ID", true, None),
+          ("Session ID", true, None),
+          ("Start Time", true, None),
+          ("Finish Time", true, Some(SPARK_CONNECT_SERVER_FINISH_TIME)),
+          ("Close Time", true, Some(SPARK_CONNECT_SERVER_CLOSE_TIME)),
+          ("Execution Time", true, Some(SPARK_CONNECT_SERVER_EXECUTION)),
+          ("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)),
+          ("Statement", true, None),
+          ("State", true, None),
+          ("Operation ID", true, None),
+          ("Job Tag", true, None),
+          ("Spark Session Tags", true, None),
+          ("Detail", true, None))
+      } else {
+        Seq(
+          ("User", true, None),
+          ("Job ID", true, None),
+          ("SQL Query ID", true, None),
+          ("Start Time", true, None),
+          ("Finish Time", true, Some(SPARK_CONNECT_SERVER_FINISH_TIME)),
+          ("Close Time", true, Some(SPARK_CONNECT_SERVER_CLOSE_TIME)),
+          ("Execution Time", true, Some(SPARK_CONNECT_SERVER_EXECUTION)),
+          ("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)),
+          ("Statement", true, None),
+          ("State", true, None),
+          ("Operation ID", true, None),
+          ("Job Tag", true, None),
+          ("Spark Session Tags", true, None),
+          ("Detail", true, None))
+      }
+
+    isSortColumnValid(sqlTableHeadersAndTooltips, sortColumn)
+
+    headerRow(
+      sqlTableHeadersAndTooltips,
+      desc,
+      pageSize,
+      sortColumn,
+      parameterPath,
+      sqlStatsTableTag,
+      sqlStatsTableTag)
+  }
+
+  override def row(sqlStatsTableRow: SqlStatsTableRow): Seq[Node] = {
+    val info = sqlStatsTableRow.executionInfo
+    val startTime = info.startTimestamp
+    val executionTime = sqlStatsTableRow.executionTime
+    val duration = sqlStatsTableRow.duration
+
+    def jobLinks(jobData: Seq[String]): Seq[Node] = {
+      jobData.map { jobId =>
+        <a href={jobURL(request, jobId)}>[{jobId}]</a>
+      }
+    }
+    def sqlLinks(sqlData: Seq[String]): Seq[Node] = {
+      sqlData.map { sqlExecId =>
+        <a href={sqlURL(request, sqlExecId)}>[{sqlExecId}]</a>
+      }
+    }
+    val sessionLink = "%s/%s/session/?id=%s".format(
+      UIUtils.prependBaseUri(request, parent.basePath),
+      parent.prefix,
+      info.sessionId)
+
+    <tr>
+      <td>
+        {info.userId}
+      </td>
+      <td>
+        {jobLinks(sqlStatsTableRow.jobId)}
+      </td>
+      <td>
+        {sqlLinks(sqlStatsTableRow.sqlExecId)}
+      </td>
+      {
+      if (showSessionLink) {
+        <td>
+          <a href={sessionLink}>{info.sessionId}</a>
+        </td>
+      }
+    }
+      <td>
+        {UIUtils.formatDate(startTime)}
+      </td>
+      <td>
+        {if (info.finishTimestamp > 0) formatDate(info.finishTimestamp)}
+      </td>
+      <td>
+        {if (info.closeTimestamp > 0) formatDate(info.closeTimestamp)}
+      </td>
+      <!-- Returns a human-readable string representing a duration such as "5 second 35 ms"-->
+      <td >
+        {formatDurationVerbose(executionTime)}
+      </td>
+      <td >
+        {formatDurationVerbose(duration)}
+      </td>
+      <td>
+        <span class="description-input">
+          {info.statement}
+        </span>
+      </td>
+      <td>
+        {if (info.isExecutionActive) "RUNNING" else info.state}
+      </td>
+      <td>
+        {info.operationId}
+      </td>
+      <td>
+        {info.jobTag}
+      </td>
+      <td>
+        {sqlStatsTableRow.sparkSessionTags.mkString(", ")}
+      </td>
+      {errorMessageCell(Option(info.detail))}
+    </tr>
+  }
+
+  private def errorMessageCell(errorMessageOption: Option[String]): Seq[Node] = {
+    val errorMessage = errorMessageOption.getOrElse("")
+    val isMultiline = errorMessage.indexOf('\n') >= 0
+    val errorSummary = StringEscapeUtils.escapeHtml4(if (isMultiline) {
+      errorMessage.substring(0, errorMessage.indexOf('\n'))
+    } else {
+      errorMessage
+    })
+    val details = detailsUINode(isMultiline, errorMessage)
+    <td>
+      {errorSummary}{details}
+    </td>
+  }
+
+  private def jobURL(request: HttpServletRequest, jobId: String): String =
+    "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId)
+
+  private def sqlURL(request: HttpServletRequest, sqlExecId: String): String =
+    "%s/SQL/execution/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), sqlExecId)
+}
+
+private[ui] class SessionStatsPagedTable(
+    request: HttpServletRequest,
+    parent: SparkConnectServerTab,
+    data: Seq[SessionInfo],
+    subPath: String,
+    basePath: String,
+    sessionStatsTableTag: String)
+    extends PagedTable[SessionInfo] {
+
+  private val (sortColumn, desc, pageSize) =
+    getTableParameters(request, sessionStatsTableTag, "Start Time")
+
+  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+  private val parameterPath =
+    s"$basePath/$subPath/?${getParameterOtherTable(request, sessionStatsTableTag)}"
+
+  override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc)
+
+  override def tableId: String = sessionStatsTableTag
+
+  override def tableCssClass: String =
+    "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+  override def pageLink(page: Int): String = {
+    parameterPath +
+      s"&$pageNumberFormField=$page" +
+      s"&$sessionStatsTableTag.sort=$encodedSortColumn" +
+      s"&$sessionStatsTableTag.desc=$desc" +
+      s"&$pageSizeFormField=$pageSize" +
+      s"#$sessionStatsTableTag"
+  }
+
+  override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize"
+
+  override def pageNumberFormField: String = s"$sessionStatsTableTag.page"
+
+  override def goButtonFormPath: String =
+    s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn" +
+      s"&$sessionStatsTableTag.desc=$desc#$sessionStatsTableTag"
+
+  override def headers: Seq[Node] = {
+    val sessionTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
+      Seq(
+        ("User", true, None),
+        ("Session ID", true, None),
+        ("Start Time", true, None),
+        ("Finish Time", true, None),
+        ("Duration", true, Some(SPARK_CONNECT_SESSION_DURATION)),
+        ("Total Execute", true, Some(SPARK_CONNECT_SESSION_TOTAL_EXECUTE)))
+
+    isSortColumnValid(sessionTableHeadersAndTooltips, sortColumn)
+
+    headerRow(
+      sessionTableHeadersAndTooltips,
+      desc,
+      pageSize,
+      sortColumn,
+      parameterPath,
+      sessionStatsTableTag,
+      sessionStatsTableTag)
+  }
+
+  override def row(session: SessionInfo): Seq[Node] = {
+    val sessionLink = "%s/%s/session/?id=%s".format(
+      UIUtils.prependBaseUri(request, parent.basePath),
+      parent.prefix,
+      session.sessionId)
+    <tr>
+      <td> {session.userId} </td>
+      <td> <a href={sessionLink}> {session.sessionId} </a> </td>
+      <td> {formatDate(session.startTimestamp)} </td>
+      <td> {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} </td>
+      <td> {formatDurationVerbose(session.totalTime)} </td>
+      <td> {session.totalExecution.toString} </td>
+    </tr>
+  }
+}
+
+private[ui] class SqlStatsTableRow(
+    val jobTag: String,
+    val jobId: Seq[String],
+    val sqlExecId: Seq[String],
+    val duration: Long,
+    val executionTime: Long,
+    val sparkSessionTags: Seq[String],
+    val executionInfo: ExecutionInfo)
+
+private[ui] class SqlStatsTableDataSource(
+    info: Seq[ExecutionInfo],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean)
+    extends PagedDataSource[SqlStatsTableRow](pageSize) {
+
+  // Convert ExecutionInfo to SqlStatsTableRow which contains the final contents to show in
+  // the table so that we can avoid creating duplicate contents during sorting the data
+  private val data = info.map(sqlStatsTableRow).sorted(ordering(sortColumn, desc))
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[SqlStatsTableRow] = data.slice(from, to)
+
+  private def sqlStatsTableRow(executionInfo: ExecutionInfo): SqlStatsTableRow = {
+    val duration = executionInfo.totalTime(executionInfo.closeTimestamp)
+    val executionTime = executionInfo.totalTime(executionInfo.finishTimestamp)
+    val jobId = executionInfo.jobId.toSeq.sorted
+    val sqlExecId = executionInfo.sqlExecId.toSeq.sorted
+    val sparkSessionTags = executionInfo.sparkSessionTags.toSeq.sorted
+
+    new SqlStatsTableRow(
+      executionInfo.jobTag,
+      jobId,
+      sqlExecId,
+      duration,
+      executionTime,
+      sparkSessionTags,
+      executionInfo)
+  }
+
+  /**
+   * Return Ordering according to sortColumn and desc.
+   */
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[SqlStatsTableRow] = {
+    val ordering: Ordering[SqlStatsTableRow] = sortColumn match {
+      case "User" => Ordering.by(_.executionInfo.userId)
+      case "Operation ID" => Ordering.by(_.executionInfo.operationId)
+      case "Job ID" => Ordering.by(_.jobId.headOption)
+      case "SQL Query ID" => Ordering.by(_.sqlExecId.headOption)
+      case "Session ID" => Ordering.by(_.executionInfo.sessionId)
+      case "Start Time" => Ordering.by(_.executionInfo.startTimestamp)
+      case "Finish Time" => Ordering.by(_.executionInfo.finishTimestamp)
+      case "Close Time" => Ordering.by(_.executionInfo.closeTimestamp)
+      case "Execution Time" => Ordering.by(_.executionTime)
+      case "Duration" => Ordering.by(_.duration)
+      case "Statement" => Ordering.by(_.executionInfo.statement)
+      case "State" => Ordering.by(_.executionInfo.state)
+      case "Detail" => Ordering.by(_.executionInfo.detail)
+      case "Job Tag" => Ordering.by(_.executionInfo.jobTag)
+      case "Spark Session Tags" => Ordering.by(_.sparkSessionTags.headOption)
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+}
+
+private[ui] class SessionStatsTableDataSource(
+    info: Seq[SessionInfo],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean)
+    extends PagedDataSource[SessionInfo](pageSize) {
+
+  // Sorting SessionInfo data
+  private val data = info.sorted(ordering(sortColumn, desc))
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[SessionInfo] = data.slice(from, to)
+
+  /**
+   * Return Ordering according to sortColumn and desc.
+   */
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionInfo] = {
+    val ordering: Ordering[SessionInfo] = sortColumn match {
+      case "User" => Ordering.by(_.userId)
+      case "Session ID" => Ordering.by(_.sessionId)
+      case "Start Time" => Ordering.by(_.startTimestamp)
+      case "Finish Time" => Ordering.by(_.finishTimestamp)
+      case "Duration" => Ordering.by(_.totalTime)
+      case "Total Execute" => Ordering.by(_.totalExecution)
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala
new file mode 100644
index 00000000000..fde6e8da8b6
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Utils
+
+/** Page for Spark UI that contains information pertaining to a single Spark Connect session */
+private[ui] class SparkConnectServerSessionPage(parent: SparkConnectServerTab)
+    extends WebUIPage("session")
+    with Logging {
+
+  val store = parent.store
+  private val startTime = parent.startTime
+
+  /** Render the page */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val sessionId = request.getParameter("id")
+    require(sessionId != null && sessionId.nonEmpty, "Missing id parameter")
+
+    val content = store.synchronized { // make sure all parts in this page are consistent
+      store
+        .getSession(sessionId)
+        .map { sessionStat =>
+          generateBasicStats() ++
+            <br/> ++
+            <h4>
+            User
+            {sessionStat.userId}
+            ,
+            Session created at
+            {formatDate(sessionStat.startTimestamp)}
+            ,
+            Total run
+            {sessionStat.totalExecution}
+            Request(s)
+          </h4> ++
+            generateSQLStatsTable(request, sessionStat.sessionId)
+        }
+        .getOrElse(<div>No information to display for session {sessionId}</div>)
+    }
+    UIUtils.headerSparkPage(request, "Spark Connect Session", content, parent)
+  }
+
+  /** Generate basic stats of the Spark Connect Server */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class ="list-unstyled">
+      <li>
+        <strong>Started at: </strong> {formatDate(startTime)}
+      </li>
+      <li>
+        <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of batch statements of the Spark Connect server */
+  private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
+    val executionList = store.getExecutionList
+      .filter(_.sessionId == sessionID)
+    val numStatement = executionList.size
+    val table = if (numStatement > 0) {
+
+      val sqlTableTag = "sqlsessionstat"
+
+      val sqlTablePage =
+        Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+
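+      // Table rendering may throw IllegalArgumentException (unknown sort column) or
+      // IndexOutOfBoundsException (invalid page number); these are caught below and
+      // shown as an error alert instead of failing the whole page.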
+      try {
+        Some(
+          new SqlStatsPagedTable(
+            request,
+            parent,
+            executionList,
+            "connect/session",
+            UIUtils.prependBaseUri(request, parent.basePath),
+            sqlTableTag,
+            showSessionLink = false).table(sqlTablePage))
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+          Some(<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+    val content =
+      <span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat 
collapse-table"
+            onClick="collapseTable('collapse-aggregated-sqlsessionstat',
+                'aggregated-sqlsessionstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Request Statistics</a>
+        </h4>
+      </span> ++
+        <div class="aggregated-sqlsessionstat collapsible-table">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </div>
+
+    content
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala
new file mode 100644
index 00000000000..4d1820a994c
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.util.Date
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[connect] class SparkConnectServerTab(
+    val store: SparkConnectServerAppStatusStore,
+    sparkUI: SparkUI)
+    extends SparkUITab(sparkUI, "connect")
+    with Logging {
+
+  override val name = "Connect"
+
+  val parent = sparkUI
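+  // Use the application attempt's start time from the status store when available;
+  // fall back to the current time if no attempt has been recorded yet.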
+  val startTime =
+    try {
+      sparkUI.store.applicationInfo().attempts.head.startTime
+    } catch {
+      case _: NoSuchElementException => new Date(System.currentTimeMillis())
+    }
+
+  attachPage(new SparkConnectServerPage(this))
+  attachPage(new SparkConnectServerSessionPage(this))
+  parent.attachTab(this)
+  def detach(): Unit = {
+    parent.detachTab(this)
+  }
+}
+
+private[connect] object SparkConnectServerTab {
+  def getSparkUI(sparkContext: SparkContext): SparkUI = {
+    sparkContext.ui.getOrElse {
+      throw QueryExecutionErrors.parentSparkUIToAttachTabNotFoundError()
+    }
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala
new file mode 100644
index 00000000000..9b51ace83c6
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+private[ui] object ToolTips {
+  val SPARK_CONNECT_SERVER_FINISH_TIME =
+    "Execution finish time, before fetching the results"
+
+  val SPARK_CONNECT_SERVER_CLOSE_TIME =
+    "Operation close time after fetching the results"
+
+  val SPARK_CONNECT_SERVER_EXECUTION =
+    "Difference between start time and finish time"
+
+  val SPARK_CONNECT_SERVER_DURATION =
+    "Difference between start time and close time"
+
+  val SPARK_CONNECT_SESSION_TOTAL_EXECUTE =
+    "Number of operations submitted in this session"
+
+  val SPARK_CONNECT_SESSION_DURATION =
+    "Elapsed time since session start, or until closed if the session was 
closed"
+
+}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
new file mode 100644
index 00000000000..9292e44f177
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.util.Properties
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SharedSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENTITY_UPDATE_PERIOD}
+import org.apache.spark.scheduler.SparkListenerJobStart
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
+import org.apache.spark.sql.connect.service._
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
+
+class SparkConnectServerListenerSuite
+    extends SparkFunSuite
+    with BeforeAndAfter
+    with SharedSparkContext {
+
+  private var kvstore: ElementTrackingStore = _
+
+  after {
+    if (kvstore != null) {
+      kvstore.close()
+      kvstore = null
+    }
+  }
+
+  Seq(true, false).foreach { live =>
+    test(s"listener events should store successfully (live = $live)") {
+      val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
+        createAppStatusStore(live)
+
+      listener.onOtherEvent(
+        SparkListenerConnectSessionStarted("sessionId", "user", System.currentTimeMillis()))
+      listener.onOtherEvent(
+        SparkListenerConnectOperationStarted(
+          ExecuteJobTag("sessionId", "userId", "operationId"),
+          "operationId",
+          System.currentTimeMillis(),
+          "sessionId",
+          "userId",
+          "userName",
+          "dummy query",
+          None,
+          Set()))
+      listener.onOtherEvent(
+        SparkListenerConnectOperationAnalyzed(
+          ExecuteJobTag("sessionId", "userId", "operationId"),
+          "operationId",
+          System.currentTimeMillis()))
+      listener.onJobStart(
+        SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
+      listener.onOtherEvent(
+        SparkListenerConnectOperationFinished(
+          ExecuteJobTag("sessionId", "userId", "operationId"),
+          "sessionId",
+          System.currentTimeMillis()))
+      listener.onOtherEvent(
+        SparkListenerConnectOperationClosed(
+          ExecuteJobTag("sessionId", "userId", "operationId"),
+          "sessionId",
+          System.currentTimeMillis()))
+
+      if (live) {
+        assert(statusStore.getOnlineSessionNum === 1)
+      }
+
+      listener.onOtherEvent(
+        SparkListenerConnectSessionClosed("sessionId", "userId", 
System.currentTimeMillis()))
+
+      if (!live) {
+        // To update history store
+        kvstore.close(false)
+      }
+      assert(statusStore.getOnlineSessionNum === 0)
+      assert(statusStore.getExecutionList.size === 1)
+
+      val storeExecData = statusStore.getExecutionList.head
+
+      assert(storeExecData.jobTag === ExecuteJobTag("sessionId", "userId", "operationId"))
+      assert(storeExecData.sessionId === "sessionId")
+      assert(storeExecData.statement === "dummy query")
+      assert(storeExecData.jobId === Seq("0"))
+      assert(listener.noLiveData())
+    }
+  }
+
+  Seq(true, false).foreach { live =>
+    test(s"cleanup session if exceeds the threshold (live = $live)") {
+      val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
+        createAppStatusStore(live)
+      var time = 0
+      listener.onOtherEvent(
+        SparkListenerConnectSessionStarted("sessionId1", "user", 
System.currentTimeMillis()))
+      time += 1
+      listener.onOtherEvent(
+        SparkListenerConnectSessionStarted("sessionId2", "user", 
System.currentTimeMillis()))
+      time += 1
+      listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId1", 
"userId", time))
+      time += 1
+      listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId2", 
"userId", time))
+      listener.onOtherEvent(
+        SparkListenerConnectSessionStarted("sessionId3", "user", 
System.currentTimeMillis()))
+      time += 1
+      listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId3", 
"userId", time))
+
+      if (!live) {
+        kvstore.close(false)
+      }
+      assert(statusStore.getOnlineSessionNum === 0)
+      assert(statusStore.getSessionCount === 1)
+      assert(statusStore.getSession("sessionId1") === None)
+      assert(listener.noLiveData())
+    }
+  }
+
+  test("update execution info when jobstart event come after execution end 
event") {
+    val (statusStore: SparkConnectServerAppStatusStore, listener: 
SparkConnectServerListener) =
+      createAppStatusStore(true)
+
+    listener.onOtherEvent(
+      SparkListenerConnectSessionStarted("sessionId", "userId", 
System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationStarted(
+        ExecuteJobTag("sessionId", "userId", "operationId"),
+        "operationId",
+        System.currentTimeMillis(),
+        "sessionId",
+        "userId",
+        "userName",
+        "dummy query",
+        None,
+        Set()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationAnalyzed(
+        ExecuteJobTag("sessionId", "userId", "operationId"),
+        "operationId",
+        System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationFinished(
+        ExecuteJobTag("sessionId", "userId", "operationId"),
+        "operationId",
+        System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationClosed(
+        ExecuteJobTag("sessionId", "userId", "operationId"),
+        "operationId",
+        System.currentTimeMillis()))
+
+    listener.onJobStart(
+      SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
+    listener.onOtherEvent(
+      SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis()))
+    val exec = statusStore.getExecution(ExecuteJobTag("sessionId", "userId", "operationId"))
+    assert(exec.isDefined)
+    assert(exec.get.jobId === Seq("0"))
+    assert(listener.noLiveData())
+  }
+
+  test("SPARK-31387 - listener update methods should not throw exception with 
unknown input") {
+    val (statusStore: SparkConnectServerAppStatusStore, listener: 
SparkConnectServerListener) =
+      createAppStatusStore(true)
+
+    val unknownSession = "unknown_session"
+    val unknownJob = "unknown_job_tag"
+    listener.onOtherEvent(SparkListenerConnectSessionClosed(unknownSession, "userId", 0))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationStarted(
+        ExecuteJobTag("sessionId", "userId", "operationId"),
+        "operationId",
+        System.currentTimeMillis(),
+        unknownSession,
+        "userId",
+        "userName",
+        "dummy query",
+        None,
+        Set()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationAnalyzed(
+        unknownJob,
+        "operationId",
+        System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerConnectOperationCanceled(unknownJob, "userId", 0))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationFailed(unknownJob, "operationId", 0, "msg"))
+    listener.onOtherEvent(SparkListenerConnectOperationFinished(unknownJob, "operationId", 0))
+    listener.onOtherEvent(SparkListenerConnectOperationClosed(unknownJob, "operationId", 0))
+  }
+
+  private def createProperties: Properties = {
+    val properties = new Properties()
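+    // Tag the job with the execution's job tag so the listener can associate the
+    // SparkListenerJobStart event with the Connect operation.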
+    properties.setProperty(
+      SparkContext.SPARK_JOB_TAGS,
+      ExecuteJobTag("sessionId", "userId", "operationId"))
+    properties
+  }
+
+  private def createAppStatusStore(live: Boolean) = {
+    val sparkConf = new SparkConf()
+    sparkConf
+      .set(ASYNC_TRACKING_ENABLED, false)
+      .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
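+    // Keep the UI retention limits small so the session-eviction path is exercised.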
+    SparkEnv.get.conf
+      .set(CONNECT_UI_SESSION_LIMIT, 1)
+      .set(CONNECT_UI_STATEMENT_LIMIT, 10)
+    kvstore = new ElementTrackingStore(new InMemoryStore, sparkConf)
+    if (live) {
+      val listener = new SparkConnectServerListener(kvstore, sparkConf)
+      (new SparkConnectServerAppStatusStore(kvstore), listener)
+    } else {
+      (
+        new SparkConnectServerAppStatusStore(kvstore),
+        new SparkConnectServerListener(kvstore, sparkConf, false))
+    }
+  }
+}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
new file mode 100644
index 00000000000..99d0a14f1e8
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.util.{Calendar, Locale}
+import javax.servlet.http.HttpServletRequest
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.scheduler.SparkListenerJobStart
+import org.apache.spark.sql.connect.service._
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
+
+class SparkConnectServerPageSuite
+    extends SparkFunSuite
+    with BeforeAndAfter
+    with SharedSparkContext {
+
+  private var kvstore: ElementTrackingStore = _
+
+  after {
+    if (kvstore != null) {
+      kvstore.close()
+      kvstore = null
+    }
+  }
+
+  /**
+   * Run a dummy session and return the store
+   */
+  private def getStatusStore: SparkConnectServerAppStatusStore = {
+    kvstore = new ElementTrackingStore(new InMemoryStore, new SparkConf())
+    // val server = mock(classOf[SparkConnectServer], RETURNS_SMART_NULLS)
+    val sparkConf = new SparkConf
+
+    val listener = new SparkConnectServerListener(kvstore, sparkConf)
+    val statusStore = new SparkConnectServerAppStatusStore(kvstore)
+
+    listener.onOtherEvent(
+      SparkListenerConnectSessionStarted("sessionId", "userId", 
System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationStarted(
+        "jobTag",
+        "operationId",
+        System.currentTimeMillis(),
+        "sessionId",
+        "userId",
+        "userName",
+        "dummy query",
+        None,
+        Set()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationAnalyzed("jobTag", "dummy plan", 
System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerJobStart(0, System.currentTimeMillis(), 
Seq()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationFinished("jobTag", "operationId", 
System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerConnectOperationClosed("jobTag", "operationId", 
System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerConnectSessionClosed("sessionId", "userId", 
System.currentTimeMillis()))
+
+    statusStore
+  }
+
+  test("Spark Connect Server page should load successfully") {
+    val store = getStatusStore
+
+    val request = mock(classOf[HttpServletRequest])
+    val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    val page = new SparkConnectServerPage(tab)
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // session statistics and sql statistics tables should load successfully
+    assert(html.contains("session statistics (1)"))
+    assert(html.contains("request statistics (1)"))
+    assert(html.contains("dummy query"))
+
+    // Pagination support
+    assert(html.contains("<label>1 pages. jump to</label>"))
+
+    // Hiding table support
+    assert(
+      html.contains("class=\"collapse-aggregated-sessionstat" +
+        " collapse-table\" onclick=\"collapsetable"))
+  }
+
+  test("Spark Connect Server session page should load successfully") {
+    val store = getStatusStore
+
+    val request = mock(classOf[HttpServletRequest])
+    when(request.getParameter("id")).thenReturn("sessionId")
+    val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    val page = new SparkConnectServerSessionPage(tab)
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // session sql statistics table should load successfully
+    assert(html.contains("request statistics"))
+    assert(html.contains("userid"))
+    assert(html.contains("jobtag"))
+
+    // Pagination support
+    assert(html.contains("<label>1 pages. jump to</label>"))
+
+    // Hiding table support
+    assert(
+      html.contains("collapse-aggregated-sqlsessionstat collapse-table\"" +
+        " onclick=\"collapsetable"))
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f48cb32b319..533ce28ab4d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging {
     postApplicationStart()
 
     // After application started, attach handlers to started server and start handler.
-    _ui.foreach(_.attachAllHandler())
+    _ui.foreach(_.attachAllHandlers())
     // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
     _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
 
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ac154b79385..685407c1120 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -67,15 +67,31 @@ private[spark] class SparkUI private (
     createServletHandler("/", servlet, basePath)
   }
 
+  private var readyToAttachHandlers = false
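+  // Set to true by attachAllHandlers(); handlers attached afterwards are added to the
+  // underlying server immediately (see attachHandler below).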
+
   /**
    * Attach all existing handlers to ServerInfo.
    */
-  def attachAllHandler(): Unit = {
+  def attachAllHandlers(): Unit = {
+    // Attach all handlers that have been added already, but not yet attached.
     serverInfo.foreach { server =>
       server.removeHandler(initHandler)
       handlers.foreach(server.addHandler(_, securityManager))
     }
+    // Handlers attached after this can be directly started.
+    readyToAttachHandlers = true
   }
+
+  /** Attaches a handler to this UI.
+   *  Note: The handler will not be attached until readyToAttachHandlers is true;
+   *  handlers added before that are attached by attachAllHandlers(). */
+  override def attachHandler(handler: ServletContextHandler): Unit = synchronized {
+    handlers += handler
+    if (readyToAttachHandlers) {
+      serverInfo.foreach(_.addHandler(handler, securityManager))
+    }
+  }
+
   /** Initialize all components of the server. */
   def initialize(): Unit = {
     val jobsTab = new JobsTab(this, store)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 352c51baa8c..e7d57a6e6de 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -442,7 +442,7 @@ class UISuite extends SparkFunSuite {
       sparkUI.bind()
       assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs"))
         === "Spark is starting up. Please wait a while until it's ready.")
-      sparkUI.attachAllHandler()
+      sparkUI.attachAllHandlers()
       assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName))
       sparkUI.stop()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
index a127fa59b74..76f64dcb644 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
@@ -39,5 +39,5 @@ class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
     }
   }
 
-  override def displayOrder: Int = 1
+  override def displayOrder: Int = 2
 }

